Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better logs for @conda/@pypi #2080

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 75 additions & 8 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import inspect
import json
import shutil
import sys
import threading
import time
import traceback
from datetime import datetime
from functools import wraps
from itertools import cycle

import metaflow.tracing as tracing
from metaflow._vendor import click
from metaflow.client.core import get_metadata
from metaflow.system import _system_logger, _system_monitor

from . import decorators, lint, metaflow_version, namespace, parameters, plugins
from .cli_args import cli_args
Expand All @@ -24,7 +29,6 @@
DEFAULT_PACKAGE_SUFFIXES,
)
from .metaflow_current import current
from metaflow.system import _system_monitor, _system_logger
from .metaflow_environment import MetaflowEnvironment
from .mflog import LOG_SOURCES, mflog
from .package import MetaflowPackage
Expand Down Expand Up @@ -68,20 +72,81 @@ def echo_dev_null(*args, **kwargs):
pass


_animation_thread = None
_animation_stop = threading.Event()
_default_spinner = cycle(["-", "\\", "|", "/"])


def _animate(get_frame, line, err, kwargs, indent):
while not _animation_stop.is_set():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem to respect the same arguments as the non-animated case.

frame = get_frame(line)
if indent:
frame = INDENT + frame
terminal_width, _ = shutil.get_terminal_size()
frame = frame.ljust(terminal_width)[:terminal_width]
click.secho(f"\r{frame}", nl=False, err=err, **kwargs)
click.get_text_stream("stderr" if err else "stdout").flush()
time.sleep(0.1)


def echo_always(line, **kwargs):
global _animation_thread, _animation_stop

kwargs["err"] = kwargs.get("err", True)
if kwargs.pop("indent", None):
line = "\n".join(INDENT + x for x in line.splitlines())
if "nl" not in kwargs or kwargs["nl"]:
overwrite = kwargs.pop("overwrite", False)
animate = kwargs.pop("animate", None)
indent = kwargs.pop("indent", False)

if _animation_thread is not None:
_animation_stop.set()
_animation_thread.join()
_animation_thread = None
_animation_stop.clear()
click.echo("\r", nl=False, err=kwargs["err"])

if indent:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

simplify: if indent and not animate

if animate:
# For animated output, we prepend INDENT in the animation function
pass
else:
line = INDENT + line

animation_kwargs = {
"fg": kwargs.get("fg"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fallbacks are needed here to be safe.

"bg": kwargs.get("bg"),
"bold": kwargs.get("bold", False),
"underline": kwargs.get("underline", False),
}

if animate:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check how animate and overwrite show up in runtime logs and notebooks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes please!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Notebooks still seem broken: animate produces a new line for each frame, even though echoing \r by itself works outside of the Metaflow runtime.

if animate is True:
get_frame = lambda line: f"{next(_default_spinner)} {line}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

f-strings are not supported in Python 3.5.

else:
get_frame = animate
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused here -- what is animate? It doesn't seem to be a simple bool, so its expected type is unclear to me.

_animation_stop.clear()
_animation_thread = threading.Thread(
target=_animate,
args=(get_frame, line, kwargs["err"], animation_kwargs, indent),
)
_animation_thread.start()
return

if overwrite:
terminal_width, _ = shutil.get_terminal_size()
line = line.ljust(terminal_width)[:terminal_width]
click.echo("\r", nl=False, err=kwargs["err"])
elif "nl" not in kwargs or kwargs["nl"]:
line += ERASE_TO_EOL

top = kwargs.pop("padding_top", None)
bottom = kwargs.pop("padding_bottom", None)
highlight = kwargs.pop("highlight", HIGHLIGHT)
if top:

if top and not overwrite:
click.secho(ERASE_TO_EOL, **kwargs)

hl_bold = kwargs.pop("highlight_bold", True)
nl = kwargs.pop("nl", True)
nl = kwargs.pop("nl", True) and not overwrite
fg = kwargs.pop("fg", None)
bold = kwargs.pop("bold", False)
kwargs["nl"] = False
Expand All @@ -104,8 +169,10 @@ def echo_always(line, **kwargs):
if nl:
kwargs["nl"] = True
click.secho("", **kwargs)
if bottom:
if bottom and not overwrite:
click.secho(ERASE_TO_EOL, **kwargs)
if overwrite:
click.get_text_stream("stderr" if kwargs["err"] else "stdout").flush()


def logger(body="", system_msg=False, head="", bad=False, timestamp=True, nl=True):
Expand Down Expand Up @@ -960,7 +1027,7 @@ def start(
ctx.obj.environment = [
e for e in ENVIRONMENTS + [MetaflowEnvironment] if e.TYPE == environment
][0](ctx.obj.flow)
ctx.obj.environment.validate_environment(ctx.obj.logger, datastore)
ctx.obj.environment.validate_environment(echo, datastore)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

take care of symmetric changes for fast bakery


ctx.obj.event_logger = LOGGING_SIDECARS[event_logger](
flow=ctx.obj.flow, env=ctx.obj.environment
Expand Down
206 changes: 146 additions & 60 deletions metaflow/plugins/pypi/bootstrap.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import bz2
import concurrent.futures
import io
import json
import os
import shutil
import subprocess
import sys
import tarfile
import time

import requests

from metaflow.metaflow_config import DATASTORE_LOCAL_DIR
from metaflow.plugins import DATASTORES
Expand All @@ -15,6 +19,18 @@

# Bootstraps a valid conda virtual environment composed of conda and pypi packages


def timer(func):
    """Decorate ``func`` so that the duration of each call is measured.

    The wrapper forwards all positional and keyword arguments to ``func`` and
    returns whatever ``func`` returns; exceptions raised by ``func`` propagate
    unchanged. The duration of the most recent call, in seconds, is stored on
    the wrapper as ``last_duration`` (``None`` until a call has finished), so
    the measurement can be inspected without printing anything.
    """
    import functools

    # functools.wraps keeps the decorated function's __name__/__doc__ intact,
    # which matters for tracebacks and for any reporting based on the name.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the result is not skewed by system
        # clock adjustments the way time.time() differences can be.
        start_time = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            # Recorded even when func raises, so failures can be timed too.
            wrapper.last_duration = time.perf_counter() - start_time

    wrapper.last_duration = None
    return wrapper


if __name__ == "__main__":
if len(sys.argv) != 5:
print("Usage: bootstrap.py <flow_name> <id> <datastore_type> <architecture>")
Expand Down Expand Up @@ -47,6 +63,8 @@

prefix = os.path.join(os.getcwd(), architecture, id_)
pkgs_dir = os.path.join(os.getcwd(), ".pkgs")
conda_pkgs_dir = os.path.join(pkgs_dir, "conda")
pypi_pkgs_dir = os.path.join(pkgs_dir, "pypi")
manifest_dir = os.path.join(os.getcwd(), DATASTORE_LOCAL_DIR, flow_name)

datastores = [d for d in DATASTORES if d.TYPE == datastore_type]
Expand All @@ -64,77 +82,145 @@
os.path.join(os.getcwd(), MAGIC_FILE),
os.path.join(manifest_dir, MAGIC_FILE),
)

with open(os.path.join(manifest_dir, MAGIC_FILE)) as f:
env = json.load(f)[id_][architecture]

# Download Conda packages.
conda_pkgs_dir = os.path.join(pkgs_dir, "conda")
with storage.load_bytes([package["path"] for package in env["conda"]]) as results:
for key, tmpfile, _ in results:
def run_cmd(cmd):
    """Run ``cmd`` through the shell and abort the bootstrap if it fails.

    The command's stdout and stderr are captured as text. When the command
    exits with a non-zero status, the command line and both captured streams
    are printed and the process exits with status 1. Nothing is returned when
    the command succeeds.
    """
    completed = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    if completed.returncode == 0:
        return

    # Surface everything the command produced before giving up.
    print(f"Bootstrap failed while executing: {cmd}")
    print("Stdout:", completed.stdout)
    print("Stderr:", completed.stderr)
    sys.exit(1)

@timer
def install_micromamba(architecture):
    """Make a ``micromamba`` executable available for later shell commands.

    If ``which("micromamba")`` is truthy, nothing is installed. If a binary
    was already unpacked at ``<cwd>/micromamba/bin/micromamba``, its directory
    is added to ``PATH`` and nothing is downloaded. Otherwise micromamba
    1.5.7 for ``architecture`` is downloaded from micro.mamba.pm, the
    ``bin/micromamba`` member is unpacked under ``<cwd>/micromamba``, made
    executable, and its directory is appended to ``PATH``.

    Returns:
        ``<cwd>/micromamba/bin/micromamba``. As before, this path is returned
        even in the ``which`` case, where no file may exist at that location.

    Raises:
        Exception: if the download does not answer with HTTP 200, or if the
            binary is missing after extraction.
    """
    # TODO: check if mamba or conda are already available on the image
    micromamba_dir = os.path.join(os.getcwd(), "micromamba")
    micromamba_path = os.path.join(micromamba_dir, "bin", "micromamba")
    micromamba_bin_dir = os.path.dirname(micromamba_path)

    def add_bin_dir_to_path():
        # Later commands call `micromamba` by name, so the directory holding
        # the local binary has to be on PATH. Tolerates an unset PATH and
        # avoids appending the same entry twice.
        current = os.environ.get("PATH", "")
        if micromamba_bin_dir in current.split(os.pathsep):
            return
        if current:
            os.environ["PATH"] = current + os.pathsep + micromamba_bin_dir
        else:
            os.environ["PATH"] = micromamba_bin_dir

    if which("micromamba"):
        return micromamba_path

    if os.path.exists(micromamba_path):
        # A previously unpacked copy is only usable once it is on PATH.
        add_bin_dir_to_path()
        return micromamba_path

    os.makedirs(micromamba_dir, exist_ok=True)
    # TODO: download micromamba from datastore
    url = f"https://micro.mamba.pm/api/micromamba/{architecture}/1.5.7"
    response = requests.get(url, stream=True)
    if response.status_code != 200:
        raise Exception(
            f"Failed to download micromamba: HTTP {response.status_code}"
        )
    # The payload is a bzip2-compressed tarball; decompress it in memory and
    # extract only the executable.
    tar_content = bz2.BZ2Decompressor().decompress(response.raw.read())
    with tarfile.open(fileobj=io.BytesIO(tar_content), mode="r:") as tar:
        tar.extract("bin/micromamba", path=micromamba_dir, set_attrs=False)

    # Verify before chmod: chmod on a missing file would raise
    # FileNotFoundError and hide the intended error message.
    if not os.path.exists(micromamba_path):
        raise Exception("Failed to install Micromamba!")
    os.chmod(micromamba_path, 0o755)

    add_bin_dir_to_path()
    return micromamba_path

@timer
def download_conda_packages(storage, packages, dest_dir):

def process_conda_package(args):
# Ensure that conda packages go into architecture specific folders.
# The path looks like REPO/CHANNEL/CONDA_SUBDIR/PACKAGE. We trick
# Micromamba into believing that all packages are coming from a local
# channel - the only hurdle is ensuring that packages are organised
# properly.

# TODO: consider RAM disk
dest = os.path.join(conda_pkgs_dir, "/".join(key.split("/")[-2:]))
key, tmpfile, dest_dir = args
dest = os.path.join(dest_dir, "/".join(key.split("/")[-2:]))
os.makedirs(os.path.dirname(dest), exist_ok=True)
shutil.move(tmpfile, dest)

# Create Conda environment.
cmds = [
# TODO: check if mamba or conda are already available on the image
# TODO: micromamba installation can be pawned off to micromamba.py
f"""set -e;
if ! command -v micromamba >/dev/null 2>&1; then
mkdir -p micromamba;
python -c "import requests, bz2, sys; data = requests.get('https://micro.mamba.pm/api/micromamba/{architecture}/1.5.7').content; sys.stdout.buffer.write(bz2.decompress(data))" | tar -xv -C $(pwd)/micromamba bin/micromamba --strip-components 1;
os.makedirs(dest_dir, exist_ok=True)
with storage.load_bytes([package["path"] for package in packages]) as results:
with concurrent.futures.ThreadPoolExecutor() as executor:
executor.map(
process_conda_package,
[(key, tmpfile, dest_dir) for key, tmpfile, _ in results],
)
# for key, tmpfile, _ in results:

# # TODO: consider RAM disk
# dest = os.path.join(dest_dir, "/".join(key.split("/")[-2:]))
# os.makedirs(os.path.dirname(dest), exist_ok=True)
# shutil.move(tmpfile, dest)
return dest_dir

@timer
def download_pypi_packages(storage, packages, dest_dir):
    """Load the given PyPI packages from ``storage`` and move them to ``dest_dir``.

    Args:
        storage: object whose ``load_bytes(paths)`` is a context manager
            yielding ``(key, tmpfile, _)`` tuples.
        packages: iterable of mappings, each with a ``"path"`` entry that is
            passed to ``storage.load_bytes``.
        dest_dir: directory that receives the files; created if missing.

    Returns:
        ``dest_dir``.

    Raises:
        Any exception raised while moving a file (for example ``OSError``
        from ``shutil.move``) is propagated instead of being dropped.
    """

    def process_pypi_package(args):
        key, tmpfile, dest_dir = args
        # Files land flat in dest_dir under the basename of their key.
        dest = os.path.join(dest_dir, os.path.basename(key))
        shutil.move(tmpfile, dest)

    os.makedirs(dest_dir, exist_ok=True)
    with storage.load_bytes([package["path"] for package in packages]) as results:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Executor.map only re-raises a worker's exception when its result
            # is retrieved, so the iterator must be consumed; otherwise a
            # failed move would go unnoticed.
            list(
                executor.map(
                    process_pypi_package,
                    [(key, tmpfile, dest_dir) for key, tmpfile, _ in results],
                )
            )
    return dest_dir

@timer
def create_conda_environment(prefix, conda_pkgs_dir):
cmd = f'''set -e;
tmpfile=$(mktemp);
echo "@EXPLICIT" > "$tmpfile";
ls -d {conda_pkgs_dir}/*/* >> "$tmpfile";
export PATH=$PATH:$(pwd)/micromamba;
if ! command -v micromamba >/dev/null 2>&1; then
echo "Failed to install Micromamba!";
exit 1;
fi;
fi""",
# Create a conda environment through Micromamba.
f'''set -e;
tmpfile=$(mktemp);
echo "@EXPLICIT" > "$tmpfile";
ls -d {conda_pkgs_dir}/*/* >> "$tmpfile";
export PATH=$PATH:$(pwd)/micromamba;
export CONDA_PKGS_DIRS=$(pwd)/micromamba/pkgs;
micromamba create --yes --offline --no-deps --safety-checks=disabled --no-extra-safety-checks --prefix {prefix} --file "$tmpfile";
rm "$tmpfile"''',
]

# Download PyPI packages.
if "pypi" in env:
pypi_pkgs_dir = os.path.join(pkgs_dir, "pypi")
with storage.load_bytes(
[package["path"] for package in env["pypi"]]
) as results:
for key, tmpfile, _ in results:
dest = os.path.join(pypi_pkgs_dir, os.path.basename(key))
os.makedirs(os.path.dirname(dest), exist_ok=True)
shutil.move(tmpfile, dest)

# Install PyPI packages.
cmds.extend(
[
f"""set -e;
export PATH=$PATH:$(pwd)/micromamba;
export CONDA_PKGS_DIRS=$(pwd)/micromamba/pkgs;
micromamba run --prefix {prefix} python -m pip --disable-pip-version-check install --root-user-action=ignore --no-compile {pypi_pkgs_dir}/*.whl --no-user"""
]
)
export CONDA_PKGS_DIRS=$(pwd)/micromamba/pkgs;
micromamba create --yes --offline --no-deps --safety-checks=disabled --no-extra-safety-checks --prefix {prefix} --file "$tmpfile";
rm "$tmpfile"'''
run_cmd(cmd)

for cmd in cmds:
result = subprocess.run(
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
@timer
def install_pypi_packages(prefix, pypi_pkgs_dir):
    """Install every wheel found in ``pypi_pkgs_dir`` into the env at ``prefix``.

    Builds a shell script that runs ``pip install`` through
    ``micromamba run --prefix`` with the package index disabled
    (``--no-index``) and ``pypi_pkgs_dir`` as the ``--find-links`` source,
    then hands the script to ``run_cmd``.
    """
    install_script = f"""set -e;
export PATH=$PATH:$(pwd)/micromamba;
export CONDA_PKGS_DIRS=$(pwd)/micromamba/pkgs;
micromamba run --prefix {prefix} python -m pip --disable-pip-version-check install --root-user-action=ignore --no-compile --no-index --no-cache-dir --no-deps --prefer-binary --find-links={pypi_pkgs_dir} {pypi_pkgs_dir}/*.whl --no-user"""
    run_cmd(install_script)

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
# install micromamba, download conda and pypi packages in parallel
future_install_micromamba = executor.submit(install_micromamba, architecture)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check whether downloading from the datastore is possible.

future_download_conda_packages = executor.submit(
download_conda_packages, storage, env["conda"], conda_pkgs_dir
)
if result.returncode != 0:
print(f"Bootstrap failed while executing: {cmd}")
print("Stdout:", result.stdout.decode())
print("Stderr:", result.stderr.decode())
sys.exit(1)
future_download_pypi_packages = None
if "pypi" in env:
future_download_pypi_packages = executor.submit(
download_pypi_packages, storage, env["pypi"], pypi_pkgs_dir
)
# create conda environment after micromamba is installed and conda packages are downloaded
concurrent.futures.wait(
[future_install_micromamba, future_download_conda_packages]
)
future_create_conda_environment = executor.submit(
create_conda_environment, prefix, conda_pkgs_dir
)
if "pypi" in env:
# install pypi packages after conda environment is created and pypi packages are downloaded
concurrent.futures.wait(
[future_create_conda_environment, future_download_pypi_packages]
)
future_install_pypi_packages = executor.submit(
install_pypi_packages, prefix, pypi_pkgs_dir
)
# wait for pypi packages to be installed
future_install_pypi_packages.result()
else:
# wait for conda environment to be created
future_create_conda_environment.result()
Loading
Loading