From 40f4b0fb858e1ae4f49188207917e1a9d2b80442 Mon Sep 17 00:00:00 2001 From: savin Date: Sat, 5 Oct 2024 14:11:43 -0700 Subject: [PATCH 1/7] speed up remote bootstrap for @pypi/@conda --- metaflow/plugins/pypi/bootstrap.py | 191 ++++++++++++++------- metaflow/plugins/pypi/conda_environment.py | 2 +- metaflow/plugins/pypi/micromamba.py | 1 + 3 files changed, 129 insertions(+), 65 deletions(-) diff --git a/metaflow/plugins/pypi/bootstrap.py b/metaflow/plugins/pypi/bootstrap.py index 828bb050878..cbf143d3fa3 100644 --- a/metaflow/plugins/pypi/bootstrap.py +++ b/metaflow/plugins/pypi/bootstrap.py @@ -1,4 +1,5 @@ import bz2 +import concurrent.futures import io import json import os @@ -6,6 +7,9 @@ import subprocess import sys import tarfile +import time + +import requests from metaflow.metaflow_config import DATASTORE_LOCAL_DIR from metaflow.plugins import DATASTORES @@ -15,7 +19,14 @@ # Bootstraps a valid conda virtual environment composed of conda and pypi packages + +def print_timer(operation, start_time): + duration = time.time() - start_time + print(f"Time taken for {operation}: {duration:.2f} seconds") + + if __name__ == "__main__": + total_start_time = time.time() if len(sys.argv) != 5: print("Usage: bootstrap.py ") sys.exit(1) @@ -47,6 +58,8 @@ prefix = os.path.join(os.getcwd(), architecture, id_) pkgs_dir = os.path.join(os.getcwd(), ".pkgs") + conda_pkgs_dir = os.path.join(pkgs_dir, "conda") + pypi_pkgs_dir = os.path.join(pkgs_dir, "pypi") manifest_dir = os.path.join(os.getcwd(), DATASTORE_LOCAL_DIR, flow_name) datastores = [d for d in DATASTORES if d.TYPE == datastore_type] @@ -64,77 +77,127 @@ os.path.join(os.getcwd(), MAGIC_FILE), os.path.join(manifest_dir, MAGIC_FILE), ) - with open(os.path.join(manifest_dir, MAGIC_FILE)) as f: env = json.load(f)[id_][architecture] - # Download Conda packages. - conda_pkgs_dir = os.path.join(pkgs_dir, "conda") - with storage.load_bytes([package["path"] for package in env["conda"]]) as results: - for key, tmpfile, _ in results: - # Ensure that conda packages go into architecture specific folders. - # The path looks like REPO/CHANNEL/CONDA_SUBDIR/PACKAGE. We trick - # Micromamba into believing that all packages are coming from a local - # channel - the only hurdle is ensuring that packages are organised - # properly. - - # TODO: consider RAM disk - dest = os.path.join(conda_pkgs_dir, "/".join(key.split("/")[-2:])) - os.makedirs(os.path.dirname(dest), exist_ok=True) - shutil.move(tmpfile, dest) - - # Create Conda environment. - cmds = [ + def run_cmd(cmd): + cmd_start_time = time.time() + result = subprocess.run( + cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True + ) + print_timer(f"Command: {cmd}", cmd_start_time) + if result.returncode != 0: + print(f"Bootstrap failed while executing: {cmd}") + print("Stdout:", result.stdout) + print("Stderr:", result.stderr) + sys.exit(1) + + def install_micromamba(architecture): # TODO: check if mamba or conda are already available on the image - # TODO: micromamba installation can be pawned off to micromamba.py - f"""set -e; - if ! command -v micromamba >/dev/null 2>&1; then - mkdir -p micromamba; - python -c "import requests, bz2, sys; data = requests.get('https://micro.mamba.pm/api/micromamba/{architecture}/1.5.7').content; sys.stdout.buffer.write(bz2.decompress(data))" | tar -xv -C $(pwd)/micromamba bin/micromamba --strip-components 1; - export PATH=$PATH:$(pwd)/micromamba; - if ! command -v micromamba >/dev/null 2>&1; then - echo "Failed to install Micromamba!"; - exit 1; - fi; - fi""", - # Create a conda environment through Micromamba. - f'''set -e; - tmpfile=$(mktemp); - echo "@EXPLICIT" > "$tmpfile"; - ls -d {conda_pkgs_dir}/*/* >> "$tmpfile"; - export PATH=$PATH:$(pwd)/micromamba; - export CONDA_PKGS_DIRS=$(pwd)/micromamba/pkgs; - micromamba create --yes --offline --no-deps --safety-checks=disabled --no-extra-safety-checks --prefix {prefix} --file "$tmpfile"; - rm "$tmpfile"''', - ] - - # Download PyPI packages. - if "pypi" in env: - pypi_pkgs_dir = os.path.join(pkgs_dir, "pypi") - with storage.load_bytes( - [package["path"] for package in env["pypi"]] - ) as results: + micromamba_timer = time.time() + micromamba_dir = os.path.join(os.getcwd(), "micromamba") + micromamba_path = os.path.join(micromamba_dir, "bin", "micromamba") + + if which("micromamba") or os.path.exists(micromamba_path): + return micromamba_path + + os.makedirs(micromamba_dir, exist_ok=True) + # TODO: download micromamba from datastore + url = f"https://micro.mamba.pm/api/micromamba/{architecture}/1.5.7" + response = requests.get(url, stream=True) + if response.status_code != 200: + raise Exception( + f"Failed to download micromamba: HTTP {response.status_code}" + ) + tar_content = bz2.BZ2Decompressor().decompress(response.raw.read()) + with tarfile.open(fileobj=io.BytesIO(tar_content), mode="r:") as tar: + tar.extract("bin/micromamba", path=micromamba_dir, set_attrs=False) + + os.chmod(micromamba_path, 0o755) + if not os.path.exists(micromamba_path): + raise Exception("Failed to install Micromamba!") + + os.environ["PATH"] += os.pathsep + os.path.dirname(micromamba_path) + print_timer("Downloading micromamba", micromamba_timer) + return micromamba_path + + def download_conda_packages(storage, packages, dest_dir): + download_start_time = time.time() + os.makedirs(dest_dir, exist_ok=True) + with storage.load_bytes([package["path"] for package in packages]) as results: for key, tmpfile, _ in results: - dest = os.path.join(pypi_pkgs_dir, os.path.basename(key)) + # Ensure that conda packages go into architecture specific folders. + # The path looks like REPO/CHANNEL/CONDA_SUBDIR/PACKAGE. We trick + # Micromamba into believing that all packages are coming from a local + # channel - the only hurdle is ensuring that packages are organised + # properly. + + # TODO: consider RAM disk + dest = os.path.join(dest_dir, "/".join(key.split("/")[-2:])) os.makedirs(os.path.dirname(dest), exist_ok=True) shutil.move(tmpfile, dest) + print_timer("Downloading conda packages", download_start_time) + return dest_dir - # Install PyPI packages. - cmds.extend( - [ - f"""set -e; - export PATH=$PATH:$(pwd)/micromamba; - export CONDA_PKGS_DIRS=$(pwd)/micromamba/pkgs; - micromamba run --prefix {prefix} python -m pip --disable-pip-version-check install --root-user-action=ignore --no-compile {pypi_pkgs_dir}/*.whl --no-user""" - ] - ) + def download_pypi_packages(storage, packages, dest_dir): + download_start_time = time.time() + os.makedirs(dest_dir, exist_ok=True) + with storage.load_bytes([package["path"] for package in packages]) as results: + for key, tmpfile, _ in results: + dest = os.path.join(dest_dir, os.path.basename(key)) + shutil.move(tmpfile, dest) + print_timer("Downloading pypi packages", download_start_time) + return dest_dir + + def create_conda_environment(prefix, conda_pkgs_dir): + cmd = f'''set -e; + tmpfile=$(mktemp); + echo "@EXPLICIT" > "$tmpfile"; + ls -d {conda_pkgs_dir}/*/* >> "$tmpfile"; + export PATH=$PATH:$(pwd)/micromamba; + export CONDA_PKGS_DIRS=$(pwd)/micromamba/pkgs; + micromamba create --yes --offline --no-deps --safety-checks=disabled --no-extra-safety-checks --prefix {prefix} --file "$tmpfile"; + rm "$tmpfile"''' + run_cmd(cmd) - for cmd in cmds: - result = subprocess.run( - cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE + def install_pypi_packages(prefix, pypi_pkgs_dir): + cmd = f"""set -e; + export PATH=$PATH:$(pwd)/micromamba; + export CONDA_PKGS_DIRS=$(pwd)/micromamba/pkgs; + micromamba run --prefix {prefix} python -m pip --disable-pip-version-check install --root-user-action=ignore --no-compile --no-index --no-cache-dir --no-deps --prefer-binary --find-links={pypi_pkgs_dir} {pypi_pkgs_dir}/*.whl --no-user""" + run_cmd(cmd) + + with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: + # install micromamba, download conda and pypi packages in parallel + future_install_micromamba = executor.submit(install_micromamba, architecture) + future_download_conda_packages = executor.submit( + download_conda_packages, storage, env["conda"], conda_pkgs_dir ) - if result.returncode != 0: - print(f"Bootstrap failed while executing: {cmd}") - print("Stdout:", result.stdout.decode()) - print("Stderr:", result.stderr.decode()) - sys.exit(1) + future_download_pypi_packages = None + if "pypi" in env: + future_download_pypi_packages = executor.submit( + download_pypi_packages, storage, env["pypi"], pypi_pkgs_dir + ) + # create conda environment after micromamba is installed and conda packages are downloaded + concurrent.futures.wait( + [future_install_micromamba, future_download_conda_packages] + ) + future_create_conda_environment = executor.submit( + create_conda_environment, prefix, conda_pkgs_dir + ) + if "pypi" in env: + # install pypi packages after conda environment is created and pypi packages are downloaded + concurrent.futures.wait( + [future_create_conda_environment, future_download_pypi_packages] + ) + future_install_pypi_packages = executor.submit( + install_pypi_packages, prefix, pypi_pkgs_dir + ) + # wait for pypi packages to be installed + future_install_pypi_packages.result() + else: + # wait for conda environment to be created + future_create_conda_environment.result() + + total_time = time.time() - total_start_time + print(f"{total_time:.2f}") diff --git a/metaflow/plugins/pypi/conda_environment.py b/metaflow/plugins/pypi/conda_environment.py index 1ce0ea6b113..7bba75383e0 100644 --- a/metaflow/plugins/pypi/conda_environment.py +++ b/metaflow/plugins/pypi/conda_environment.py @@ -374,7 +374,7 @@ def bootstrap_commands(self, step_name, datastore_type): 'DISABLE_TRACING=True python -m metaflow.plugins.pypi.bootstrap "%s" %s "%s" linux-64' % (self.flow.name, id_, self.datastore_type), "echo 'Environment bootstrapped.'", - "export PATH=$PATH:$(pwd)/micromamba", + "export PATH=$PATH:$(pwd)/micromamba/bin", ] else: # for @conda/@pypi(disabled=True). diff --git a/metaflow/plugins/pypi/micromamba.py b/metaflow/plugins/pypi/micromamba.py index 378d3d5993a..690760debdc 100644 --- a/metaflow/plugins/pypi/micromamba.py +++ b/metaflow/plugins/pypi/micromamba.py @@ -33,6 +33,7 @@ def __init__(self): os.path.expanduser(_home), "micromamba", ) + self.bin = ( which(os.environ.get("METAFLOW_PATH_TO_MICROMAMBA") or "micromamba") or which("./micromamba") # to support remote execution From ef81307e193bb241f08e7994823948981f8b8c8c Mon Sep 17 00:00:00 2001 From: savin Date: Sat, 5 Oct 2024 14:16:05 -0700 Subject: [PATCH 2/7] remove print statements --- metaflow/plugins/pypi/bootstrap.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/metaflow/plugins/pypi/bootstrap.py b/metaflow/plugins/pypi/bootstrap.py index cbf143d3fa3..c675a35caa6 100644 --- a/metaflow/plugins/pypi/bootstrap.py +++ b/metaflow/plugins/pypi/bootstrap.py @@ -26,7 +26,6 @@ def print_timer(operation, start_time): if __name__ == "__main__": - total_start_time = time.time() if len(sys.argv) != 5: print("Usage: bootstrap.py ") sys.exit(1) @@ -81,11 +80,9 @@ def print_timer(operation, start_time): env = json.load(f)[id_][architecture] def run_cmd(cmd): - cmd_start_time = time.time() result = subprocess.run( cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) - print_timer(f"Command: {cmd}", cmd_start_time) if result.returncode != 0: print(f"Bootstrap failed while executing: {cmd}") print("Stdout:", result.stdout) @@ -94,7 +91,6 @@ def run_cmd(cmd): def install_micromamba(architecture): # TODO: check if mamba or conda are already available on the image - micromamba_timer = time.time() micromamba_dir = os.path.join(os.getcwd(), "micromamba") micromamba_path = os.path.join(micromamba_dir, "bin", "micromamba") @@ -118,11 +114,9 @@ def install_micromamba(architecture): raise Exception("Failed to install Micromamba!") os.environ["PATH"] += os.pathsep + os.path.dirname(micromamba_path) - print_timer("Downloading micromamba", micromamba_timer) return micromamba_path def download_conda_packages(storage, packages, dest_dir): - download_start_time = time.time() os.makedirs(dest_dir, exist_ok=True) with storage.load_bytes([package["path"] for package in packages]) as results: for key, tmpfile, _ in results: @@ -136,17 +130,14 @@ def download_conda_packages(storage, packages, dest_dir): dest = os.path.join(dest_dir, "/".join(key.split("/")[-2:])) os.makedirs(os.path.dirname(dest), exist_ok=True) shutil.move(tmpfile, dest) - print_timer("Downloading conda packages", download_start_time) return dest_dir def download_pypi_packages(storage, packages, dest_dir): - download_start_time = time.time() os.makedirs(dest_dir, exist_ok=True) with storage.load_bytes([package["path"] for package in packages]) as results: for key, tmpfile, _ in results: dest = os.path.join(dest_dir, os.path.basename(key)) shutil.move(tmpfile, dest) - print_timer("Downloading pypi packages", download_start_time) return dest_dir def create_conda_environment(prefix, conda_pkgs_dir): @@ -198,6 +189,3 @@ def install_pypi_packages(prefix, pypi_pkgs_dir): else: # wait for conda environment to be created future_create_conda_environment.result() - - total_time = time.time() - total_start_time - print(f"{total_time:.2f}") From b87028014e1b00f2b20288a33c678e1276e74e3e Mon Sep 17 00:00:00 2001 From: savin Date: Sat, 5 Oct 2024 16:27:21 -0700 Subject: [PATCH 3/7] more speed ups --- metaflow/plugins/pypi/bootstrap.py | 71 ++++++++++++++++------ metaflow/plugins/pypi/conda_environment.py | 7 ++- metaflow/plugins/pypi/micromamba.py | 46 ++++++++------ 3 files changed, 86 insertions(+), 38 deletions(-) diff --git a/metaflow/plugins/pypi/bootstrap.py b/metaflow/plugins/pypi/bootstrap.py index c675a35caa6..b6736cdd79e 100644 --- a/metaflow/plugins/pypi/bootstrap.py +++ b/metaflow/plugins/pypi/bootstrap.py @@ -20,9 +20,15 @@ # Bootstraps a valid conda virtual environment composed of conda and pypi packages -def print_timer(operation, start_time): - duration = time.time() - start_time - print(f"Time taken for {operation}: {duration:.2f} seconds") +def timer(func): + def wrapper(*args, **kwargs): + start_time = time.time() + result = func(*args, **kwargs) + duration = time.time() - start_time + # print(f"Time taken for {func.__name__}: {duration:.2f} seconds") + return result + + return wrapper if __name__ == "__main__": @@ -89,6 +95,7 @@ def run_cmd(cmd): print("Stderr:", result.stderr) sys.exit(1) + @timer def install_micromamba(architecture): # TODO: check if mamba or conda are already available on the image micromamba_dir = os.path.join(os.getcwd(), "micromamba") @@ -116,30 +123,56 @@ def install_micromamba(architecture): os.environ["PATH"] += os.pathsep + os.path.dirname(micromamba_path) return micromamba_path + @timer def download_conda_packages(storage, packages, dest_dir): + + def process_conda_package(args): + # Ensure that conda packages go into architecture specific folders. + # The path looks like REPO/CHANNEL/CONDA_SUBDIR/PACKAGE. We trick + # Micromamba into believing that all packages are coming from a local + # channel - the only hurdle is ensuring that packages are organised + # properly. + key, tmpfile, dest_dir = args + dest = os.path.join(dest_dir, "/".join(key.split("/")[-2:])) + os.makedirs(os.path.dirname(dest), exist_ok=True) + shutil.move(tmpfile, dest) + os.makedirs(dest_dir, exist_ok=True) with storage.load_bytes([package["path"] for package in packages]) as results: - for key, tmpfile, _ in results: - # Ensure that conda packages go into architecture specific folders. - # The path looks like REPO/CHANNEL/CONDA_SUBDIR/PACKAGE. We trick - # Micromamba into believing that all packages are coming from a local - # channel - the only hurdle is ensuring that packages are organised - # properly. - - # TODO: consider RAM disk - dest = os.path.join(dest_dir, "/".join(key.split("/")[-2:])) - os.makedirs(os.path.dirname(dest), exist_ok=True) - shutil.move(tmpfile, dest) + with concurrent.futures.ThreadPoolExecutor() as executor: + executor.map( + process_conda_package, + [(key, tmpfile, dest_dir) for key, tmpfile, _ in results], + ) + # for key, tmpfile, _ in results: + + # # TODO: consider RAM disk + # dest = os.path.join(dest_dir, "/".join(key.split("/")[-2:])) + # os.makedirs(os.path.dirname(dest), exist_ok=True) + # shutil.move(tmpfile, dest) return dest_dir + @timer def download_pypi_packages(storage, packages, dest_dir): + + def process_pypi_package(args): + key, tmpfile, dest_dir = args + dest = os.path.join(dest_dir, os.path.basename(key)) + shutil.move(tmpfile, dest) + os.makedirs(dest_dir, exist_ok=True) with storage.load_bytes([package["path"] for package in packages]) as results: - for key, tmpfile, _ in results: - dest = os.path.join(dest_dir, os.path.basename(key)) - shutil.move(tmpfile, dest) + with concurrent.futures.ThreadPoolExecutor() as executor: + executor.map( + process_pypi_package, + [(key, tmpfile, dest_dir) for key, tmpfile, _ in results], + ) + # for key, tmpfile, _ in results: + # dest = os.path.join(dest_dir, os.path.basename(key)) + # shutil.move(tmpfile, dest) return dest_dir + @timer def create_conda_environment(prefix, conda_pkgs_dir): cmd = f'''set -e; tmpfile=$(mktemp); @@ -151,14 +184,16 @@ def create_conda_environment(prefix, conda_pkgs_dir): rm "$tmpfile"''' run_cmd(cmd) + @timer def install_pypi_packages(prefix, pypi_pkgs_dir): + cmd = f"""set -e; export PATH=$PATH:$(pwd)/micromamba; export CONDA_PKGS_DIRS=$(pwd)/micromamba/pkgs; micromamba run --prefix {prefix} python -m pip --disable-pip-version-check install --root-user-action=ignore --no-compile --no-index --no-cache-dir --no-deps --prefer-binary --find-links={pypi_pkgs_dir} {pypi_pkgs_dir}/*.whl --no-user""" run_cmd(cmd) - with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: + with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: # install micromamba, download conda and pypi packages in parallel future_install_micromamba = executor.submit(install_micromamba, architecture) future_download_conda_packages = executor.submit( diff --git a/metaflow/plugins/pypi/conda_environment.py b/metaflow/plugins/pypi/conda_environment.py index 7bba75383e0..01567d67168 100644 --- a/metaflow/plugins/pypi/conda_environment.py +++ b/metaflow/plugins/pypi/conda_environment.py @@ -5,9 +5,7 @@ import io import json import os -import sys import tarfile -import time from concurrent.futures import ThreadPoolExecutor from hashlib import sha256 from io import BufferedIOBase, BytesIO @@ -150,6 +148,9 @@ def _path(url, local_path): ( package["path"], # Lazily fetch package from the interweb if needed. + # TODO: Depending on the len_hint, the package might be downloaded from + # the interweb prematurely. save_bytes needs to be adjusted to handle + # this scenario. LazyOpen( package["local_path"], "rb", @@ -427,7 +428,7 @@ def write_to_environment_manifest(self, keys, value): class LazyOpen(BufferedIOBase): - def __init__(self, filename, mode="rb", url=None): + def __init__(self, filename=None, mode="rb", url=None): super().__init__() self.filename = filename self.mode = mode diff --git a/metaflow/plugins/pypi/micromamba.py b/metaflow/plugins/pypi/micromamba.py index 690760debdc..fa7d5c590af 100644 --- a/metaflow/plugins/pypi/micromamba.py +++ b/metaflow/plugins/pypi/micromamba.py @@ -1,3 +1,4 @@ +import functools import json import os import subprocess @@ -101,12 +102,6 @@ def download(self, id_, packages, python, platform): # Unfortunately all the packages need to be catalogued in package cache # because of which this function can't be parallelized - # Micromamba is painfully slow in determining if many packages are infact - # already cached. As a perf heuristic, we check if the environment already - # exists to short circuit package downloads. - if self.path_to_environment(id_, platform): - return - prefix = "{env_dirs}/{keyword}/{platform}/{id}".format( env_dirs=self.info()["envs_dirs"][0], platform=platform, @@ -114,10 +109,19 @@ def download(self, id_, packages, python, platform): id=id_, ) - # Another forced perf heuristic to skip cross-platform downloads. + # Micromamba is painfully slow in determining if many packages are infact + # already cached. As a perf heuristic, we check if the environment already + # exists to short circuit package downloads. + + # cheap check if os.path.exists(f"{prefix}/fake.done"): return + # somewhat expensive check + # TODO: make this less expensive + if self.path_to_environment(id_, platform): + return + with tempfile.TemporaryDirectory() as tmp_dir: env = { "CONDA_SUBDIR": platform, @@ -175,9 +179,11 @@ def create(self, id_, packages, python, platform): cmd.append("{url}".format(**package)) self._call(cmd, env) + @functools.lru_cache(maxsize=None) def info(self): return self._call(["config", "list", "-a"]) + @functools.lru_cache(maxsize=None) def path_to_environment(self, id_, platform=None): if platform is None: platform = self.platform() @@ -186,6 +192,7 @@ def path_to_environment(self, id_, platform=None): keyword="metaflow", # indicates metaflow generated environment id=id_, ) + # TODO: make this call less expensive for env in self._call(["env", "list"])["envs"]: # TODO: Check bin/python is available as a heuristic for well formed env if env.endswith(suffix): @@ -199,18 +206,23 @@ def metadata(self, id_, packages, python, platform): } directories = self.info()["pkgs_dirs"] # search all package caches for packages - metadata = { - url: os.path.join(d, file) + file_to_path = {} + + for d in directories: + if os.path.isdir(d): + try: + with os.scandir(d) as entries: + for entry in entries: + if entry.is_file(): + # Prefer the first occurrence if the file exists in multiple directories + file_to_path.setdefault(entry.name, entry.path) + except OSError: + continue + return { + # set package tarball local paths to None if package tarballs are missing + url: file_to_path.get(file) for url, file in packages_to_filenames.items() - for d in directories - if os.path.isdir(d) - and file in os.listdir(d) - and os.path.isfile(os.path.join(d, file)) } - # set package tarball local paths to None if package tarballs are missing - for url in packages_to_filenames: - metadata.setdefault(url, None) - return metadata def interpreter(self, id_): return os.path.join(self.path_to_environment(id_), "bin/python") From 3aedfb7b6830f7a590a9fbe47339aef568c5d122 Mon Sep 17 00:00:00 2001 From: savin Date: Sat, 5 Oct 2024 20:55:00 -0700 Subject: [PATCH 4/7] even faster --- metaflow/plugins/pypi/conda_environment.py | 69 +++++++++++++++++----- metaflow/plugins/pypi/micromamba.py | 1 - 2 files changed, 55 insertions(+), 15 deletions(-) diff --git a/metaflow/plugins/pypi/conda_environment.py b/metaflow/plugins/pypi/conda_environment.py index 01567d67168..6c1844e6787 100644 --- a/metaflow/plugins/pypi/conda_environment.py +++ b/metaflow/plugins/pypi/conda_environment.py @@ -6,7 +6,8 @@ import json import os import tarfile -from concurrent.futures import ThreadPoolExecutor +import time +from concurrent.futures import ThreadPoolExecutor, as_completed from hashlib import sha256 from io import BufferedIOBase, BytesIO from itertools import chain @@ -107,6 +108,7 @@ def solve(id_, environment, type_): ) def cache(storage, results, type_): + def _path(url, local_path): # Special handling for VCS packages if url.startswith("git+"): @@ -167,24 +169,52 @@ def _path(url, local_path): if id_ in dirty: self.write_to_environment_manifest([id_, platform, type_], packages) - # First resolve environments through Conda, before PyPI. + storage = None + if self.datastore_type not in ["local"]: + # Initialize storage for caching if using a remote datastore + storage = self.datastore(_datastore_packageroot(self.datastore, echo)) + self.logger("Bootstrapping virtual environment(s) ...") + # First resolve environments through Conda, before PyPI. for solver in ["conda", "pypi"]: with ThreadPoolExecutor() as executor: - results = list( - executor.map(lambda x: solve(*x, solver), environments(solver)) - ) - _ = list(map(lambda x: self.solvers[solver].download(*x), results)) - with ThreadPoolExecutor() as executor: - _ = list( - executor.map(lambda x: self.solvers[solver].create(*x), results) - ) - if self.datastore_type not in ["local"]: - # Cache packages only when a remote datastore is in play. - storage = self.datastore(_datastore_packageroot(self.datastore, echo)) - cache(storage, results, solver) + # parallel solves + solves = [ + executor.submit(lambda x: solve(*x, solver), env) + for env in environments(solver) + ] + for future in as_completed(solves): + result = future.result() + # sequential downloads + self.solvers[solver].download(*result) + # parallel creates + executor.submit(self.solvers[solver].create, *result) + if storage: + # parallel cache + executor.submit(cache, storage, [result], solver) + executor.shutdown(wait=True) + # _ = [future.result() for future in as_completed(creates)] + self.logger("Virtual environment(s) bootstrapped!") + # First resolve environments through Conda, before PyPI. + # self.logger("Bootstrapping virtual environment(s) ...") + # for solver in ["conda", "pypi"]: + # with ThreadPoolExecutor() as executor: + # results = list( + # executor.map(lambda x: solve(*x, solver), environments(solver)) + # ) + # _ = list(map(lambda x: self.solvers[solver].download(*x), results)) + # with ThreadPoolExecutor() as executor: + # _ = list( + # executor.map(lambda x: self.solvers[solver].create(*x), results) + # ) + # if self.datastore_type not in ["local"]: + # # Cache packages only when a remote datastore is in play. + # storage = self.datastore(_datastore_packageroot(self.datastore, echo)) + # cache(storage, results, solver) + # self.logger("Virtual environment(s) bootstrapped!") + def executable(self, step_name, default=None): step = next((step for step in self.flow if step.name == step_name), None) if step is None: @@ -427,6 +457,17 @@ def write_to_environment_manifest(self, keys, value): fcntl.flock(f, fcntl.LOCK_UN) +class Processor(object): + + def __init__(self, solver, max_workers): + self.solver = solver + self.max_workers = max_workers + self.executor = ThreadPoolExecutor(max_workers=max_workers) + + def process(self, env): + self.executor.submit(self.solver.process, env) + + class LazyOpen(BufferedIOBase): def __init__(self, filename=None, mode="rb", url=None): super().__init__() diff --git a/metaflow/plugins/pypi/micromamba.py b/metaflow/plugins/pypi/micromamba.py index fa7d5c590af..a8b6c60091b 100644 --- a/metaflow/plugins/pypi/micromamba.py +++ b/metaflow/plugins/pypi/micromamba.py @@ -101,7 +101,6 @@ def solve(self, id_, packages, python, platform): def download(self, id_, packages, python, platform): # Unfortunately all the packages need to be catalogued in package cache # because of which this function can't be parallelized - prefix = "{env_dirs}/{keyword}/{platform}/{id}".format( env_dirs=self.info()["envs_dirs"][0], platform=platform, From 6d11fff946082cd48a82feb9a7351ee0f71174d5 Mon Sep 17 00:00:00 2001 From: savin Date: Sat, 5 Oct 2024 23:18:49 -0700 Subject: [PATCH 5/7] better logs --- metaflow/cli.py | 84 +++++++++++++++++++--- metaflow/plugins/pypi/conda_environment.py | 72 ++++++++++++------- metaflow/plugins/pypi/micromamba.py | 33 ++++++++- metaflow/plugins/pypi/pip.py | 32 ++++++++- 4 files changed, 185 insertions(+), 36 deletions(-) diff --git a/metaflow/cli.py b/metaflow/cli.py index 270a1bd6216..98de5f01cf3 100644 --- a/metaflow/cli.py +++ b/metaflow/cli.py @@ -8,6 +8,7 @@ import metaflow.tracing as tracing from metaflow._vendor import click from metaflow.client.core import get_metadata +from metaflow.system import _system_logger, _system_monitor from . import decorators, lint, metaflow_version, namespace, parameters, plugins from .cli_args import cli_args @@ -24,7 +25,6 @@ DEFAULT_PACKAGE_SUFFIXES, ) from .metaflow_current import current -from metaflow.system import _system_monitor, _system_logger from .metaflow_environment import MetaflowEnvironment from .mflog import LOG_SOURCES, mflog from .package import MetaflowPackage @@ -68,20 +68,86 @@ def echo_dev_null(*args, **kwargs): pass +import shutil +import threading +import time +from itertools import cycle + +_animation_thread = None +_animation_stop = threading.Event() +_default_spinner = cycle(["-", "\\", "|", "/"]) + + +def _animate(get_frame, line, err, kwargs, indent): + while not _animation_stop.is_set(): + frame = get_frame(line) + if indent: + frame = INDENT + frame + terminal_width, _ = shutil.get_terminal_size() + frame = frame.ljust(terminal_width)[:terminal_width] + click.secho(f"\r{frame}", nl=False, err=err, **kwargs) + click.get_text_stream("stderr" if err else "stdout").flush() + time.sleep(0.1) + + def echo_always(line, **kwargs): + global _animation_thread, _animation_stop + kwargs["err"] = kwargs.get("err", True) - if kwargs.pop("indent", None): - line = "\n".join(INDENT + x for x in line.splitlines()) - if "nl" not in kwargs or kwargs["nl"]: + overwrite = kwargs.pop("overwrite", False) + animate = kwargs.pop("animate", None) + indent = kwargs.pop("indent", False) + + if _animation_thread is not None: + _animation_stop.set() + _animation_thread.join() + _animation_thread = None + _animation_stop.clear() + click.echo("\r", nl=False, err=kwargs["err"]) + + if indent: + if animate: + # For animated output, we prepend INDENT in the animation function + pass + else: + line = INDENT + line + + animation_kwargs = { + "fg": kwargs.get("fg"), + "bg": kwargs.get("bg"), + "bold": kwargs.get("bold", False), + "underline": kwargs.get("underline", False), + } + + if animate: + if animate is True: + get_frame = lambda line: f"{next(_default_spinner)} {line}" + else: + get_frame = animate + _animation_stop.clear() + _animation_thread = threading.Thread( + target=_animate, + args=(get_frame, line, kwargs["err"], animation_kwargs, indent), + ) + _animation_thread.start() + return + + if overwrite: + terminal_width, _ = shutil.get_terminal_size() + line = line.ljust(terminal_width)[:terminal_width] + click.echo("\r", nl=False, err=kwargs["err"]) + elif "nl" not in kwargs or kwargs["nl"]: line += ERASE_TO_EOL + top = kwargs.pop("padding_top", None) bottom = kwargs.pop("padding_bottom", None) highlight = kwargs.pop("highlight", HIGHLIGHT) - if top: + + if top and not overwrite: click.secho(ERASE_TO_EOL, **kwargs) hl_bold = kwargs.pop("highlight_bold", True) - nl = kwargs.pop("nl", True) + nl = kwargs.pop("nl", True) and not overwrite fg = kwargs.pop("fg", None) bold = kwargs.pop("bold", False) kwargs["nl"] = False @@ -104,8 +170,10 @@ def echo_always(line, **kwargs): if nl: kwargs["nl"] = True click.secho("", **kwargs) - if bottom: + if bottom and not overwrite: click.secho(ERASE_TO_EOL, **kwargs) + if overwrite: + click.get_text_stream("stderr" if kwargs["err"] else "stdout").flush() def logger(body="", system_msg=False, head="", bad=False, timestamp=True, nl=True): @@ -960,7 +1028,7 @@ def start( ctx.obj.environment = [ e for e in ENVIRONMENTS + [MetaflowEnvironment] if e.TYPE == environment ][0](ctx.obj.flow) - ctx.obj.environment.validate_environment(ctx.obj.logger, datastore) + ctx.obj.environment.validate_environment(echo, datastore) ctx.obj.event_logger = LOGGING_SIDECARS[event_logger]( flow=ctx.obj.flow, env=ctx.obj.environment diff --git a/metaflow/plugins/pypi/conda_environment.py b/metaflow/plugins/pypi/conda_environment.py index 6c1844e6787..edd9e2cee11 100644 --- a/metaflow/plugins/pypi/conda_environment.py +++ b/metaflow/plugins/pypi/conda_environment.py @@ -5,9 +5,13 @@ import io import json import os +import signal +import sys import tarfile +import threading import time from concurrent.futures import ThreadPoolExecutor, as_completed +from functools import wraps from hashlib import sha256 from io import BufferedIOBase, BytesIO from itertools import chain @@ -49,7 +53,7 @@ def decospecs(self): def validate_environment(self, logger, datastore_type): self.datastore_type = datastore_type - self.logger = logger + # self.logger = logger # Avoiding circular imports. from metaflow.plugins import DATASTORES @@ -61,8 +65,19 @@ def validate_environment(self, logger, datastore_type): from .micromamba import Micromamba from .pip import Pip - micromamba = Micromamba() - self.solvers = {"conda": micromamba, "pypi": Pip(micromamba)} + print_lock = threading.Lock() + + def make_thread_safe(func): + @wraps(func) + def wrapper(*args, **kwargs): + with print_lock: + return func(*args, **kwargs) + + return wrapper + + self.logger = make_thread_safe(logger) + micromamba = Micromamba(self.logger) + self.solvers = {"conda": micromamba, "pypi": Pip(micromamba, self.logger)} def init_environment(self, echo, only_steps=None): # The implementation optimizes for latency to ensure as many operations can @@ -108,6 +123,15 @@ def solve(id_, environment, type_): ) def cache(storage, results, type_): + self.logger( + "Caching packages ...", + fg="yellow", + bold=False, + indent=True, + overwrite=True, + animate=True, + ) + start_time = time.time() def _path(url, local_path): # Special handling for VCS packages @@ -167,14 +191,27 @@ def _path(url, local_path): ) for id_, packages, _, platform in results: if id_ in dirty: + cached = True self.write_to_environment_manifest([id_, platform, type_], packages) + if cached: + self.logger( + f"Cached packages in {time.time() - start_time:.2f}s!", + fg="yellow", + bold=False, + indent=True, + overwrite=True, + animate=True, + ) storage = None if self.datastore_type not in ["local"]: # Initialize storage for caching if using a remote datastore storage = self.datastore(_datastore_packageroot(self.datastore, echo)) - self.logger("Bootstrapping virtual environment(s) ...") + self.logger( + "Bootstrapping virtual environment(s) ...", fg="magenta", bold=False + ) + start_time = time.time() # First resolve environments through Conda, before PyPI. for solver in ["conda", "pypi"]: with ThreadPoolExecutor() as executor: @@ -193,27 +230,14 @@ def _path(url, local_path): # parallel cache executor.submit(cache, storage, [result], solver) executor.shutdown(wait=True) - # _ = [future.result() for future in as_completed(creates)] - - self.logger("Virtual environment(s) bootstrapped!") - # First resolve environments through Conda, before PyPI. - # self.logger("Bootstrapping virtual environment(s) ...") - # for solver in ["conda", "pypi"]: - # with ThreadPoolExecutor() as executor: - # results = list( - # executor.map(lambda x: solve(*x, solver), environments(solver)) - # ) - # _ = list(map(lambda x: self.solvers[solver].download(*x), results)) - # with ThreadPoolExecutor() as executor: - # _ = list( - # executor.map(lambda x: self.solvers[solver].create(*x), results) - # ) - # if self.datastore_type not in ["local"]: - # # Cache packages only when a remote datastore is in play. - # storage = self.datastore(_datastore_packageroot(self.datastore, echo)) - # cache(storage, results, solver) - # self.logger("Virtual environment(s) bootstrapped!") + self.logger( + f"Virtual environment(s) bootstrapped in {time.time() - start_time:.2f}s!", + fg="green", + bold=True, + indent=True, + overwrite=True, + ) def executable(self, step_name, default=None): step = next((step for step in self.flow if step.name == step_name), None) diff --git a/metaflow/plugins/pypi/micromamba.py b/metaflow/plugins/pypi/micromamba.py index a8b6c60091b..348c0d30395 100644 --- a/metaflow/plugins/pypi/micromamba.py +++ b/metaflow/plugins/pypi/micromamba.py @@ -3,6 +3,7 @@ import os import subprocess import tempfile +import time from metaflow.exception import MetaflowException from metaflow.util import which @@ -21,7 +22,7 @@ def __init__(self, error): class Micromamba(object): - def __init__(self): + def __init__(self, logger=None): # micromamba is a tiny version of the mamba package manager and comes with # metaflow specific performance enhancements. @@ -35,6 +36,18 @@ def __init__(self): "micromamba", ) + if logger: + self.logger = functools.partial( + logger, + fg="yellow", + bold=False, + indent=True, + overwrite=True, + animate=True, + ) + else: + self.logger = lambda *args, **kwargs: None # No-op logger if not provided + self.bin = ( which(os.environ.get("METAFLOW_PATH_TO_MICROMAMBA") or "micromamba") or which("./micromamba") # to support remote execution @@ -67,6 +80,8 @@ def solve(self, id_, packages, python, platform): # environment # 4. Multiple solves can progress at the same time while relying on the same # index + self.logger(f"Solving {platform} virtual environment {id_} ...") + start_time = time.time() with tempfile.TemporaryDirectory() as tmp_dir: env = { "MAMBA_ADD_PIP_AS_PYTHON_DEPENDENCY": "true", @@ -93,10 +108,14 @@ def solve(self, id_, packages, python, platform): cmd.append("python==%s" % python) # TODO: Ensure a human readable message is returned when the environment # can't be resolved for any and all reasons. - return [ + solve = [ {k: v for k, v in item.items() if k in ["url"]} for item in self._call(cmd, env)["actions"]["LINK"] ] + self.logger( + f"Solved {platform} virtual environment {id_} in {time.time() - start_time:.2f}s!" + ) + return solve def download(self, id_, packages, python, platform): # Unfortunately all the packages need to be catalogued in package cache @@ -121,6 +140,8 @@ def download(self, id_, packages, python, platform): if self.path_to_environment(id_, platform): return + self.logger(f"Downloading {platform} virtual environment {id_} ...") + start_time = time.time() with tempfile.TemporaryDirectory() as tmp_dir: env = { "CONDA_SUBDIR": platform, @@ -140,6 +161,9 @@ def download(self, id_, packages, python, platform): cmd.append("{url}".format(**package)) self._call(cmd, env) + self.logger( + f"Downloaded {platform} virtual environment {id_} in {time.time() - start_time:.2f}s!" + ) # Perf optimization to skip cross-platform downloads. if platform != self.platform(): os.makedirs(prefix, exist_ok=True) or open( @@ -152,6 +176,8 @@ def create(self, id_, packages, python, platform): if platform != self.platform() or self.path_to_environment(id_, platform): return + self.logger(f"Creating {platform} virtual environment {id_} ...") + start_time = time.time() prefix = "{env_dirs}/{keyword}/{platform}/{id}".format( env_dirs=self.info()["envs_dirs"][0], platform=platform, @@ -177,6 +203,9 @@ def create(self, id_, packages, python, platform): for package in packages: cmd.append("{url}".format(**package)) self._call(cmd, env) + self.logger( + f"Created {platform} virtual environment {id_} in {time.time() - start_time:.2f}s!" + ) @functools.lru_cache(maxsize=None) def info(self): diff --git a/metaflow/plugins/pypi/pip.py b/metaflow/plugins/pypi/pip.py index 5e97f0edd19..2723308e33b 100644 --- a/metaflow/plugins/pypi/pip.py +++ b/metaflow/plugins/pypi/pip.py @@ -1,9 +1,11 @@ +import functools import json import os import re import shutil import subprocess import tempfile +import time from concurrent.futures import ThreadPoolExecutor from itertools import chain, product from urllib.parse import unquote @@ -50,12 +52,25 @@ def __init__(self, error): class Pip(object): - def __init__(self, micromamba=None): + def __init__(self, micromamba=None, logger=None): # pip is assumed to be installed inside a conda environment managed by # micromamba. pip commands are executed using `micromamba run --prefix` - self.micromamba = micromamba or Micromamba() + self.micromamba = micromamba or Micromamba(logger) + if logger: + self.logger = functools.partial( + logger, + fg="yellow", + bold=False, + indent=True, + overwrite=True, + animate=True, + ) + else: + self.logger = lambda *args, **kwargs: None # No-op logger if not provided def solve(self, id_, packages, python, platform): + self.logger(f"Solving {platform} PyPI packages for {id_} ...") + start_time = time.time() prefix = self.micromamba.path_to_environment(id_) if prefix is None: msg = "Unable to locate a Micromamba managed virtual environment\n" @@ -130,6 +145,9 @@ def _format(dl_info): res["hash"] = vcs_info["commit_id"] return res + self.logger( + f"Solved {platform} PyPI packages for {id_} in {time.time() - start_time:.2f}s!" + ) with open(report, mode="r", encoding="utf-8") as f: return [ _format(item["download_info"]) for item in json.load(f)["install"] @@ -142,6 +160,8 @@ def download(self, id_, packages, python, platform): if os.path.isfile(metadata_file): return + self.logger(f"Downloading {platform} PyPI packages for {id_} ...") + start_time = time.time() metadata = {} custom_index_url, extra_index_urls = self.indices(prefix) @@ -231,6 +251,9 @@ def _build(key, package): prefix=prefix, wheel=unquote(package["url"].split("/")[-1]) ) self._call(prefix, cmd) + self.logger( + f"Downloaded {platform} PyPI packages for {id_} in {time.time() - start_time:.2f}s!" + ) # write the url to wheel mappings in a magic location with open(metadata_file, "w") as file: file.write(json.dumps(metadata)) @@ -245,6 +268,8 @@ def create(self, id_, packages, python, platform): # Pip can't install packages if the underlying virtual environment doesn't # share the same platform if self.micromamba.platform() == platform: + self.logger(f"Installing {platform} PyPI packages for {id_} ...") + start_time = time.time() cmd = [ "install", "--no-compile", @@ -256,6 +281,9 @@ def create(self, id_, packages, python, platform): for package in packages: cmd.append(metadata[package["url"]]) self._call(prefix, cmd) + self.logger( + f"Installed {platform} PyPI packages for {id_} in {time.time() - start_time:.2f}s!" + ) with open(installation_marker, "w") as file: file.write(json.dumps({"id": id_})) From 8c90b87b58563ea851d67f20b8ab1ece04ace5b9 Mon Sep 17 00:00:00 2001 From: savin Date: Sat, 5 Oct 2024 23:22:17 -0700 Subject: [PATCH 6/7] better logs --- metaflow/plugins/pypi/conda_environment.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/metaflow/plugins/pypi/conda_environment.py b/metaflow/plugins/pypi/conda_environment.py index edd9e2cee11..8df1dce30b8 100644 --- a/metaflow/plugins/pypi/conda_environment.py +++ b/metaflow/plugins/pypi/conda_environment.py @@ -231,8 +231,9 @@ def _path(url, local_path): executor.submit(cache, storage, [result], solver) executor.shutdown(wait=True) + elapsed = time.time() - start_time self.logger( - f"Virtual environment(s) bootstrapped in {time.time() - start_time:.2f}s!", + f"Virtual environment(s) bootstrapped{f' in {elapsed:.2f}s' if elapsed >= 1 else ''}!", fg="green", bold=True, indent=True, From 148f567be165a2f678618c06484539b96fa16727 Mon Sep 17 00:00:00 2001 From: savin Date: Sat, 5 Oct 2024 23:23:56 -0700 Subject: [PATCH 7/7] cleanup --- metaflow/cli.py | 9 ++++----- metaflow/plugins/pypi/conda_environment.py | 2 -- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/metaflow/cli.py b/metaflow/cli.py index 98de5f01cf3..e87b1a602a5 100644 --- a/metaflow/cli.py +++ b/metaflow/cli.py @@ -1,9 +1,13 @@ import inspect import json +import shutil import sys +import threading +import time import traceback from datetime import datetime from functools import wraps +from itertools import cycle import metaflow.tracing as tracing from metaflow._vendor import click @@ -68,11 +72,6 @@ def echo_dev_null(*args, **kwargs): pass -import shutil -import threading -import time -from itertools import cycle - _animation_thread = None _animation_stop = threading.Event() _default_spinner = cycle(["-", "\\", "|", "/"]) diff --git a/metaflow/plugins/pypi/conda_environment.py b/metaflow/plugins/pypi/conda_environment.py index 8df1dce30b8..df3f754b105 100644 --- a/metaflow/plugins/pypi/conda_environment.py +++ b/metaflow/plugins/pypi/conda_environment.py @@ -5,8 +5,6 @@ import io import json import os -import signal -import sys import tarfile import threading import time