diff --git a/metaflow/cli.py b/metaflow/cli.py index 270a1bd621..e87b1a602a 100644 --- a/metaflow/cli.py +++ b/metaflow/cli.py @@ -1,13 +1,18 @@ import inspect import json +import shutil import sys +import threading +import time import traceback from datetime import datetime from functools import wraps +from itertools import cycle import metaflow.tracing as tracing from metaflow._vendor import click from metaflow.client.core import get_metadata +from metaflow.system import _system_logger, _system_monitor from . import decorators, lint, metaflow_version, namespace, parameters, plugins from .cli_args import cli_args @@ -24,7 +29,6 @@ DEFAULT_PACKAGE_SUFFIXES, ) from .metaflow_current import current -from metaflow.system import _system_monitor, _system_logger from .metaflow_environment import MetaflowEnvironment from .mflog import LOG_SOURCES, mflog from .package import MetaflowPackage @@ -68,20 +72,81 @@ def echo_dev_null(*args, **kwargs): pass +_animation_thread = None +_animation_stop = threading.Event() +_default_spinner = cycle(["-", "\\", "|", "/"]) + + +def _animate(get_frame, line, err, kwargs, indent): + while not _animation_stop.is_set(): + frame = get_frame(line) + if indent: + frame = INDENT + frame + terminal_width, _ = shutil.get_terminal_size() + frame = frame.ljust(terminal_width)[:terminal_width] + click.secho(f"\r{frame}", nl=False, err=err, **kwargs) + click.get_text_stream("stderr" if err else "stdout").flush() + time.sleep(0.1) + + def echo_always(line, **kwargs): + global _animation_thread, _animation_stop + kwargs["err"] = kwargs.get("err", True) - if kwargs.pop("indent", None): - line = "\n".join(INDENT + x for x in line.splitlines()) - if "nl" not in kwargs or kwargs["nl"]: + overwrite = kwargs.pop("overwrite", False) + animate = kwargs.pop("animate", None) + indent = kwargs.pop("indent", False) + + if _animation_thread is not None: + _animation_stop.set() + _animation_thread.join() + _animation_thread = None + _animation_stop.clear() + 
click.echo("\r", nl=False, err=kwargs["err"]) + + if indent: + if animate: + # For animated output, we prepend INDENT in the animation function + pass + else: + line = INDENT + line + + animation_kwargs = { + "fg": kwargs.get("fg"), + "bg": kwargs.get("bg"), + "bold": kwargs.get("bold", False), + "underline": kwargs.get("underline", False), + } + + if animate: + if animate is True: + get_frame = lambda line: f"{next(_default_spinner)} {line}" + else: + get_frame = animate + _animation_stop.clear() + _animation_thread = threading.Thread( + target=_animate, + args=(get_frame, line, kwargs["err"], animation_kwargs, indent), + ) + _animation_thread.start() + return + + if overwrite: + terminal_width, _ = shutil.get_terminal_size() + line = line.ljust(terminal_width)[:terminal_width] + click.echo("\r", nl=False, err=kwargs["err"]) + elif "nl" not in kwargs or kwargs["nl"]: line += ERASE_TO_EOL + top = kwargs.pop("padding_top", None) bottom = kwargs.pop("padding_bottom", None) highlight = kwargs.pop("highlight", HIGHLIGHT) - if top: + + if top and not overwrite: click.secho(ERASE_TO_EOL, **kwargs) hl_bold = kwargs.pop("highlight_bold", True) - nl = kwargs.pop("nl", True) + nl = kwargs.pop("nl", True) and not overwrite fg = kwargs.pop("fg", None) bold = kwargs.pop("bold", False) kwargs["nl"] = False @@ -104,8 +169,10 @@ def echo_always(line, **kwargs): if nl: kwargs["nl"] = True click.secho("", **kwargs) - if bottom: + if bottom and not overwrite: click.secho(ERASE_TO_EOL, **kwargs) + if overwrite: + click.get_text_stream("stderr" if kwargs["err"] else "stdout").flush() def logger(body="", system_msg=False, head="", bad=False, timestamp=True, nl=True): @@ -960,7 +1027,7 @@ def start( ctx.obj.environment = [ e for e in ENVIRONMENTS + [MetaflowEnvironment] if e.TYPE == environment ][0](ctx.obj.flow) - ctx.obj.environment.validate_environment(ctx.obj.logger, datastore) + ctx.obj.environment.validate_environment(echo, datastore) ctx.obj.event_logger = 
LOGGING_SIDECARS[event_logger]( flow=ctx.obj.flow, env=ctx.obj.environment diff --git a/metaflow/plugins/pypi/bootstrap.py b/metaflow/plugins/pypi/bootstrap.py index 828bb05087..b6736cdd79 100644 --- a/metaflow/plugins/pypi/bootstrap.py +++ b/metaflow/plugins/pypi/bootstrap.py @@ -1,4 +1,5 @@ import bz2 +import concurrent.futures import io import json import os @@ -6,6 +7,9 @@ import subprocess import sys import tarfile +import time + +import requests from metaflow.metaflow_config import DATASTORE_LOCAL_DIR from metaflow.plugins import DATASTORES @@ -15,6 +19,18 @@ # Bootstraps a valid conda virtual environment composed of conda and pypi packages + +def timer(func): + def wrapper(*args, **kwargs): + start_time = time.time() + result = func(*args, **kwargs) + duration = time.time() - start_time + # print(f"Time taken for {func.__name__}: {duration:.2f} seconds") + return result + + return wrapper + + if __name__ == "__main__": if len(sys.argv) != 5: print("Usage: bootstrap.py ") @@ -47,6 +63,8 @@ prefix = os.path.join(os.getcwd(), architecture, id_) pkgs_dir = os.path.join(os.getcwd(), ".pkgs") + conda_pkgs_dir = os.path.join(pkgs_dir, "conda") + pypi_pkgs_dir = os.path.join(pkgs_dir, "pypi") manifest_dir = os.path.join(os.getcwd(), DATASTORE_LOCAL_DIR, flow_name) datastores = [d for d in DATASTORES if d.TYPE == datastore_type] @@ -64,77 +82,145 @@ os.path.join(os.getcwd(), MAGIC_FILE), os.path.join(manifest_dir, MAGIC_FILE), ) - with open(os.path.join(manifest_dir, MAGIC_FILE)) as f: env = json.load(f)[id_][architecture] - # Download Conda packages. 
-    conda_pkgs_dir = os.path.join(pkgs_dir, "conda")
-    with storage.load_bytes([package["path"] for package in env["conda"]]) as results:
-        for key, tmpfile, _ in results:
+    def run_cmd(cmd):
+        result = subprocess.run(
+            cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
+        )
+        if result.returncode != 0:
+            print(f"Bootstrap failed while executing: {cmd}")
+            print("Stdout:", result.stdout)
+            print("Stderr:", result.stderr)
+            sys.exit(1)
+
+    @timer
+    def install_micromamba(architecture):
+        # TODO: check if mamba or conda are already available on the image
+        micromamba_dir = os.path.join(os.getcwd(), "micromamba")
+        micromamba_path = os.path.join(micromamba_dir, "bin", "micromamba")
+
+        if which("micromamba") or os.path.exists(micromamba_path):
+            return micromamba_path
+
+        os.makedirs(micromamba_dir, exist_ok=True)
+        # TODO: download micromamba from datastore
+        url = f"https://micro.mamba.pm/api/micromamba/{architecture}/1.5.7"
+        response = requests.get(url, stream=True)
+        if response.status_code != 200:
+            raise Exception(
+                f"Failed to download micromamba: HTTP {response.status_code}"
+            )
+        tar_content = bz2.BZ2Decompressor().decompress(response.raw.read())
+        with tarfile.open(fileobj=io.BytesIO(tar_content), mode="r:") as tar:
+            tar.extract("bin/micromamba", path=micromamba_dir, set_attrs=False)
+
+        # Verify the extraction succeeded BEFORE chmod; otherwise a missing
+        # binary surfaces as FileNotFoundError from chmod and this explicit
+        # error branch is unreachable.
+        if not os.path.exists(micromamba_path):
+            raise Exception("Failed to install Micromamba!")
+        os.chmod(micromamba_path, 0o755)
+
+        os.environ["PATH"] += os.pathsep + os.path.dirname(micromamba_path)
+        return micromamba_path
+
+    @timer
+    def download_conda_packages(storage, packages, dest_dir):
+
+        def process_conda_package(args):
             # Ensure that conda packages go into architecture specific folders.
             # The path looks like REPO/CHANNEL/CONDA_SUBDIR/PACKAGE. We trick
             # Micromamba into believing that all packages are coming from a local
             # channel - the only hurdle is ensuring that packages are organised
             # properly.
- - # TODO: consider RAM disk - dest = os.path.join(conda_pkgs_dir, "/".join(key.split("/")[-2:])) + key, tmpfile, dest_dir = args + dest = os.path.join(dest_dir, "/".join(key.split("/")[-2:])) os.makedirs(os.path.dirname(dest), exist_ok=True) shutil.move(tmpfile, dest) - # Create Conda environment. - cmds = [ - # TODO: check if mamba or conda are already available on the image - # TODO: micromamba installation can be pawned off to micromamba.py - f"""set -e; - if ! command -v micromamba >/dev/null 2>&1; then - mkdir -p micromamba; - python -c "import requests, bz2, sys; data = requests.get('https://micro.mamba.pm/api/micromamba/{architecture}/1.5.7').content; sys.stdout.buffer.write(bz2.decompress(data))" | tar -xv -C $(pwd)/micromamba bin/micromamba --strip-components 1; + os.makedirs(dest_dir, exist_ok=True) + with storage.load_bytes([package["path"] for package in packages]) as results: + with concurrent.futures.ThreadPoolExecutor() as executor: + executor.map( + process_conda_package, + [(key, tmpfile, dest_dir) for key, tmpfile, _ in results], + ) + # for key, tmpfile, _ in results: + + # # TODO: consider RAM disk + # dest = os.path.join(dest_dir, "/".join(key.split("/")[-2:])) + # os.makedirs(os.path.dirname(dest), exist_ok=True) + # shutil.move(tmpfile, dest) + return dest_dir + + @timer + def download_pypi_packages(storage, packages, dest_dir): + + def process_pypi_package(args): + key, tmpfile, dest_dir = args + dest = os.path.join(dest_dir, os.path.basename(key)) + shutil.move(tmpfile, dest) + + os.makedirs(dest_dir, exist_ok=True) + with storage.load_bytes([package["path"] for package in packages]) as results: + with concurrent.futures.ThreadPoolExecutor() as executor: + executor.map( + process_pypi_package, + [(key, tmpfile, dest_dir) for key, tmpfile, _ in results], + ) + # for key, tmpfile, _ in results: + # dest = os.path.join(dest_dir, os.path.basename(key)) + # shutil.move(tmpfile, dest) + return dest_dir + + @timer + def 
create_conda_environment(prefix, conda_pkgs_dir): + cmd = f'''set -e; + tmpfile=$(mktemp); + echo "@EXPLICIT" > "$tmpfile"; + ls -d {conda_pkgs_dir}/*/* >> "$tmpfile"; export PATH=$PATH:$(pwd)/micromamba; - if ! command -v micromamba >/dev/null 2>&1; then - echo "Failed to install Micromamba!"; - exit 1; - fi; - fi""", - # Create a conda environment through Micromamba. - f'''set -e; - tmpfile=$(mktemp); - echo "@EXPLICIT" > "$tmpfile"; - ls -d {conda_pkgs_dir}/*/* >> "$tmpfile"; - export PATH=$PATH:$(pwd)/micromamba; - export CONDA_PKGS_DIRS=$(pwd)/micromamba/pkgs; - micromamba create --yes --offline --no-deps --safety-checks=disabled --no-extra-safety-checks --prefix {prefix} --file "$tmpfile"; - rm "$tmpfile"''', - ] - - # Download PyPI packages. - if "pypi" in env: - pypi_pkgs_dir = os.path.join(pkgs_dir, "pypi") - with storage.load_bytes( - [package["path"] for package in env["pypi"]] - ) as results: - for key, tmpfile, _ in results: - dest = os.path.join(pypi_pkgs_dir, os.path.basename(key)) - os.makedirs(os.path.dirname(dest), exist_ok=True) - shutil.move(tmpfile, dest) - - # Install PyPI packages. 
- cmds.extend( - [ - f"""set -e; - export PATH=$PATH:$(pwd)/micromamba; - export CONDA_PKGS_DIRS=$(pwd)/micromamba/pkgs; - micromamba run --prefix {prefix} python -m pip --disable-pip-version-check install --root-user-action=ignore --no-compile {pypi_pkgs_dir}/*.whl --no-user""" - ] - ) + export CONDA_PKGS_DIRS=$(pwd)/micromamba/pkgs; + micromamba create --yes --offline --no-deps --safety-checks=disabled --no-extra-safety-checks --prefix {prefix} --file "$tmpfile"; + rm "$tmpfile"''' + run_cmd(cmd) - for cmd in cmds: - result = subprocess.run( - cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE + @timer + def install_pypi_packages(prefix, pypi_pkgs_dir): + + cmd = f"""set -e; + export PATH=$PATH:$(pwd)/micromamba; + export CONDA_PKGS_DIRS=$(pwd)/micromamba/pkgs; + micromamba run --prefix {prefix} python -m pip --disable-pip-version-check install --root-user-action=ignore --no-compile --no-index --no-cache-dir --no-deps --prefer-binary --find-links={pypi_pkgs_dir} {pypi_pkgs_dir}/*.whl --no-user""" + run_cmd(cmd) + + with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: + # install micromamba, download conda and pypi packages in parallel + future_install_micromamba = executor.submit(install_micromamba, architecture) + future_download_conda_packages = executor.submit( + download_conda_packages, storage, env["conda"], conda_pkgs_dir ) - if result.returncode != 0: - print(f"Bootstrap failed while executing: {cmd}") - print("Stdout:", result.stdout.decode()) - print("Stderr:", result.stderr.decode()) - sys.exit(1) + future_download_pypi_packages = None + if "pypi" in env: + future_download_pypi_packages = executor.submit( + download_pypi_packages, storage, env["pypi"], pypi_pkgs_dir + ) + # create conda environment after micromamba is installed and conda packages are downloaded + concurrent.futures.wait( + [future_install_micromamba, future_download_conda_packages] + ) + future_create_conda_environment = executor.submit( + 
create_conda_environment, prefix, conda_pkgs_dir + ) + if "pypi" in env: + # install pypi packages after conda environment is created and pypi packages are downloaded + concurrent.futures.wait( + [future_create_conda_environment, future_download_pypi_packages] + ) + future_install_pypi_packages = executor.submit( + install_pypi_packages, prefix, pypi_pkgs_dir + ) + # wait for pypi packages to be installed + future_install_pypi_packages.result() + else: + # wait for conda environment to be created + future_create_conda_environment.result() diff --git a/metaflow/plugins/pypi/conda_environment.py b/metaflow/plugins/pypi/conda_environment.py index 1ce0ea6b11..df3f754b10 100644 --- a/metaflow/plugins/pypi/conda_environment.py +++ b/metaflow/plugins/pypi/conda_environment.py @@ -5,10 +5,11 @@ import io import json import os -import sys import tarfile +import threading import time -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor, as_completed +from functools import wraps from hashlib import sha256 from io import BufferedIOBase, BytesIO from itertools import chain @@ -50,7 +51,7 @@ def decospecs(self): def validate_environment(self, logger, datastore_type): self.datastore_type = datastore_type - self.logger = logger + # self.logger = logger # Avoiding circular imports. 
from metaflow.plugins import DATASTORES @@ -62,8 +63,19 @@ def validate_environment(self, logger, datastore_type): from .micromamba import Micromamba from .pip import Pip - micromamba = Micromamba() - self.solvers = {"conda": micromamba, "pypi": Pip(micromamba)} + print_lock = threading.Lock() + + def make_thread_safe(func): + @wraps(func) + def wrapper(*args, **kwargs): + with print_lock: + return func(*args, **kwargs) + + return wrapper + + self.logger = make_thread_safe(logger) + micromamba = Micromamba(self.logger) + self.solvers = {"conda": micromamba, "pypi": Pip(micromamba, self.logger)} def init_environment(self, echo, only_steps=None): # The implementation optimizes for latency to ensure as many operations can @@ -109,6 +121,16 @@ def solve(id_, environment, type_): ) def cache(storage, results, type_): + self.logger( + "Caching packages ...", + fg="yellow", + bold=False, + indent=True, + overwrite=True, + animate=True, + ) + start_time = time.time() + def _path(url, local_path): # Special handling for VCS packages if url.startswith("git+"): @@ -150,6 +172,9 @@ def _path(url, local_path): ( package["path"], # Lazily fetch package from the interweb if needed. + # TODO: Depending on the len_hint, the package might be downloaded from + # the interweb prematurely. save_bytes needs to be adjusted to handle + # this scenario. 
                            LazyOpen(
                                package["local_path"],
                                "rb",
@@ -164,25 +189,55 @@
            )
            for id_, packages, _, platform in results:
                if id_ in dirty:
+                    cached = True
                    self.write_to_environment_manifest([id_, platform, type_], packages)
+            cached = False
+            if cached:
+                self.logger(
+                    f"Cached packages in {time.time() - start_time:.2f}s!",
+                    fg="yellow",
+                    bold=False,
+                    indent=True,
+                    overwrite=True,
+                    animate=True,
+                )
+        storage = None
+        if self.datastore_type not in ["local"]:
+            # Initialize storage for caching if using a remote datastore
+            storage = self.datastore(_datastore_packageroot(self.datastore, echo))
+
+        self.logger(
+            "Bootstrapping virtual environment(s) ...", fg="magenta", bold=False
+        )
+        start_time = time.time()
        # First resolve environments through Conda, before PyPI.
-        self.logger("Bootstrapping virtual environment(s) ...")
        for solver in ["conda", "pypi"]:
            with ThreadPoolExecutor() as executor:
-                results = list(
-                    executor.map(lambda x: solve(*x, solver), environments(solver))
-                )
-                _ = list(map(lambda x: self.solvers[solver].download(*x), results))
-            with ThreadPoolExecutor() as executor:
-                _ = list(
-                    executor.map(lambda x: self.solvers[solver].create(*x), results)
-                )
-            if self.datastore_type not in ["local"]:
-                # Cache packages only when a remote datastore is in play.
- storage = self.datastore(_datastore_packageroot(self.datastore, echo)) - cache(storage, results, solver) - self.logger("Virtual environment(s) bootstrapped!") + # parallel solves + solves = [ + executor.submit(lambda x: solve(*x, solver), env) + for env in environments(solver) + ] + for future in as_completed(solves): + result = future.result() + # sequential downloads + self.solvers[solver].download(*result) + # parallel creates + executor.submit(self.solvers[solver].create, *result) + if storage: + # parallel cache + executor.submit(cache, storage, [result], solver) + executor.shutdown(wait=True) + + elapsed = time.time() - start_time + self.logger( + f"Virtual environment(s) bootstrapped{f' in {elapsed:.2f}s' if elapsed >= 1 else ''}!", + fg="green", + bold=True, + indent=True, + overwrite=True, + ) def executable(self, step_name, default=None): step = next((step for step in self.flow if step.name == step_name), None) @@ -374,7 +428,7 @@ def bootstrap_commands(self, step_name, datastore_type): 'DISABLE_TRACING=True python -m metaflow.plugins.pypi.bootstrap "%s" %s "%s" linux-64' % (self.flow.name, id_, self.datastore_type), "echo 'Environment bootstrapped.'", - "export PATH=$PATH:$(pwd)/micromamba", + "export PATH=$PATH:$(pwd)/micromamba/bin", ] else: # for @conda/@pypi(disabled=True). 
@@ -426,8 +480,19 @@ def write_to_environment_manifest(self, keys, value): fcntl.flock(f, fcntl.LOCK_UN) +class Processor(object): + + def __init__(self, solver, max_workers): + self.solver = solver + self.max_workers = max_workers + self.executor = ThreadPoolExecutor(max_workers=max_workers) + + def process(self, env): + self.executor.submit(self.solver.process, env) + + class LazyOpen(BufferedIOBase): - def __init__(self, filename, mode="rb", url=None): + def __init__(self, filename=None, mode="rb", url=None): super().__init__() self.filename = filename self.mode = mode diff --git a/metaflow/plugins/pypi/micromamba.py b/metaflow/plugins/pypi/micromamba.py index 378d3d5993..348c0d3039 100644 --- a/metaflow/plugins/pypi/micromamba.py +++ b/metaflow/plugins/pypi/micromamba.py @@ -1,7 +1,9 @@ +import functools import json import os import subprocess import tempfile +import time from metaflow.exception import MetaflowException from metaflow.util import which @@ -20,7 +22,7 @@ def __init__(self, error): class Micromamba(object): - def __init__(self): + def __init__(self, logger=None): # micromamba is a tiny version of the mamba package manager and comes with # metaflow specific performance enhancements. @@ -33,6 +35,19 @@ def __init__(self): os.path.expanduser(_home), "micromamba", ) + + if logger: + self.logger = functools.partial( + logger, + fg="yellow", + bold=False, + indent=True, + overwrite=True, + animate=True, + ) + else: + self.logger = lambda *args, **kwargs: None # No-op logger if not provided + self.bin = ( which(os.environ.get("METAFLOW_PATH_TO_MICROMAMBA") or "micromamba") or which("./micromamba") # to support remote execution @@ -65,6 +80,8 @@ def solve(self, id_, packages, python, platform): # environment # 4. 
Multiple solves can progress at the same time while relying on the same # index + self.logger(f"Solving {platform} virtual environment {id_} ...") + start_time = time.time() with tempfile.TemporaryDirectory() as tmp_dir: env = { "MAMBA_ADD_PIP_AS_PYTHON_DEPENDENCY": "true", @@ -91,21 +108,18 @@ def solve(self, id_, packages, python, platform): cmd.append("python==%s" % python) # TODO: Ensure a human readable message is returned when the environment # can't be resolved for any and all reasons. - return [ + solve = [ {k: v for k, v in item.items() if k in ["url"]} for item in self._call(cmd, env)["actions"]["LINK"] ] + self.logger( + f"Solved {platform} virtual environment {id_} in {time.time() - start_time:.2f}s!" + ) + return solve def download(self, id_, packages, python, platform): # Unfortunately all the packages need to be catalogued in package cache # because of which this function can't be parallelized - - # Micromamba is painfully slow in determining if many packages are infact - # already cached. As a perf heuristic, we check if the environment already - # exists to short circuit package downloads. - if self.path_to_environment(id_, platform): - return - prefix = "{env_dirs}/{keyword}/{platform}/{id}".format( env_dirs=self.info()["envs_dirs"][0], platform=platform, @@ -113,10 +127,21 @@ def download(self, id_, packages, python, platform): id=id_, ) - # Another forced perf heuristic to skip cross-platform downloads. + # Micromamba is painfully slow in determining if many packages are infact + # already cached. As a perf heuristic, we check if the environment already + # exists to short circuit package downloads. 
+ + # cheap check if os.path.exists(f"{prefix}/fake.done"): return + # somewhat expensive check + # TODO: make this less expensive + if self.path_to_environment(id_, platform): + return + + self.logger(f"Downloading {platform} virtual environment {id_} ...") + start_time = time.time() with tempfile.TemporaryDirectory() as tmp_dir: env = { "CONDA_SUBDIR": platform, @@ -136,6 +161,9 @@ def download(self, id_, packages, python, platform): cmd.append("{url}".format(**package)) self._call(cmd, env) + self.logger( + f"Downloaded {platform} virtual environment {id_} in {time.time() - start_time:.2f}s!" + ) # Perf optimization to skip cross-platform downloads. if platform != self.platform(): os.makedirs(prefix, exist_ok=True) or open( @@ -148,6 +176,8 @@ def create(self, id_, packages, python, platform): if platform != self.platform() or self.path_to_environment(id_, platform): return + self.logger(f"Creating {platform} virtual environment {id_} ...") + start_time = time.time() prefix = "{env_dirs}/{keyword}/{platform}/{id}".format( env_dirs=self.info()["envs_dirs"][0], platform=platform, @@ -173,10 +203,15 @@ def create(self, id_, packages, python, platform): for package in packages: cmd.append("{url}".format(**package)) self._call(cmd, env) + self.logger( + f"Created {platform} virtual environment {id_} in {time.time() - start_time:.2f}s!" 
+ ) + @functools.lru_cache(maxsize=None) def info(self): return self._call(["config", "list", "-a"]) + @functools.lru_cache(maxsize=None) def path_to_environment(self, id_, platform=None): if platform is None: platform = self.platform() @@ -185,6 +220,7 @@ def path_to_environment(self, id_, platform=None): keyword="metaflow", # indicates metaflow generated environment id=id_, ) + # TODO: make this call less expensive for env in self._call(["env", "list"])["envs"]: # TODO: Check bin/python is available as a heuristic for well formed env if env.endswith(suffix): @@ -198,18 +234,23 @@ def metadata(self, id_, packages, python, platform): } directories = self.info()["pkgs_dirs"] # search all package caches for packages - metadata = { - url: os.path.join(d, file) + file_to_path = {} + + for d in directories: + if os.path.isdir(d): + try: + with os.scandir(d) as entries: + for entry in entries: + if entry.is_file(): + # Prefer the first occurrence if the file exists in multiple directories + file_to_path.setdefault(entry.name, entry.path) + except OSError: + continue + return { + # set package tarball local paths to None if package tarballs are missing + url: file_to_path.get(file) for url, file in packages_to_filenames.items() - for d in directories - if os.path.isdir(d) - and file in os.listdir(d) - and os.path.isfile(os.path.join(d, file)) } - # set package tarball local paths to None if package tarballs are missing - for url in packages_to_filenames: - metadata.setdefault(url, None) - return metadata def interpreter(self, id_): return os.path.join(self.path_to_environment(id_), "bin/python") diff --git a/metaflow/plugins/pypi/pip.py b/metaflow/plugins/pypi/pip.py index 5e97f0edd1..2723308e33 100644 --- a/metaflow/plugins/pypi/pip.py +++ b/metaflow/plugins/pypi/pip.py @@ -1,9 +1,11 @@ +import functools import json import os import re import shutil import subprocess import tempfile +import time from concurrent.futures import ThreadPoolExecutor from itertools import 
chain, product from urllib.parse import unquote @@ -50,12 +52,25 @@ def __init__(self, error): class Pip(object): - def __init__(self, micromamba=None): + def __init__(self, micromamba=None, logger=None): # pip is assumed to be installed inside a conda environment managed by # micromamba. pip commands are executed using `micromamba run --prefix` - self.micromamba = micromamba or Micromamba() + self.micromamba = micromamba or Micromamba(logger) + if logger: + self.logger = functools.partial( + logger, + fg="yellow", + bold=False, + indent=True, + overwrite=True, + animate=True, + ) + else: + self.logger = lambda *args, **kwargs: None # No-op logger if not provided def solve(self, id_, packages, python, platform): + self.logger(f"Solving {platform} PyPI packages for {id_} ...") + start_time = time.time() prefix = self.micromamba.path_to_environment(id_) if prefix is None: msg = "Unable to locate a Micromamba managed virtual environment\n" @@ -130,6 +145,9 @@ def _format(dl_info): res["hash"] = vcs_info["commit_id"] return res + self.logger( + f"Solved {platform} PyPI packages for {id_} in {time.time() - start_time:.2f}s!" + ) with open(report, mode="r", encoding="utf-8") as f: return [ _format(item["download_info"]) for item in json.load(f)["install"] @@ -142,6 +160,8 @@ def download(self, id_, packages, python, platform): if os.path.isfile(metadata_file): return + self.logger(f"Downloading {platform} PyPI packages for {id_} ...") + start_time = time.time() metadata = {} custom_index_url, extra_index_urls = self.indices(prefix) @@ -231,6 +251,9 @@ def _build(key, package): prefix=prefix, wheel=unquote(package["url"].split("/")[-1]) ) self._call(prefix, cmd) + self.logger( + f"Downloaded {platform} PyPI packages for {id_} in {time.time() - start_time:.2f}s!" 
+ ) # write the url to wheel mappings in a magic location with open(metadata_file, "w") as file: file.write(json.dumps(metadata)) @@ -245,6 +268,8 @@ def create(self, id_, packages, python, platform): # Pip can't install packages if the underlying virtual environment doesn't # share the same platform if self.micromamba.platform() == platform: + self.logger(f"Installing {platform} PyPI packages for {id_} ...") + start_time = time.time() cmd = [ "install", "--no-compile", @@ -256,6 +281,9 @@ def create(self, id_, packages, python, platform): for package in packages: cmd.append(metadata[package["url"]]) self._call(prefix, cmd) + self.logger( + f"Installed {platform} PyPI packages for {id_} in {time.time() - start_time:.2f}s!" + ) with open(installation_marker, "w") as file: file.write(json.dumps({"id": id_}))