diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py index 016b115d8f..69ce91c81a 100644 --- a/metaflow/metaflow_config.py +++ b/metaflow/metaflow_config.py @@ -165,6 +165,8 @@ SKIP_CARD_DUALWRITE = from_conf("SKIP_CARD_DUALWRITE", False) +RUNTIME_CARD_RENDER_INTERVAL = from_conf("RUNTIME_CARD_RENDER_INTERVAL", 60) + # Azure storage account URL AZURE_STORAGE_BLOB_SERVICE_ENDPOINT = from_conf("AZURE_STORAGE_BLOB_SERVICE_ENDPOINT") diff --git a/metaflow/plugins/__init__.py b/metaflow/plugins/__init__.py index 13c45e0cfe..9f2d4e9289 100644 --- a/metaflow/plugins/__init__.py +++ b/metaflow/plugins/__init__.py @@ -166,6 +166,8 @@ def get_plugin_cli(): TestNonEditableCard, TestPathSpecCard, TestTimeoutCard, + TestRefreshCard, + TestRefreshComponentCard, ) CARDS = [ @@ -182,5 +184,7 @@ def get_plugin_cli(): TestNonEditableCard, BlankCard, DefaultCardJSON, + TestRefreshCard, + TestRefreshComponentCard, ] merge_lists(CARDS, MF_EXTERNAL_CARDS, "type") diff --git a/metaflow/plugins/cards/card_cli.py b/metaflow/plugins/cards/card_cli.py index 2c0c801774..de52546a29 100644 --- a/metaflow/plugins/cards/card_cli.py +++ b/metaflow/plugins/cards/card_cli.py @@ -6,6 +6,7 @@ from metaflow._vendor import click import os import json +import uuid import signal import random from contextlib import contextmanager @@ -19,11 +20,18 @@ CardNotPresentException, TaskNotFoundException, ) +import traceback +from collections import namedtuple from .card_resolver import resolve_paths_from_task, resumed_info id_func = id +CardRenderInfo = namedtuple( + "CardRenderInfo", + ["mode", "is_implemented", "data", "timed_out", "timeout_stack_trace"], +) + def open_in_browser(card_path): url = "file://" + os.path.abspath(card_path) @@ -151,8 +159,6 @@ def timeout(time): try: yield - except TimeoutError: - pass finally: # Unregister the signal so that it won't be triggered # if the timeout is not reached. @@ -375,14 +381,144 @@ def wrapper(*args, **kwargs): return wrapper -def render_card(mf_card, task, timeout_value=None): - rendered_info = None +def update_card(mf_card, mode, task, data, timeout_value=None): + """ + This method will be responsible for creating a card/data-update based on the `mode`. + There are three possible modes taken by this function. + - render : + - This will render the "final" card. + - This mode is passed at task completion. + - Setting this mode will call the `render` method of a MetaflowCard. + - It will result in the creation of an HTML page. + - render_runtime: + - Setting this mode will render a card during task "runtime". + - Setting this mode will call the `render_runtime` method of a MetaflowCard. + - It will result in the creation of an HTML page. + - refresh: + - Setting this mode will refresh the data update for a card. + - We support this mode because rendering a full card can be an expensive operation, but shipping tiny data updates can be cheap. + - Setting this mode will call the `refresh` method of a MetaflowCard. + - It will result in the creation of a JSON object. + + Parameters + ---------- + mf_card : MetaflowCard + MetaflowCard object which will be used to render the card. + mode : str + Mode of rendering the card. + task : Task + Task object which will be passed to render the card. + data : dict + object created and passed down from `current.card._get_latest_data` method. + For more information on this object's schema have a look at `current.card._get_latest_data` method. + timeout_value : int + Timeout value for rendering the card. + + Returns + ------- + CardRenderInfo + - NamedTuple which will contain: + - `mode`: The mode of rendering the card. + - `is_implemented`: whether the function was implemented or not. + - `data` : output from rendering the card (Can be string/dict) + - `timed_out` : whether the function timed out or not. + - `timeout_stack_trace` : stack trace of the function if it timed out. + """ + + def _reload_token(): + if "render_seq" not in data: + return "never" + + if data["render_seq"] == "final": + # final data update should always trigger a card reload to show + # the final card, hence a different token for the final update + return "final" + elif mf_card.RELOAD_POLICY == mf_card.RELOAD_POLICY_ALWAYS: + return "render-seq-%s" % data["render_seq"] + elif mf_card.RELOAD_POLICY == mf_card.RELOAD_POLICY_NEVER: + return "never" + elif mf_card.RELOAD_POLICY == mf_card.RELOAD_POLICY_ONCHANGE: + return mf_card.reload_content_token(task, data) + + def _add_token_html(html): + if html is None: + return None + return html.replace(mf_card.RELOAD_POLICY_TOKEN, _reload_token()) + + def _add_token_json(json_msg): + if json_msg is None: + return None + return {"reload_token": _reload_token(), "data": json_msg} + + def _safe_call_function(func, *args, **kwargs): + """ + returns (data, is_implemented) + """ + try: + return func(*args, **kwargs), True + except NotImplementedError as e: + return None, False + + def _call(): + if mode == "render": + setattr( + mf_card.__class__, + "runtime_data", + property(fget=lambda _, data=data: data), + ) + output = _add_token_html(mf_card.render(task)) + return CardRenderInfo( + mode=mode, + is_implemented=True, + data=output, + timed_out=False, + timeout_stack_trace=None, + ) + + elif mode == "render_runtime": + # Since many cards created by metaflow users may not have implemented a + # `render_time` / `refresh` methods, it can result in an exception and thereby + # creation of error cards (especially for the `render_runtime` method). So instead + # we will catch the NotImplementedError and return None if users have not implemented it. + # If there any any other exception from the user code, it should be bubbled to the top level. + output, is_implemented = _safe_call_function( + mf_card.render_runtime, task, data + ) + return CardRenderInfo( + mode=mode, + is_implemented=is_implemented, + data=_add_token_html(output), + timed_out=False, + timeout_stack_trace=None, + ) + + elif mode == "refresh": + output, is_implemented = _safe_call_function(mf_card.refresh, task, data) + return CardRenderInfo( + mode=mode, + is_implemented=is_implemented, + data=_add_token_json(output), + timed_out=False, + timeout_stack_trace=None, + ) + + render_info = None if timeout_value is None or timeout_value < 0: - rendered_info = mf_card.render(task) + return _call() else: - with timeout(timeout_value): - rendered_info = mf_card.render(task) - return rendered_info + try: + with timeout(timeout_value): + render_info = _call() + except TimeoutError: + stack_trace = traceback.format_exc() + return CardRenderInfo( + mode=mode, + is_implemented=True, + data=None, + timed_out=True, + timeout_stack_trace=stack_trace, + ) + return render_info @card.command(help="create a HTML card") @@ -414,29 +550,64 @@ def render_card(mf_card, task, timeout_value=None): is_flag=True, help="Upon failing to render a card, render a card holding the stack trace", ) +@click.option( + "--id", + default=None, + show_default=True, + type=str, + help="ID of the card", +) @click.option( "--component-file", default=None, show_default=True, type=str, - help="JSON File with Pre-rendered components.(internal)", + help="JSON File with pre-rendered components. (internal)", ) @click.option( - "--id", + "--mode", + default="render", + show_default=True, + type=click.Choice(["render", "render_runtime", "refresh"]), + help="Rendering mode. (internal)", +) +@click.option( + "--data-file", default=None, show_default=True, type=str, - help="ID of the card", + hidden=True, + help="JSON file containing data to be updated. (internal)", +) +@click.option( + "--card-uuid", + default=None, + show_default=True, + type=str, + hidden=True, + help="Card UUID. (internal)", +) +@click.option( + "--delete-input-files", + default=False, + is_flag=True, + show_default=True, + hidden=True, + help="Delete data-file and component-file after reading. (internal)", ) @click.pass_context def create( ctx, pathspec, + mode=None, type=None, options=None, timeout=None, component_file=None, + data_file=None, render_error_card=False, + card_uuid=None, + delete_input_files=None, id=None, ): card_id = id @@ -452,11 +623,26 @@ def create( graph_dict, _ = ctx.obj.graph.output_steps() + if card_uuid is None: + card_uuid = str(uuid.uuid4()).replace("-", "") + # Components are rendered in a Step and added via `current.card.append` are added here. component_arr = [] if component_file is not None: with open(component_file, "r") as f: component_arr = json.load(f) + # Component data used in card runtime is passed in as temporary files which can be deleted after use + if delete_input_files: + os.remove(component_file) + + # Load data to be refreshed for runtime cards + data = {} + if data_file is not None: + with open(data_file, "r") as f: + data = json.load(f) + # data is passed in as temporary files which can be deleted after use + if delete_input_files: + os.remove(data_file) task = Task(full_pathspec) from metaflow.plugins import CARDS @@ -498,23 +684,88 @@ def create( else: raise IncorrectCardArgsException(type, options) + rendered_content = None if mf_card: try: - rendered_info = render_card(mf_card, task, timeout_value=timeout) + rendered_info = update_card( + mf_card, mode, task, data, timeout_value=timeout + ) + rendered_content = rendered_info.data except: + rendered_info = CardRenderInfo( + mode=mode, + is_implemented=True, + data=None, + timed_out=False, + timeout_stack_trace=None, + ) if render_error_card: error_stack_trace = str(UnrenderableCardException(type, options)) else: raise UnrenderableCardException(type, options) - # - if error_stack_trace is not None: - rendered_info = error_card().render(task, stack_trace=error_stack_trace) - - if rendered_info is None and render_error_card: - rendered_info = error_card().render( - task, stack_trace="No information rendered From card of type %s" % type + # In the entire card rendering process, there are a few cases we want to handle: + # - [mode == "render"] + # 1. Card is rendered successfully (We store it in the datastore as a HTML file) + # 2. Card is not rendered successfully and we have --save-error-card flag set to True + # (We store it in the datastore as a HTML file with stack trace) + # 3. Card render timed-out and we have --save-error-card flag set to True + # (We store it in the datastore as a HTML file with stack trace) + # 4. `render` returns nothing and we have --save-error-card flag set to True. + # (We store it in the datastore as a HTML file with some message saying you returned nothing) + # - [mode == "render_runtime"] + # 1. Card is rendered successfully (We store it in the datastore as a HTML file) + # 2. `render_runtime` is not implemented but gets called and we have --save-error-card flag set to True. + # (We store it in the datastore as a HTML file with some message saying the card should not be a runtime card if this method is not Implemented) + # 3. `render_runtime` is implemented and raises an exception and we have --save-error-card flag set to True. + # (We store it in the datastore as a HTML file with stack trace) + # 4. `render_runtime` is implemented but returns nothing and we have --save-error-card flag set to True. + # (We store it in the datastore as a HTML file with some message saying you returned nothing) + # 5. `render_runtime` is implemented but times out and we have --save-error-card flag set to True. + # (We store it in the datastore as a HTML file with stack trace) + # - [mode == "refresh"] + # 1. Data update is created successfully (We store it in the datastore as a JSON file) + # 2. `refresh` is not implemented. (We do nothing. Don't store anything.) + # 3. `refresh` is implemented but it raises an exception. (We do nothing. Don't store anything.) + # 4. `refresh` is implemented but it times out. (We do nothing. Don't store anything.) + + if error_stack_trace is not None and mode != "refresh": + rendered_content = error_card().render(task, stack_trace=error_stack_trace) + + if ( + rendered_info.is_implemented + and rendered_info.timed_out + and mode != "refresh" + and render_error_card + ): + timeout_stack_trace = ( + "\nCard rendering timed out after %s seconds. " + "To increase the timeout duration for card rendering, please set the `timeout` parameter in the @card decorator. " + "\nStack Trace : \n%s" + ) % (timeout, rendered_info.timeout_stack_trace) + rendered_content = error_card().render( + task, + stack_trace=timeout_stack_trace, + ) + elif ( + rendered_info.is_implemented + and rendered_info.data is None + and render_error_card + and mode != "refresh" + ): + rendered_content = error_card().render( + task, stack_trace="No information rendered from card of type %s" % type ) + elif ( + not rendered_info.is_implemented + and render_error_card + and mode == "render_runtime" + ): + message = ( + "Card of type %s is a runtime time card with no `render_runtime` implemented. " + "Please implement `render_runtime` method to allow rendering this card at runtime." + ) % type + rendered_content = error_card().render(task, stack_trace=message) # todo : should we save native type for error card or error type ? if type is not None and re.match(CARD_ID_PATTERN, type) is not None: @@ -531,13 +782,21 @@ def create( ) card_id = None - if rendered_info is not None: - card_info = card_datastore.save_card(save_type, rendered_info, card_id=card_id) - ctx.obj.echo( - "Card created with type: %s and hash: %s" - % (card_info.type, card_info.hash[:NUM_SHORT_HASH_CHARS]), - fg="green", - ) + if rendered_content is not None: + if mode == "refresh": + card_datastore.save_data( + card_uuid, save_type, rendered_content, card_id=card_id + ) + ctx.obj.echo("Data updated", fg="green") + else: + card_info = card_datastore.save_card( + card_uuid, save_type, rendered_content, card_id=card_id + ) + ctx.obj.echo( + "Card created with type: %s and hash: %s" + % (card_info.type, card_info.hash[:NUM_SHORT_HASH_CHARS]), + fg="green", + ) @card.command() @@ -655,7 +914,6 @@ def list( as_json=False, file=None, ): - card_id = id if pathspec is None: list_many_cards( diff --git a/metaflow/plugins/cards/card_client.py b/metaflow/plugins/cards/card_client.py index 07800a347f..02a6b7beb9 100644 --- a/metaflow/plugins/cards/card_client.py +++ b/metaflow/plugins/cards/card_client.py @@ -2,10 +2,10 @@ from metaflow.datastore import FlowDataStore from metaflow.metaflow_config import CARD_SUFFIX from .card_resolver import resolve_paths_from_task, resumed_info -from .card_datastore import CardDatastore +from .card_datastore import CardDatastore, CardNameSuffix from .exception import ( UnresolvableDatastoreException, - IncorrectArguementException, + IncorrectArgumentException, IncorrectPathspecException, ) import os @@ -57,6 +57,15 @@ def __init__( # Tempfile to open stuff in browser self._temp_file = None + def get_data(self) -> Optional[dict]: + # currently an internal method to retrieve a card's data. + data_paths = self._card_ds.extract_data_paths( + card_type=self.type, card_hash=self.hash, card_id=self._card_id + ) + if len(data_paths) == 0: + return None + return self._card_ds.get_card_data(data_paths[0]) + def get(self) -> str: """ Retrieves the HTML contents of the card from the @@ -150,12 +159,12 @@ class CardContainer: ``` """ - def __init__(self, card_paths, card_ds, from_resumed=False, origin_pathspec=None): + def __init__(self, card_paths, card_ds, origin_pathspec=None): self._card_paths = card_paths self._card_ds = card_ds self._current = 0 self._high = len(card_paths) - self.from_resumed = from_resumed + self.from_resumed = origin_pathspec is not None self.origin_pathspec = origin_pathspec def __len__(self): @@ -172,7 +181,7 @@ def _get_card(self, index): if index >= self._high: raise IndexError path = self._card_paths[index] - card_info = self._card_ds.card_info_from_path(path) + card_info = self._card_ds.info_from_path(path, suffix=CardNameSuffix.CARD) # todo : find card creation date and put it in client. return Card( self._card_ds, @@ -250,8 +259,9 @@ def get_cards( task = Task(task_str) elif not isinstance(task, Task): # Exception that the task argument should be of form `Task` or `str` - raise IncorrectArguementException(_TYPE(task)) + raise IncorrectArgumentException(_TYPE(task)) + origin_taskpathspec = None if follow_resumed: origin_taskpathspec = resumed_info(task) if origin_taskpathspec: @@ -263,7 +273,6 @@ def get_cards( return CardContainer( card_paths, card_ds, - from_resumed=origin_taskpathspec is not None, origin_pathspec=origin_taskpathspec, ) diff --git a/metaflow/plugins/cards/card_creator.py b/metaflow/plugins/cards/card_creator.py new file mode 100644 index 0000000000..04262dd97f --- /dev/null +++ b/metaflow/plugins/cards/card_creator.py @@ -0,0 +1,224 @@ +import time +import subprocess +import tempfile +import json +import sys +import os +from metaflow import current + +ASYNC_TIMEOUT = 30 + + +class CardProcessManager: + """ + This class is responsible for managing the card creation processes. + + """ + + async_card_processes = { + # "carduuid": { + # "proc": subprocess.Popen, + # "started": time.time() + # } + } + + @classmethod + def _register_card_process(cls, carduuid, proc): + cls.async_card_processes[carduuid] = { + "proc": proc, + "started": time.time(), + } + + @classmethod + def _get_card_process(cls, carduuid): + proc_dict = cls.async_card_processes.get(carduuid, None) + if proc_dict is not None: + return proc_dict["proc"], proc_dict["started"] + return None, None + + @classmethod + def _remove_card_process(cls, carduuid): + if carduuid in cls.async_card_processes: + cls.async_card_processes[carduuid]["proc"].kill() + del cls.async_card_processes[carduuid] + + +class CardCreator: + def __init__(self, top_level_options): + self._top_level_options = top_level_options + + def create( + self, + card_uuid=None, + user_set_card_id=None, + runtime_card=False, + decorator_attributes=None, + card_options=None, + logger=None, + mode="render", + final=False, + ): + # warning_message("calling proc for uuid %s" % self._card_uuid, self._logger) + if mode != "render" and not runtime_card: + # silently ignore runtime updates for cards that don't support them + return + elif mode == "refresh": + # don't serialize components, which can be a somewhat expensive operation, + # if we are just updating data + component_strings = [] + else: + component_strings = current.card._serialize_components(card_uuid) + + data = current.card._get_latest_data(card_uuid, final=final, mode=mode) + runspec = "/".join([current.run_id, current.step_name, current.task_id]) + self._run_cards_subprocess( + card_uuid, + user_set_card_id, + mode, + runspec, + decorator_attributes, + card_options, + component_strings, + logger, + data, + final=final, + ) + + def _run_cards_subprocess( + self, + card_uuid, + user_set_card_id, + mode, + runspec, + decorator_attributes, + card_options, + component_strings, + logger, + data=None, + final=False, + ): + components_file = data_file = None + wait = final + + if len(component_strings) > 0: + # note that we can't delete temporary files here when calling the subprocess + # async due to a race condition. The subprocess must delete them + components_file = tempfile.NamedTemporaryFile( + "w", suffix=".json", delete=False + ) + json.dump(component_strings, components_file) + components_file.seek(0) + if data is not None: + data_file = tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) + json.dump(data, data_file) + data_file.seek(0) + + executable = sys.executable + cmd = [ + executable, + sys.argv[0], + ] + cmd += self._top_level_options + [ + "card", + "create", + runspec, + "--delete-input-files", + "--card-uuid", + card_uuid, + "--mode", + mode, + "--type", + decorator_attributes["type"], + # Add the options relating to card arguments. + # todo : add scope as a CLI arg for the create method. + ] + if card_options is not None and len(card_options) > 0: + cmd += ["--options", json.dumps(card_options)] + # set the id argument. + + if decorator_attributes["timeout"] is not None: + cmd += ["--timeout", str(decorator_attributes["timeout"])] + + if user_set_card_id is not None: + cmd += ["--id", str(user_set_card_id)] + + if decorator_attributes["save_errors"]: + cmd += ["--render-error-card"] + + if components_file is not None: + cmd += ["--component-file", components_file.name] + + if data_file is not None: + cmd += ["--data-file", data_file.name] + + response, fail = self._run_command( + cmd, + card_uuid, + os.environ, + timeout=decorator_attributes["timeout"], + wait=wait, + ) + if fail: + resp = "" if response is None else response.decode("utf-8") + logger( + "Card render failed with error : \n\n %s" % resp, + timestamp=False, + bad=True, + ) + + def _wait_for_async_processes_to_finish(self, card_uuid, async_timeout): + _async_proc, _async_started = CardProcessManager._get_card_process(card_uuid) + while _async_proc is not None and _async_proc.poll() is None: + if time.time() - _async_started > async_timeout: + # This means the process has crossed the timeout and we need to kill it. + CardProcessManager._remove_card_process(card_uuid) + break + + def _run_command(self, cmd, card_uuid, env, wait=True, timeout=None): + fail = False + timeout_args = {} + async_timeout = ASYNC_TIMEOUT + if timeout is not None: + async_timeout = int(timeout) + 10 + timeout_args = dict(timeout=int(timeout) + 10) + + if wait: + self._wait_for_async_processes_to_finish(card_uuid, async_timeout) + try: + rep = subprocess.check_output( + cmd, env=env, stderr=subprocess.STDOUT, **timeout_args + ) + except subprocess.CalledProcessError as e: + rep = e.output + fail = True + except subprocess.TimeoutExpired as e: + rep = e.output + fail = True + return rep, fail + else: + _async_proc, _async_started = CardProcessManager._get_card_process( + card_uuid + ) + if _async_proc and _async_proc.poll() is None: + if time.time() - _async_started > async_timeout: + CardProcessManager._remove_card_process(card_uuid) + # Since we have removed the card process, we are free to run a new one + # This will also ensure that when a old process is removed a new one is replaced. + return self._run_command( + cmd, card_uuid, env, wait=wait, timeout=timeout + ) + else: + # silently refuse to run an async process if a previous one is still running + # and timeout hasn't been reached + return "".encode(), False + else: + CardProcessManager._register_card_process( + card_uuid, + subprocess.Popen( + cmd, + env=env, + stderr=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + ), + ) + return "".encode(), False diff --git a/metaflow/plugins/cards/card_datastore.py b/metaflow/plugins/cards/card_datastore.py index 5993159287..9e2edbd666 100644 --- a/metaflow/plugins/cards/card_datastore.py +++ b/metaflow/plugins/cards/card_datastore.py @@ -3,9 +3,9 @@ """ from collections import namedtuple -from hashlib import sha1 from io import BytesIO import os +import json import shutil from metaflow.plugins.datastores.local_storage import LocalStorage @@ -28,6 +28,16 @@ CardInfo = namedtuple("CardInfo", ["type", "hash", "id", "filename"]) +class CardNameSuffix: + DATA = "data.json" + CARD = "html" + + +class CardPathSuffix: + DATA = "runtime" + CARD = "cards" + + def path_spec_resolver(pathspec): splits = pathspec.split("/") splits.extend([None] * (4 - len(splits))) @@ -85,18 +95,22 @@ def __init__(self, flow_datastore, pathspec=None): self._run_id = run_id self._step_name = step_name self._pathspec = pathspec - self._temp_card_save_path = self._get_write_path(base_pth=TEMP_DIR_NAME) + self._temp_card_save_path = self._get_card_write_path(base_pth=TEMP_DIR_NAME) @classmethod - def get_card_location(cls, base_path, card_name, card_html, card_id=None): - chash = sha1(bytes(card_html, "utf-8")).hexdigest() + def get_card_location( + cls, base_path, card_name, uuid, card_id=None, suffix=CardNameSuffix.CARD + ): + chash = uuid if card_id is None: - card_file_name = "%s-%s.html" % (card_name, chash) + card_file_name = "%s-%s.%s" % (card_name, chash, suffix) else: - card_file_name = "%s-%s-%s.html" % (card_name, card_id, chash) + card_file_name = "%s-%s-%s.%s" % (card_name, card_id, chash, suffix) return os.path.join(base_path, card_file_name) - def _make_path(self, base_pth, pathspec=None, with_steps=False): + def _make_path( + self, base_pth, pathspec=None, with_steps=False, suffix=CardPathSuffix.CARD + ): sysroot = base_pth if pathspec is not None: # since most cards are at a task level there will always be 4 non-none values returned @@ -121,7 +135,7 @@ def _make_path(self, base_pth, pathspec=None, with_steps=False): step_name, "tasks", task_id, - "cards", + suffix, ] else: pth_arr = [ @@ -131,20 +145,49 @@ def _make_path(self, base_pth, pathspec=None, with_steps=False): run_id, "tasks", task_id, - "cards", + suffix, ] if sysroot == "" or sysroot is None: pth_arr.pop(0) return os.path.join(*pth_arr) - def _get_write_path(self, base_pth=""): - return self._make_path(base_pth, pathspec=self._pathspec, with_steps=True) + def _get_data_read_path(self, base_pth=""): + return self._make_path( + base_pth=base_pth, + pathspec=self._pathspec, + with_steps=True, + suffix=CardPathSuffix.DATA, + ) + + def _get_data_write_path(self, base_pth=""): + return self._make_path( + base_pth=base_pth, + pathspec=self._pathspec, + with_steps=True, + suffix=CardPathSuffix.DATA, + ) + + def _get_card_write_path( + self, + base_pth="", + ): + return self._make_path( + base_pth, + pathspec=self._pathspec, + with_steps=True, + suffix=CardPathSuffix.CARD, + ) - def _get_read_path(self, base_pth="", with_steps=False): - return self._make_path(base_pth, pathspec=self._pathspec, with_steps=with_steps) + def _get_card_read_path(self, base_pth="", with_steps=False): + return self._make_path( + base_pth, + pathspec=self._pathspec, + with_steps=with_steps, + suffix=CardPathSuffix.CARD, + ) @staticmethod - def card_info_from_path(path): + def info_from_path(path, suffix=CardNameSuffix.CARD): """ Args: path (str): The path to the card @@ -160,8 +203,8 @@ def card_info_from_path(path): if len(file_split) not in [2, 3]: raise Exception( - "Invalid card file name %s. Card file names should be of form TYPE-HASH.html or TYPE-ID-HASH.html" - % card_file_name + "Invalid file name %s. Card/Data file names should be of form TYPE-HASH.%s or TYPE-ID-HASH.%s" + % (card_file_name, suffix, suffix) ) card_type, card_hash, card_id = None, None, None @@ -170,10 +213,23 @@ def card_info_from_path(path): else: card_type, card_id, card_hash = file_split - card_hash = card_hash.split(".html")[0] + card_hash = card_hash.split("." + suffix)[0] return CardInfo(card_type, card_hash, card_id, card_file_name) - def save_card(self, card_type, card_html, card_id=None, overwrite=True): + def save_data(self, uuid, card_type, json_data, card_id=None): + card_file_name = card_type + loc = self.get_card_location( + self._get_data_write_path(), + card_file_name, + uuid, + card_id=card_id, + suffix=CardNameSuffix.DATA, + ) + self._backend.save_bytes( + [(loc, BytesIO(json.dumps(json_data).encode("utf-8")))], overwrite=True + ) + + def save_card(self, uuid, card_type, card_html, card_id=None, overwrite=True): card_file_name = card_type # TEMPORARY_WORKAROUND: FIXME (LATER) : Fix the duplication of below block in a few months. # Check file blame to understand the age of this temporary workaround. @@ -193,7 +249,11 @@ def save_card(self, card_type, card_html, card_id=None, overwrite=True): # It will also easily end up breaking the metaflow-ui (which maybe using a client from an older version). # Hence, we are writing cards to both paths so that we can introduce breaking changes later in the future. card_path_with_steps = self.get_card_location( - self._get_write_path(), card_file_name, card_html, card_id=card_id + self._get_card_write_path(), + card_file_name, + uuid, + card_id=card_id, + suffix=CardNameSuffix.CARD, ) if SKIP_CARD_DUALWRITE: self._backend.save_bytes( @@ -202,28 +262,29 @@ def save_card(self, card_type, card_html, card_id=None, overwrite=True): ) else: card_path_without_steps = self.get_card_location( - self._get_read_path(with_steps=False), + self._get_card_read_path(with_steps=False), card_file_name, - card_html, + uuid, card_id=card_id, + suffix=CardNameSuffix.CARD, ) for cp in [card_path_with_steps, card_path_without_steps]: self._backend.save_bytes( [(cp, BytesIO(bytes(card_html, "utf-8")))], overwrite=overwrite ) - return self.card_info_from_path(card_path_with_steps) + return self.info_from_path(card_path_with_steps, suffix=CardNameSuffix.CARD) def _list_card_paths(self, card_type=None, card_hash=None, card_id=None): # Check for new cards first card_paths = [] card_paths_with_steps = self._backend.list_content( - [self._get_read_path(with_steps=True)] + [self._get_card_read_path(with_steps=True)] ) if len(card_paths_with_steps) == 0: card_paths_without_steps = self._backend.list_content( - [self._get_read_path(with_steps=False)] + [self._get_card_read_path(with_steps=False)] ) if len(card_paths_without_steps) == 0: # If there are no files found on the Path then raise an error of @@ -240,7 +301,7 @@ def _list_card_paths(self, card_type=None, card_hash=None, card_id=None): cards_found = [] for task_card_path in card_paths: card_path = task_card_path.path - card_info = self.card_info_from_path(card_path) + card_info = self.info_from_path(card_path, suffix=CardNameSuffix.CARD) if card_type is not None and card_info.type != card_type: continue elif card_hash is not None: @@ -254,11 +315,32 @@ def _list_card_paths(self, card_type=None, card_hash=None, card_id=None): return cards_found + def _list_card_data(self, card_type=None, card_hash=None, card_id=None): + card_data_paths = self._backend.list_content([self._get_data_read_path()]) + data_found = [] + + for data_path in card_data_paths: + _pth = data_path.path + card_info = self.info_from_path(_pth, suffix=CardNameSuffix.DATA) + if card_type is not None and card_info.type != card_type: + continue + elif card_hash is not None: + if not card_info.hash.startswith(card_hash): + continue + elif card_id is not None and card_info.id != card_id: + continue + if data_path.is_file: + data_found.append(_pth) + + return data_found + def create_full_path(self, card_path): return os.path.join(self._backend.datastore_root, card_path) def get_card_names(self, card_paths): - return [self.card_info_from_path(path) for path in card_paths] + return [ + self.info_from_path(path, suffix=CardNameSuffix.CARD) for path in card_paths + ] def get_card_html(self, path): with self._backend.load_bytes([path]) as get_results: @@ -267,6 +349,13 @@ def get_card_html(self, path): with open(path, "r") as f: return f.read() + def get_card_data(self, path): + with self._backend.load_bytes([path]) as get_results: + for _, path, _ in get_results: + if path is not None: + with open(path, "r") as f: + return json.loads(f.read()) + def cache_locally(self, path, save_path=None): """ Saves the data present in the `path` the `metaflow_card_cache` directory or to the `save_path`. @@ -292,6 +381,15 @@ def cache_locally(self, path, save_path=None): shutil.copy(path, main_path) return main_path + def extract_data_paths(self, card_type=None, card_hash=None, card_id=None): + return self._list_card_data( + # card_hash is the unique identifier to the card. + # Its no longer the actual hash! + card_type=card_type, + card_hash=card_hash, + card_id=card_id, + ) + def extract_card_paths(self, card_type=None, card_hash=None, card_id=None): return self._list_card_paths( card_type=card_type, card_hash=card_hash, card_id=card_id diff --git a/metaflow/plugins/cards/card_decorator.py b/metaflow/plugins/cards/card_decorator.py index efa13a1ec9..feba3722f6 100644 --- a/metaflow/plugins/cards/card_decorator.py +++ b/metaflow/plugins/cards/card_decorator.py @@ -1,15 +1,8 @@ -import subprocess -import os -import tempfile -import sys -import json - -from typing import Dict, Any - -from metaflow.decorators import StepDecorator, flow_decorators +from metaflow.decorators import StepDecorator from metaflow.current import current from metaflow.util import to_unicode from .component_serializer import CardComponentCollector, get_card_class +from .card_creator import CardCreator # from metaflow import get_metadata @@ -17,6 +10,8 @@ from .exception import CARD_ID_PATTERN, TYPE_CHECK_REGEX +ASYNC_TIMEOUT = 30 + def warning_message(message, logger=None, ts=False): msg = "[@card WARNING] %s" % message @@ -51,6 +46,7 @@ class CardDecorator(StepDecorator): "id": None, "save_errors": True, "customize": False, + "refresh_interval": 5, } allow_multiple = True @@ -60,6 +56,8 @@ class CardDecorator(StepDecorator): _called_once = {} + card_creator = None + def __init__(self, *args, **kwargs): super(CardDecorator, self).__init__(*args, **kwargs) self._task_datastore = None @@ -70,6 +68,10 @@ def __init__(self, *args, **kwargs): self._card_uuid = None self._user_set_card_id = None + @classmethod + def _set_card_creator(cls, card_creator): + cls.card_creator = card_creator + def _is_event_registered(self, evt_name): return evt_name in self._called_once @@ -89,7 +91,6 @@ def _increment_step_counter(cls): def step_init( self, flow, graph, step_name, decorators, environment, flow_datastore, logger ): - self._flow_datastore = flow_datastore self._environment = environment self._logger = logger @@ -126,11 +127,18 @@ def task_pre_step( ubf_context, inputs, ): + self._task_datastore = task_datastore + self._metadata = metadata + card_type = self.attributes["type"] card_class = get_card_class(card_type) + + self._is_runtime_card = False if card_class is not None: # Card type was not found if card_class.ALLOW_USER_COMPONENTS: self._is_editable = True + self._is_runtime_card = card_class.RUNTIME_UPDATABLE + # We have a step counter to ensure that on calling the final card decorator's `task_pre_step` # we call a `finalize` function in the `CardComponentCollector`. # This can help ensure the behaviour of the `current.card` object is according to specification. @@ -155,7 +163,11 @@ def task_pre_step( # we need to ensure that `current.card` has `CardComponentCollector` instantiated only once. if not self._is_event_registered("pre-step"): self._register_event("pre-step") - current._update_env({"card": CardComponentCollector(self._logger)}) + self._set_card_creator(CardCreator(self._create_top_level_args())) + + current._update_env( + {"card": CardComponentCollector(self._logger, self.card_creator)} + ) # this line happens because of decospecs parsing. customize = False @@ -165,8 +177,12 @@ def task_pre_step( card_metadata = current.card._add_card( self.attributes["type"], self._user_set_card_id, - self._is_editable, - customize, + self.attributes, + self.card_options, + editable=self._is_editable, + customize=customize, + runtime_card=self._is_runtime_card, + refresh_interval=self.attributes["refresh_interval"], ) self._card_uuid = card_metadata["uuid"] @@ -176,17 +192,20 @@ def task_pre_step( if self.step_counter == self.total_decos_on_step[step_name]: current.card._finalize() - self._task_datastore = task_datastore - self._metadata = metadata - def task_finished( self, step_name, flow, graph, is_task_ok, retry_count, max_user_code_retries ): - if not is_task_ok: - return - component_strings = current.card._serialize_components(self._card_uuid) - runspec = "/".join([current.run_id, current.step_name, current.task_id]) - self._run_cards_subprocess(runspec, component_strings) + create_options = dict( + card_uuid=self._card_uuid, + user_set_card_id=self._user_set_card_id, + runtime_card=self._is_runtime_card, + decorator_attributes=self.attributes, + card_options=self.card_options, + logger=self._logger, + ) + if is_task_ok: + self.card_creator.create(mode="render", final=True, **create_options) + self.card_creator.create(mode="refresh", final=True, **create_options) @staticmethod def _options(mapping): @@ -200,7 +219,6 @@ def _options(mapping): yield to_unicode(value) def _create_top_level_args(self): - top_level_options = { "quiet": True, "metadata": self._metadata.TYPE, @@ -214,67 +232,3 @@ def _create_top_level_args(self): # the context of the main process } return list(self._options(top_level_options)) - - def _run_cards_subprocess(self, runspec, component_strings): - temp_file = None - if len(component_strings) > 0: - temp_file = tempfile.NamedTemporaryFile("w", suffix=".json") - json.dump(component_strings, temp_file) - temp_file.seek(0) - executable = sys.executable - cmd = [ - executable, - sys.argv[0], - ] - cmd += self._create_top_level_args() + [ - "card", - "create", - runspec, - "--type", - self.attributes["type"], - # Add the options relating to card arguments. - # todo : add scope as a CLI arg for the create method. - ] - if self.card_options is not None and len(self.card_options) > 0: - cmd += ["--options", json.dumps(self.card_options)] - # set the id argument. - - if self.attributes["timeout"] is not None: - cmd += ["--timeout", str(self.attributes["timeout"])] - - if self._user_set_card_id is not None: - cmd += ["--id", str(self._user_set_card_id)] - - if self.attributes["save_errors"]: - cmd += ["--render-error-card"] - - if temp_file is not None: - cmd += ["--component-file", temp_file.name] - - response, fail = self._run_command( - cmd, os.environ, timeout=self.attributes["timeout"] - ) - if fail: - resp = "" if response is None else response.decode("utf-8") - self._logger( - "Card render failed with error : \n\n %s" % resp, - timestamp=False, - bad=True, - ) - - def _run_command(self, cmd, env, timeout=None): - fail = False - timeout_args = {} - if timeout is not None: - timeout_args = dict(timeout=int(timeout) + 10) - try: - rep = subprocess.check_output( - cmd, env=env, stderr=subprocess.STDOUT, **timeout_args - ) - except subprocess.CalledProcessError as e: - rep = e.output - fail = True - except subprocess.TimeoutExpired as e: - rep = e.output - fail = True - return rep, fail diff --git a/metaflow/plugins/cards/card_modules/card.py b/metaflow/plugins/cards/card_modules/card.py index 35a639299d..52c4c0dd8b 100644 --- a/metaflow/plugins/cards/card_modules/card.py +++ b/metaflow/plugins/cards/card_modules/card.py @@ -32,12 +32,40 @@ class MetaflowCard(object): JSON-encodable dictionary containing user-definable options for the class. """ + # RELOAD_POLICY determines whether UIs should + # reload intermediate cards produced by render_runtime + # or whether they can just rely on data updates + + # the UI may keep using the same card + # until the final card is produced + RELOAD_POLICY_NEVER = "never" + + # the UI should reload card every time + # render_runtime() has produced a new card + RELOAD_POLICY_ALWAYS = "always" + + # derive reload token from data and component + # content - force reload only when the content + # changes. The actual policy is card-specific, + # defined by the method reload_content_token() + RELOAD_POLICY_ONCHANGE = "onchange" + + # this token will get replaced in the html with a unique + # string that is used to ensure that data updates and the + # card content matches + RELOAD_POLICY_TOKEN = "[METAFLOW_RELOAD_TOKEN]" + type = None ALLOW_USER_COMPONENTS = False + RUNTIME_UPDATABLE = False + RELOAD_POLICY = RELOAD_POLICY_NEVER scope = "task" # can be task | run + # FIXME document runtime_data + runtime_data = None + def __init__(self, options={}, components=[], graph=None): pass @@ -68,8 +96,45 @@ def render(self, task) -> str: """ return NotImplementedError() + # FIXME document + def render_runtime(self, task, data): + raise NotImplementedError() + + # FIXME document + def refresh(self, task, data): + raise NotImplementedError() + + # FIXME document + def reload_content_token(self, task, data): + return "content-token" + class MetaflowCardComponent(object): + + # Setting REALTIME_UPDATABLE as True will allow metaflow to update the card + # during Task runtime. + REALTIME_UPDATABLE = False + + _component_id = None + + _logger = None + + @property + def component_id(self): + return self._component_id + + @component_id.setter + def component_id(self, value): + if not isinstance(value, str): + raise TypeError("Component ID must be a string") + self._component_id = value + + def update(self, *args, **kwargs): + """ + #FIXME document + """ + raise NotImplementedError() + def render(self): """ `render` returns a string or dictionary. This class can be called on the client side to dynamically add components to the `MetaflowCard` diff --git a/metaflow/plugins/cards/card_modules/components.py b/metaflow/plugins/cards/card_modules/components.py index 53ec9c417c..3ffcf0f533 100644 --- a/metaflow/plugins/cards/card_modules/components.py +++ b/metaflow/plugins/cards/card_modules/components.py @@ -11,10 +11,53 @@ from .card import MetaflowCardComponent from .convert_to_native_type import TaskToDict, _full_classname from .renderer_tools import render_safely +import uuid + + +def create_component_id(component): + uuid_bit = "".join(uuid.uuid4().hex.split("-"))[:6] + return type(component).__name__.lower() + "_" + uuid_bit + + +def with_default_component_id(func): + def ret_func(self, *args, **kwargs): + if self.component_id is None: + self.component_id = create_component_id(self) + return func(self, *args, **kwargs) + + return ret_func + + +def _warning_with_component(component, msg): + if component._logger is None: + return None + if component._warned_once: + return None + log_msg = "[@card-component WARNING] %s" % msg + component._logger(log_msg, timestamp=False, bad=True) + component._warned_once = True class UserComponent(MetaflowCardComponent): - pass + + _warned_once = False + + def update(self, *args, **kwargs): + cls_name = self.__class__.__name__ + msg = ( + "MetaflowCardComponent doesn't have an `update` method implemented " + "and is not compatible with realtime updates." + ) % cls_name + _warning_with_component(self, msg) + + +class StubComponent(UserComponent): + def __init__(self, component_id): + self._non_existing_comp_id = component_id + + def update(self, *args, **kwargs): + msg = "Component with id %s doesn't exist. No updates will be made at anytime during runtime." + _warning_with_component(self, msg % self._non_existing_comp_id) class Artifact(UserComponent): diff --git a/metaflow/plugins/cards/card_modules/test_cards.py b/metaflow/plugins/cards/card_modules/test_cards.py index b756fb37db..9a6e2ff7dc 100644 --- a/metaflow/plugins/cards/card_modules/test_cards.py +++ b/metaflow/plugins/cards/card_modules/test_cards.py @@ -1,13 +1,20 @@ +import json from .card import MetaflowCard, MetaflowCardComponent +from .renderer_tools import render_safely class TestStringComponent(MetaflowCardComponent): + REALTIME_UPDATABLE = True + def __init__(self, text): self._text = text def render(self): return str(self._text) + def update(self, text): + self._text = text + class TestPathSpecCard(MetaflowCard): type = "test_pathspec_card" @@ -98,3 +105,110 @@ def render(self, task): time.sleep(self._timeout) return "%s" % task.pathspec + + +REFRESHABLE_HTML_TEMPLATE = """ + + +

[PATHSPEC]

+

[REPLACE_CONTENT_HERE]

+ +""" + + +class TestJSONComponent(MetaflowCardComponent): + + REALTIME_UPDATABLE = True + + def __init__(self, data): + self._data = data + + @render_safely + def render(self): + return self._data + + def update(self, data): + self._data = data + + +class TestRefreshCard(MetaflowCard): + + """ + This card takes no components and helps test the `current.card.refresh(data)` interface. + """ + + HTML_TEMPLATE = REFRESHABLE_HTML_TEMPLATE + + RUNTIME_UPDATABLE = True + + ALLOW_USER_COMPONENTS = True + + # Not implementing Reload Policy here since the reload Policy is set to always + RELOAD_POLICY = MetaflowCard.RELOAD_POLICY_ALWAYS + + type = "test_refresh_card" + + def render(self, task, data) -> str: + return self.HTML_TEMPLATE.replace( + "[REPLACE_CONTENT_HERE]", json.dumps(data["user"]) + ).replace("[PATHSPEC]", task.pathspec) + + def render_runtime(self, task, data): + return self.render(task, data) + + def refresh(self, task, data): + return data + + +import hashlib + + +def _component_values_to_hash(components): + comma_str = ",".join(["".join(x) for v in components.values() for x in v]) + return hashlib.sha256(comma_str.encode("utf-8")).hexdigest() + + +class TestRefreshComponentCard(MetaflowCard): + + """ + This card takes components and helps test the `current.card.components["A"].update()` + interface + """ + + HTML_TEMPLATE = REFRESHABLE_HTML_TEMPLATE + + RUNTIME_UPDATABLE = True + + ALLOW_USER_COMPONENTS = True + + # Not implementing Reload Policy here since the reload Policy is set to always + RELOAD_POLICY = MetaflowCard.RELOAD_POLICY_ONCHANGE + + type = "test_component_refresh_card" + + def __init__(self, options={}, components=[], graph=None): + self._components = components + + def render(self, task, data) -> str: + # Calling `render`/`render_runtime` wont require the `data` object + return self.HTML_TEMPLATE.replace( + "[REPLACE_CONTENT_HERE]", json.dumps(self._components) + ).replace("[PATHSPEC]", task.pathspec) + + def render_runtime(self, task, data): + return self.render(task, data) + + def refresh(self, task, data): + # Govers the information passed in the data update + return data["components"] + + def reload_content_token(self, task, data): + if task.finished: + return "final" + return "runtime-%s" % _component_values_to_hash(data["components"]) diff --git a/metaflow/plugins/cards/card_resolver.py b/metaflow/plugins/cards/card_resolver.py index d8366990d2..e717005ff2 100644 --- a/metaflow/plugins/cards/card_resolver.py +++ b/metaflow/plugins/cards/card_resolver.py @@ -1,5 +1,3 @@ -from collections import namedtuple - from .card_datastore import CardDatastore diff --git a/metaflow/plugins/cards/component_serializer.py b/metaflow/plugins/cards/component_serializer.py index 9451cd6740..5d79c8f710 100644 --- a/metaflow/plugins/cards/component_serializer.py +++ b/metaflow/plugins/cards/component_serializer.py @@ -1,8 +1,17 @@ from .card_modules import MetaflowCardComponent from .card_modules.basic import ErrorComponent, SectionComponent -from .card_modules.components import UserComponent +from .card_modules.components import ( + UserComponent, + create_component_id, + StubComponent, +) +from .exception import ComponentOverwriteNotSupportedException +from metaflow.metaflow_config import RUNTIME_CARD_RENDER_INTERVAL import uuid import json +import platform +from collections import OrderedDict +import time _TYPE = type @@ -16,11 +25,360 @@ def get_card_class(card_type): return filtered_cards[0] +def _component_is_valid(component): + """ + Validates if the component is of the correct class. + """ + return issubclass(type(component), MetaflowCardComponent) + + +def warning_message(message, logger=None, ts=False): + if logger: + msg = "[@card WARNING] %s" % message + logger(msg, timestamp=ts, bad=True) + + class WarningComponent(ErrorComponent): def __init__(self, warning_message): super().__init__("@card WARNING", warning_message) +class ComponentStore: + """ + The `ComponentStore` object helps store the components for a single card in memory. + This class has combination of a array/dictionary like interfaces to access/change the stored components. + + It exposes the `append` /`extend` methods (like an array) to add components. + It also exposes the `__getitem__`/`__setitem__` methods (like a dictionary) to access the components by their Ids. + """ + + def _set_component_map(self): + """ + The `_component_map` attribute is supposed to be a dictionary so that we can access the components by their ids. + But we also want to maintain order in which components are inserted since all of these components are going to be visible on a UI. + Since python3.6 dictionaries are ordered by default so we can use the default python `dict`. + For python3.5 and below we need to use an OrderedDict since `dict`'s are not ordered by default. + """ + python_version = int(platform.python_version_tuple()[0]) * 10 + int( + platform.python_version_tuple()[1] + ) + if python_version < 36: + self._component_map = OrderedDict() + else: + self._component_map = {} + + def __init__(self, logger, card_type=None, components=None, user_set_id=None): + self._logger = logger + self._card_type = card_type + self._user_set_id = user_set_id + self._layout_last_changed_on = time.time() + self._set_component_map() + if components is not None: + for c in list(components): + self._store_component(c, component_id=None) + + @property + def layout_last_changed_on(self): + """This property helps the CardComponentManager identify when the layout of the card has changed so that it can trigger a re-render of the card.""" + return self._layout_last_changed_on + + def _realtime_updateable_components(self): + for c in self._component_map.values(): + if c.REALTIME_UPDATABLE: + yield c + + def _store_component(self, component, component_id=None): + if not _component_is_valid(component): + warning_message( + "Component (%s) is not a valid MetaflowCardComponent. It will not be stored." + % str(component), + self._logger, + ) + return + if component_id is not None: + component.component_id = component_id + elif component.component_id is None: + component.component_id = create_component_id(component) + setattr(component, "_logger", self._logger) + self._component_map[component.component_id] = component + self._layout_last_changed_on = time.time() + + def _remove_component(self, component_id): + del self._component_map[component_id] + self._layout_last_changed_on = time.time() + + def __iter__(self): + return iter(self._component_map.values()) + + def __setitem__(self, key, value): + if self._component_map.get(key) is not None: + # This is the equivalent of calling `current.card.components["mycomponent"] = Markdown("## New Component")` + # We don't support the replacement of individual components in the card. + # Instead we support rewriting the entire component array instead. + # So users can run `current.card[ID] = [FirstComponent, SecondComponent]` which will instantiate an entirely + # new ComponentStore. + # So we should throw an error over here, since it is clearly an operation which is not supported. + raise ComponentOverwriteNotSupportedException( + key, self._user_set_id, self._card_type + ) + else: + self._store_component(value, component_id=key) + + def __getitem__(self, key): + if key not in self._component_map: + # Store a stub-component in place since `key` doesnt exist. + # If the user does a `current.card.append(component, id=key)` + # then the stub component will be replaced by the actual component. + self._store_component(StubComponent(key), component_id=key) + return self._component_map[key] + + def __delitem__(self, key): + if key not in self._component_map: + raise KeyError( + "MetaflowCardComponent with id `%s` not found. Available components for the cards include : %s" + % (key, ", ".join(self.keys())) + ) + self._remove_component(key) + + def __contains__(self, key): + return key in self._component_map + + def append(self, component, id=None): + self._store_component(component, component_id=id) + + def extend(self, components): + for c in components: + self._store_component(c, component_id=None) + + def clear(self): + self._component_map.clear() + + def keys(self): + return list(self._component_map.keys()) + + def values(self): + return self._component_map.values() + + def __str__(self): + return "Card components present in the card: `%s` " % ("`, `".join(self.keys())) + + def __len__(self): + return len(self._component_map) + + +def _object_is_json_serializable(obj): + try: + json.dumps(obj) + return True + except TypeError as e: + return False + + +class CardComponentManager: + """ + This class manages the card's state for a single card. + - It uses the `ComponentStore` to manage the storage of the components + - It exposes methods to add, remove and access the components. + - It exposes a `refresh` method that will allow refreshing a card with new data + for realtime(ish) updates. + - The `CardComponentCollector` exposes convenience methods similar to this class for a default + editable card. These methods include : + - `append` + - `extend` + - `clear` + - `refresh` + - `components` + - `__iter__` + + ## Usage Patterns : + + ```python + current.card["mycardid"].append(component, id="comp123") + current.card["mycardid"].extend([component]) + current.card["mycardid"].refresh(data) # refreshes the card with new data + current.card["mycardid"].components["comp123"] # returns the component with id "comp123" + current.card["mycardid"].components["comp123"].update() + current.card["mycardid"].components.clear() # Wipe all the components + del current.card["mycardid"].components["mycomponentid"] # Delete a component + ``` + """ + + def __init__( + self, + card_uuid, + decorator_attributes, + card_creator, + components=None, + logger=None, + no_warnings=False, + user_set_card_id=None, + runtime_card=False, + card_options=None, + refresh_interval=5, + ): + self._card_creator_args = dict( + card_uuid=card_uuid, + user_set_card_id=user_set_card_id, + runtime_card=runtime_card, + decorator_attributes=decorator_attributes, + card_options=card_options, + logger=logger, + ) + self._card_creator = card_creator + self._refresh_interval = refresh_interval + self._last_layout_change = None + self._latest_user_data = None + self._last_refresh = 0 + self._last_render = 0 + self._render_seq = 0 + self._logger = logger + self._no_warnings = no_warnings + self._warn_once = { + "update": {}, + "not_implemented": {}, + } + card_type = decorator_attributes["type"] + + if components is None: + self._components = ComponentStore( + logger=self._logger, + card_type=card_type, + user_set_id=user_set_card_id, + components=None, + ) + else: + self._components = ComponentStore( + logger=self._logger, + card_type=card_type, + user_set_id=user_set_card_id, + components=list(components), + ) + + def append(self, component, id=None): + self._components.append(component, id=id) + + def extend(self, components): + self._components.extend(components) + + def clear(self): + self._components.clear() + + def _card_proc(self, mode): + self._card_creator.create(**self._card_creator_args, mode=mode) + + def refresh(self, data=None, force=False): + self._latest_user_data = data + nu = time.time() + + if nu - self._last_refresh < self._refresh_interval: + # rate limit refreshes: silently ignore requests that + # happen too frequently + return + self._last_refresh = nu + + # This block of code will render the card in `render_runtime` mode when: + # 1. refresh is called with `force=True` + # 2. Layout of the components in the card has changed. i.e. The actual elements in the component array have changed. + # 3. The last time the card was rendered was more the minimum interval after which they should be rendered. + last_rendered_before_minimum_interval = ( + nu - self._last_refresh + ) > RUNTIME_CARD_RENDER_INTERVAL + layout_has_changed = ( + self._last_layout_change != self.components.layout_last_changed_on + or self._last_layout_change is None + ) + + if force or last_rendered_before_minimum_interval or layout_has_changed: + self._render_seq += 1 + self._last_render = nu + self._card_proc("render_runtime") + # We set self._last_layout_change so that when self._last_layout_change is not the same + # as `self.components.layout_last_changed_on`, then the component array itself + # has been modified. So we should force a re-render of the card. + self._last_layout_change = self.components.layout_last_changed_on + else: + self._card_proc("refresh") + + @property + def components(self): + return self._components + + def _warning(self, message): + msg = "[@card WARNING] %s" % message + self._logger(msg, timestamp=False, bad=True) + + def _get_latest_data(self, final=False, mode=None): + """ + This function returns the data object that is passed down to : + - `MetaflowCard.render_runtime` + - `MetaflowCard.refresh` + - `MetaflowCard.reload_content_token` + + The return value of this function contains all the necessary state information for Metaflow Cards to make decisions on the following: + 1. What components are rendered + 2. Should the card be reloaded on the UI + 3. What data to pass down to the card. + + Parameters + ---------- + final : bool, optional + If True, it implies that the final "rendering" sequence is taking place (which involves calling a `render` and a `refresh` function.) + When final is set the `render_seq` is set to "final" so that the reload token in the card is set to final + and the card is not reloaded again on the user interface. + mode : str + This parameter is passed down to the object returned by this function. Can be one of `render_runtime` / `refresh` / `render` + + Returns + ------- + dict + A dictionary of the form : + ```python + { + "user": user_data, # any passed to `current.card.refresh` function + "components": component_dict, # all rendered REALTIME_UPDATABLE components + "render_seq": seq, + # `render_seq` is a counter that is incremented every time `render_runtime` is called. + # If a metaflow card has a RELOAD_POLICY_ALWAYS set then the reload token will be set to this value + # so that the card reload on the UI everytime `render_runtime` is called. + "component_update_ts": self.components.layout_last_changed_on, + # `component_update_ts` is the timestamp of the last time the component array was modified. + # `component_update_ts` can get used by the `reload_content_token` to make decisions on weather to + # reload the card on the UI when component array has changed. + "mode": mode, + } + ``` + """ + seq = "final" if final else self._render_seq + # Extract all the runtime-updatable components as a dictionary + component_dict = {} + for component in self._components._realtime_updateable_components(): + rendered_comp = _render_card_component(component) + if rendered_comp is not None: + component_dict.update({component.component_id: rendered_comp}) + + # Verify _latest_user_data is json serializable + user_data = {} + if self._latest_user_data is not None and not _object_is_json_serializable( + self._latest_user_data + ): + self._warning( + "Data provided to `refresh` is not JSON serializable. It will be ignored." + ) + else: + user_data = self._latest_user_data + + return { + "user": user_data, + "components": component_dict, + "render_seq": seq, + "component_update_ts": self.components.layout_last_changed_on, + "mode": mode, + } + + def __iter__(self): + return iter(self._components) + + class CardComponentCollector: """ This class helps collect `MetaflowCardComponent`s during runtime execution @@ -42,25 +400,34 @@ class CardComponentCollector: - [x] by looking it up by its type, e.g. `current.card.get(type='pytorch')`. """ - def __init__(self, logger=None): + def __init__(self, logger=None, card_creator=None): from metaflow.metaflow_config import CARD_NO_WARNING - self._cards_components = ( + self._card_component_store = ( + # Each key in the dictionary is the UUID of an individual card. + # value is of type `CardComponentManager`, holding a list of MetaflowCardComponents for that particular card {} - ) # a dict with key as uuid and value as a list of MetaflowCardComponent. + ) self._cards_meta = ( {} ) # a `dict` of (card_uuid, `dict)` holding all metadata about all @card decorators on the `current` @step. self._card_id_map = {} # card_id to uuid map for all cards with ids self._logger = logger + self._card_creator = card_creator # `self._default_editable_card` holds the uuid of the card that is default editable. This card has access to `append`/`extend` methods of `self` self._default_editable_card = None - self._warned_once = {"__getitem__": {}, "append": False, "extend": False} + self._warned_once = { + "__getitem__": {}, + "append": False, + "extend": False, + "update": False, + "update_no_id": False, + } self._no_warnings = True if CARD_NO_WARNING else False @staticmethod def create_uuid(): - return str(uuid.uuid4()) + return str(uuid.uuid4()).replace("-", "") def _log(self, *args, **kwargs): if self._logger: @@ -70,9 +437,13 @@ def _add_card( self, card_type, card_id, + decorator_attributes, + card_options, editable=False, customize=False, suppress_warnings=False, + runtime_card=False, + refresh_interval=5, ): """ This function helps collect cards from all the card decorators. @@ -95,9 +466,24 @@ def _add_card( editable=editable, customize=customize, suppress_warnings=suppress_warnings, + runtime_card=runtime_card, + decorator_attributes=decorator_attributes, + card_options=card_options, + refresh_interval=refresh_interval, ) self._cards_meta[card_uuid] = card_metadata - self._cards_components[card_uuid] = [] + self._card_component_store[card_uuid] = CardComponentManager( + card_uuid, + decorator_attributes, + self._card_creator, + components=None, + logger=self._logger, + no_warnings=self._no_warnings, + user_set_card_id=card_id, + runtime_card=runtime_card, + card_options=card_options, + refresh_interval=refresh_interval, + ) return card_metadata def _warning(self, message): @@ -107,9 +493,9 @@ def _warning(self, message): def _add_warning_to_cards(self, warn_msg): if self._no_warnings: return - for card_id in self._cards_components: + for card_id in self._card_component_store: if not self._cards_meta[card_id]["suppress_warnings"]: - self._cards_components[card_id].append(WarningComponent(warn_msg)) + self._card_component_store[card_id].append(WarningComponent(warn_msg)) def get(self, type=None): """`get` @@ -128,7 +514,7 @@ def get(self, type=None): for card_meta in self._cards_meta.values() if card_meta["type"] == card_type ] - return [self._cards_components[uuid] for uuid in card_uuids] + return [self._card_component_store[uuid] for uuid in card_uuids] def _finalize(self): """ @@ -229,13 +615,13 @@ def __getitem__(self, key): Returns ------- - CardComponentCollector + CardComponentManager An object with `append` and `extend` calls which allow you to add components to the chosen card. """ if key in self._card_id_map: card_uuid = self._card_id_map[key] - return self._cards_components[card_uuid] + return self._card_component_store[card_uuid] if key not in self._warned_once["__getitem__"]: _warn_msg = [ "`current.card['%s']` is not present. Please set the `id` argument in @card to '%s' to access `current.card['%s']`." @@ -246,6 +632,7 @@ def __getitem__(self, key): self._warning(" ".join(_warn_msg)) self._add_warning_to_cards("\n".join(_warn_msg)) self._warned_once["__getitem__"][key] = True + return [] def __setitem__(self, key, value): @@ -263,7 +650,7 @@ def __setitem__(self, key, value): key: str Card ID. - value: List[CardComponent] + value: List[MetaflowCardComponent] List of card components to assign to this card. """ if key in self._card_id_map: @@ -275,7 +662,18 @@ def __setitem__(self, key, value): ) self._warning(_warning_msg) return - self._cards_components[card_uuid] = value + self._card_component_store[card_uuid] = CardComponentManager( + card_uuid, + self._cards_meta[card_uuid]["decorator_attributes"], + self._card_creator, + components=value, + logger=self._logger, + no_warnings=self._no_warnings, + user_set_card_id=key, + card_options=self._cards_meta[card_uuid]["card_options"], + runtime_card=self._cards_meta[card_uuid]["runtime_card"], + refresh_interval=self._cards_meta[card_uuid]["refresh_interval"], + ) return self._warning( @@ -283,18 +681,18 @@ def __setitem__(self, key, value): % (key, key, key) ) - def append(self, component): + def append(self, component, id=None): """ Appends a component to the current card. Parameters ---------- - component : CardComponent + component : MetaflowCardComponent Card component to add to this card. """ if self._default_editable_card is None: if ( - len(self._cards_components) == 1 + len(self._card_component_store) == 1 ): # if there is one card which is not the _default_editable_card then the card is not editable card_type = list(self._cards_meta.values())[0]["type"] if list(self._cards_meta.values())[0]["exists"]: @@ -324,7 +722,7 @@ def append(self, component): self._warned_once["append"] = True return - self._cards_components[self._default_editable_card].append(component) + self._card_component_store[self._default_editable_card].append(component, id=id) def extend(self, components): """ @@ -332,12 +730,12 @@ def extend(self, components): Parameters ---------- - component : Iterator[CardComponent] + component : Iterator[MetaflowCardComponent] Card components to add to this card. """ if self._default_editable_card is None: # if there is one card which is not the _default_editable_card then the card is not editable - if len(self._cards_components) == 1: + if len(self._card_component_store) == 1: card_type = list(self._cards_meta.values())[0]["type"] _warning_msg = [ "Card of type `%s` is not an editable card." % card_type, @@ -357,7 +755,52 @@ def extend(self, components): return - self._cards_components[self._default_editable_card].extend(components) + self._card_component_store[self._default_editable_card].extend(components) + + @property + def components(self): + # FIXME: document + if self._default_editable_card is None: + if len(self._card_component_store) == 1: + card_type = list(self._cards_meta.values())[0]["type"] + _warning_msg = [ + "Card of type `%s` is not an editable card." % card_type, + "Components list will not be updated and `current.card.components` will not work for any call during this runtime execution.", + "Please use an editable card", # todo : link to documentation + ] + else: + _warning_msg = [ + "`current.card.components` cannot disambiguate between multiple @card decorators.", + "Components list will not be accessible and `current.card.components` will not work for any call during this runtime execution.", + "To fix this set the `id` argument in all @card when using multiple @card decorators over a single @step and reference `current.card[ID].components`", # todo : Add Link to documentation + "to update/access the appropriate card component.", + ] + if not self._warned_once["components"]: + self._warning(" ".join(_warning_msg)) + self._warned_once["components"] = True + return + + return self._card_component_store[self._default_editable_card].components + + def clear(self): + # FIXME: document + if self._default_editable_card is not None: + self._card_component_store[self._default_editable_card].clear() + + def refresh(self, *args, **kwargs): + # FIXME: document + if self._default_editable_card is not None: + self._card_component_store[self._default_editable_card].refresh( + *args, **kwargs + ) + + def _get_latest_data(self, card_uuid, final=False, mode=None): + """ + Returns latest data so it can be used in the final render() call + """ + return self._card_component_store[card_uuid]._get_latest_data( + final=final, mode=mode + ) def _serialize_components(self, card_uuid): """ @@ -366,36 +809,43 @@ def _serialize_components(self, card_uuid): don't render safely then we don't add them to the final list of serialized functions """ serialized_components = [] - if card_uuid not in self._cards_components: + if card_uuid not in self._card_component_store: return [] has_user_components = any( [ issubclass(type(component), UserComponent) - for component in self._cards_components[card_uuid] + for component in self._card_component_store[card_uuid] ] ) - for component in self._cards_components[card_uuid]: - if not issubclass(type(component), MetaflowCardComponent): - continue - try: - rendered_obj = component.render() - except: + for component in self._card_component_store[card_uuid]: + rendered_obj = _render_card_component(component) + if rendered_obj is None: continue - else: - if not (type(rendered_obj) == str or type(rendered_obj) == dict): - continue - else: - # Since `UserComponent`s are safely_rendered using render_tools.py - # we don't need to check JSON serialization as @render_tools.render_safely - # decorator ensures this check so there is no need to re-serialize - if not issubclass(type(component), UserComponent): - try: # check if rendered object is json serializable. - json.dumps(rendered_obj) - except (TypeError, OverflowError) as e: - continue - serialized_components.append(rendered_obj) + serialized_components.append(rendered_obj) if has_user_components and len(serialized_components) > 0: serialized_components = [ SectionComponent(contents=serialized_components).render() ] return serialized_components + + +def _render_card_component(component): + if not _component_is_valid(component): + return None + try: + rendered_obj = component.render() + except: + return None + else: + if not (type(rendered_obj) == str or type(rendered_obj) == dict): + return None + else: + # Since `UserComponent`s are safely_rendered using render_tools.py + # we don't need to check JSON serialization as @render_tools.render_safely + # decorator ensures this check so there is no need to re-serialize + if not issubclass(type(component), UserComponent): + try: # check if rendered object is json serializable. + json.dumps(rendered_obj) + except (TypeError, OverflowError) as e: + return None + return rendered_obj diff --git a/metaflow/plugins/cards/exception.py b/metaflow/plugins/cards/exception.py index 0b9188b447..fdc96f86e2 100644 --- a/metaflow/plugins/cards/exception.py +++ b/metaflow/plugins/cards/exception.py @@ -116,7 +116,7 @@ def __init__(self, task): super(UnresolvableDatastoreException, self).__init__(msg) -class IncorrectArguementException(MetaflowException): +class IncorrectArgumentException(MetaflowException): headline = ( "`get_cards` function requires a `Task` object or pathspec as an argument" ) @@ -138,3 +138,22 @@ def __init__(self, pthspec): % pthspec ) super().__init__(msg=msg, lineno=None) + + +class ComponentOverwriteNotSupportedException(MetaflowException): + headline = "Component overwrite is not supported" + + def __init__(self, component_id, card_id, card_type): + id_str = "" + if card_id is not None: + id_str = "id='%s'" % card_id + msg = ( + "Card component overwrite is not supported. " + "Component with id %s already exists in the @card(type='%s', %s). \n" + "Instead of calling `current.card.components[ID] = MyComponent`. " + "You can overwrite the entire component Array by calling " + "`current.card.components = [MyComponent]`" + ) % (component_id, card_type, id_str) + super().__init__( + msg=msg, + ) diff --git a/test/core/contexts.json b/test/core/contexts.json index f8eac9d746..13b239f5f2 100644 --- a/test/core/contexts.json +++ b/test/core/contexts.json @@ -27,7 +27,39 @@ "checks": [ "python3-cli", "python3-metadata"], "disabled_tests": [ "LargeArtifactTest", - "S3FailureTest" + "S3FailureTest", + "CardComponentRefreshTest", + "CardWithRefreshTest" + ] + }, + { + "name": "python3-all-local-cards-realtime", + "disabled": true, + "env": { + "METAFLOW_USER": "tester", + "METAFLOW_RUN_BOOL_PARAM": "False", + "METAFLOW_RUN_NO_DEFAULT_PARAM": "test_str", + "METAFLOW_DEFAULT_METADATA": "local" + }, + "python": "python3", + "top_options": [ + "--metadata=local", + "--datastore=local", + "--environment=local", + "--event-logger=nullSidecarLogger", + "--no-pylint", + "--quiet" + ], + "run_options": [ + "--max-workers", "50", + "--max-num-splits", "10000", + "--tag", "\u523a\u8eab means sashimi", + "--tag", "multiple tags should be ok" + ], + "checks": [ "python3-cli", "python3-metadata"], + "enabled_tests": [ + "CardComponentRefreshTest", + "CardWithRefreshTest" ] }, { diff --git a/test/core/graphs/branch.json b/test/core/graphs/branch.json index 763842f64f..4c9db5058c 100644 --- a/test/core/graphs/branch.json +++ b/test/core/graphs/branch.json @@ -2,8 +2,8 @@ "name": "single-and-branch", "graph": { "start": {"branch": ["a", "b"], "quals": ["split-and"]}, - "a": {"linear": "join"}, - "b": {"linear": "join"}, + "a": {"linear": "join", "quals": ["single-branch-split"]}, + "b": {"linear": "join", "quals": ["single-branch-split"]}, "join": {"linear": "end", "join": true, "quals": ["join-and"]}, "end": {} } diff --git a/test/core/metaflow_test/__init__.py b/test/core/metaflow_test/__init__.py index 0c5012f043..1e12d47f18 100644 --- a/test/core/metaflow_test/__init__.py +++ b/test/core/metaflow_test/__init__.py @@ -2,6 +2,8 @@ import os from metaflow.exception import MetaflowException from metaflow import current +from metaflow.cards import get_cards +from metaflow.plugins.cards.exception import CardNotPresentException def steps(prio, quals, required=False): @@ -31,6 +33,39 @@ def truncate(var): return var +def retry_untill_timeout(cb_fn, *args, timeout=4, **kwargs): + """ + certain operations in metaflow may not be synchronous and may be running fully asynchronously. + This creates a problem in writing tests that verify some behaviour at runtime. This function + is a helper that allows us to wait for a certain amount of time for a callback function to + return a non-False value. + """ + import time + + start = time.time() + while True: + cb_val = cb_fn(*args, **kwargs) + if cb_val is not False: + return cb_val + if time.time() - start > timeout: + raise TimeoutError("Timeout waiting for callback to return non-False value") + time.sleep(1) + + +def try_to_get_card(id=None, timeout=4): + """ + Safetly try to get the card object until a timeout value. + """ + + def _get_card(card_id): + container = get_card_container(id=card_id) + if container is None: + return False + return container[0] + + return retry_untill_timeout(_get_card, id, timeout=timeout) + + class AssertArtifactFailed(Exception): pass @@ -66,6 +101,16 @@ def __init__(self): super(TestRetry, self).__init__("This is not an error. " "Testing retry...") +def get_card_container(id=None): + """ + Safetly try to load the card_container object. + """ + try: + return get_cards(current.pathspec, id=id) + except CardNotPresentException: + return None + + def is_resumed(): return current.origin_run_id is not None @@ -144,6 +189,12 @@ def assert_log(self, step, logtype, value, exact_match=True): def get_card(self, step, task, card_type): raise NotImplementedError() + def get_card_data(self, step, task, card_type, card_id=None): + """ + returns : (card_present, card_data) + """ + raise NotImplementedError() + def list_cards(self, step, task, card_type=None): raise NotImplementedError() diff --git a/test/core/metaflow_test/formatter.py b/test/core/metaflow_test/formatter.py index a68d134092..bbdd9312ed 100644 --- a/test/core/metaflow_test/formatter.py +++ b/test/core/metaflow_test/formatter.py @@ -85,7 +85,7 @@ def _flow_lines(self): yield 0, "# -*- coding: utf-8 -*-" yield 0, "from metaflow import FlowSpec, step, Parameter, project, IncludeFile, JSONType, current, parallel" - yield 0, "from metaflow_test import assert_equals, assert_equals_metadata, assert_exception, ExpectationFailed, is_resumed, ResumeFromHere, TestRetry" + yield 0, "from metaflow_test import assert_equals, assert_equals_metadata, assert_exception, ExpectationFailed, is_resumed, ResumeFromHere, TestRetry, try_to_get_card" if tags: yield 0, "from metaflow import %s" % ",".join(tags) diff --git a/test/core/metaflow_test/metadata_check.py b/test/core/metaflow_test/metadata_check.py index 543c6cee76..f003053705 100644 --- a/test/core/metaflow_test/metadata_check.py +++ b/test/core/metaflow_test/metadata_check.py @@ -167,6 +167,24 @@ def assert_card( ) return True + def get_card_data(self, step, task, card_type, card_id=None): + """ + returns : (card_present, card_data) + """ + from metaflow.plugins.cards.exception import CardNotPresentException + + try: + card_iter = self.get_card(step, task, card_type, card_id=card_id) + except CardNotPresentException: + return False, None + if card_id is None: + # Return the first piece of card_data we can find. + return True, card_iter[0].get_data() + for card in card_iter: + if card.id == card_id: + return True, card.get_data() + return False, None + def get_log(self, step, logtype): return "".join(getattr(task, logtype) for task in self.run[step]) diff --git a/test/core/tests/card_component_refresh_test.py b/test/core/tests/card_component_refresh_test.py new file mode 100644 index 0000000000..68502cfec9 --- /dev/null +++ b/test/core/tests/card_component_refresh_test.py @@ -0,0 +1,158 @@ +from metaflow_test import MetaflowTest, ExpectationFailed, steps, tag + + +class CardComponentRefreshTest(MetaflowTest): + """ + This test will validates the card component API based for runtime updates. + """ + + PRIORITY = 3 + + @tag('environment(vars={"METAFLOW_CARD_NO_WARNING": "True"})') + @tag('card(type="test_component_refresh_card", id="refresh_card", save_errors=False)') # fmt: skip + @steps( + 0, + [ + "singleton-start", + "sigleton-end", + "singleton", + "foreach-split-small", + "foreach-inner-small", + "foreach-join-small", + "split-and", + "single-branch-split", + "join-and", + "parallel-step", + ], + ) + def step_start(self): + import random + import string + + def _create_random_strings(char_len): + return "".join(random.choice(string.ascii_letters) for i in range(char_len)) + + def _array_is_a_subset(arr1, arr2): + return set(arr1).issubset(set(arr2)) + + def create_random_string_array(size=10): + return [_create_random_strings(10) for i in range(size)] + + from metaflow import current + from metaflow.plugins.cards.card_client import Card + from metaflow.plugins.cards.card_modules.test_cards import ( + _component_values_to_hash, + TestJSONComponent, + ) + import random + import time + + possible_reload_tokens = [] + make_reload_token = lambda a1, a2: "runtime-%s" % _component_values_to_hash( + {"random_key_1": {"abc": a1}, "random_key_2": {"abc": a2}} + ) + component_1_arr = create_random_string_array(5) + # Calling the first refresh should trigger a render of the card. + current.card.append( + TestJSONComponent({"abc": component_1_arr}), id="component_1" + ) + component_2_arr = create_random_string_array(5) + inscope_component = TestJSONComponent({"abc": component_2_arr}) + current.card.append(inscope_component) + current.card.refresh() + # sleep for a little bit because the card refresh is async. + # This feels a little hacky but need better ideas on how to test this + # when async processes may write cards/data in a "best-effort" manner. + # The `try_to_get_card` function will keep retrying to get a card until a + # timeout value is reached. After which the function will throw a `TimeoutError`. + _reload_tok = make_reload_token(component_1_arr, component_2_arr) + card = try_to_get_card(id="refresh_card", timeout=10) + assert_equals(isinstance(card, Card), True) + + sleep_between_refreshes = 2 # Set based on the RUNTIME_CARD_MIN_REFRESH_INTERVAL which acts as a rate-limit to what is refreshed. + + card_html = card.get() + possible_reload_tokens.append(_reload_tok) + # The reload token for card type `test_component_refresh_card` contains a hash of the component values. + # The first assertion will check if this reload token exists is set to what we expect in the HTML page. + assert_equals(_reload_tok in card_html, True) + + card_data = None + for i in range(5): + # We need to test the following : + # 1. We can call `inscope_component.update()` with new data and it will be reflected in the card. + # 2. `current.card.components["component1"].update()` and it will be reflected in the card. + # How do we test it : + # 1. Add new values to the components that have been created. + # 2. Since the card is calculating the reload token based on the hash of the value, we verify that dataupdates have the same reload token or any of the possible reload tokens. + # 3. We also verify that the card_data contains the `data` key that has the lastest information updated for `component_1` + component_2_arr.append(_create_random_strings(10)) + component_1_arr.append(_create_random_strings(10)) + + inscope_component.update({"abc": component_2_arr}) + current.card.components["component_1"].update({"abc": component_1_arr}) + _reload_tok = make_reload_token(component_1_arr, component_2_arr) + current.card.refresh() + + possible_reload_tokens.append(_reload_tok) + card_data = card.get_data() + if card_data is not None: + assert_equals(card_data["reload_token"] in possible_reload_tokens, True) + assert_equals( + _array_is_a_subset( + card_data["data"]["component_1"]["abc"], component_1_arr + ), + True, + ) + time.sleep(sleep_between_refreshes) + + assert_equals(card_data is not None, True) + self.final_data = component_1_arr + # setting step name here helps us figure out what steps should be validated by the checker + self.step_name = current.step_name + + @steps(1, ["all"]) + def step_all(self): + pass + + def check_results(self, flow, checker): + def _array_is_a_subset(arr1, arr2): + return set(arr1).issubset(set(arr2)) + + if checker.__class__.__name__ != "MetadataCheck": + return + run = checker.get_run() + for step in flow: + meta_check_dict = checker.artifact_dict_if_exists(step.name, "final_data") + # Which ever steps ran the actual card testing code + # contains the `final_data` attribute and the `step_name` attribute. + # If these exist then we can succesfully validate the card data since it is meant to exist. + step_done_check_dict = checker.artifact_dict_if_exists( + step.name, "step_name" + ) + for task_id in step_done_check_dict: + if ( + len(step_done_check_dict[task_id]) == 0 + or step_done_check_dict[task_id]["step_name"] != step.name + ): + print( + "Skipping task pathspec %s" % run[step.name][task_id].pathspec + ) + continue + # If the `step_name` attribute was set then surely `final_data` will also be set; + data_obj = meta_check_dict[task_id]["final_data"] + card_present, card_data = checker.get_card_data( + step.name, + task_id, + "test_component_refresh_card", + card_id="refresh_card", + ) + assert_equals(card_present, True) + data_has_latest_artifact = _array_is_a_subset( + data_obj, card_data["data"]["component_1"]["abc"] + ) + assert_equals(data_has_latest_artifact, True) + print( + "Succesfully validated task pathspec %s" + % run[step.name][task_id].pathspec + ) diff --git a/test/core/tests/card_refresh_test.py b/test/core/tests/card_refresh_test.py new file mode 100644 index 0000000000..f7152bc521 --- /dev/null +++ b/test/core/tests/card_refresh_test.py @@ -0,0 +1,155 @@ +from metaflow_test import MetaflowTest, ExpectationFailed, steps, tag + + +class CardWithRefreshTest(MetaflowTest): + """ + This test Does few checks that the core user interfaces are working : + 1. It validates we can call `current.card.refresh` without any errors. + 2. It validates if the data updates that are getting shipped are correct. + + How will it do it : + + 1. In step code: + 1. We create a random array of strings. + 2. We call `current.card.refresh` with the array. + 3. We check if the card is present given that we have called referesh and the card + should have reached the backend in some short period of time + 4. Keep adding new data to the array and keep calling refresh. + 5. The data-update that got shipped should *atleast* be a subset the actual data present in the runtime code. + 2. In check_results: + 1. We check if the data that got shipped can be access post task completion + 2. We check if the data that got shipped is a subset of the actual data created during the runtime code. + """ + + PRIORITY = 3 + + @tag('environment(vars={"METAFLOW_CARD_NO_WARNING": "True"})') + @tag('card(type="test_refresh_card", id="refresh_card")') + @steps( + 0, + [ + "singleton-start", + "sigleton-end", + "singleton", + "foreach-split-small", + "foreach-inner-small", + "foreach-join-small", + "split-and", + "single-branch-split", + "join-and", + "parallel-step", + ], + ) + def step_start(self): + import random + import string + + def _create_random_strings(char_len): + return "".join(random.choice(string.ascii_letters) for i in range(char_len)) + + def _array_is_a_subset(arr1, arr2): + return set(arr1).issubset(set(arr2)) + + from metaflow import current + from metaflow.cards import get_cards + from metaflow.plugins.cards.card_client import Card + import random + import time + + start_arr = [_create_random_strings(10) for i in range(5)] + # Calling the first refresh should trigger a render of the card. + current.card.refresh({"arr": start_arr}) + # sleep for a little bit because the card refresh is async. + # This feels a little hacky but need better ideas on how to test this + # when async processes may write cards/data in a "best-effort" manner. + # The `try_to_get_card` function will keep retrying to get a card until a + # timeout value is reached. After which the function will throw a `TimeoutError`. + card = try_to_get_card(id="refresh_card", timeout=4) + assert_equals(isinstance(card, Card), True) + + sleep_between_refreshes = 4 # Set based on the RUNTIME_CARD_MIN_REFRESH_INTERVAL which acts as a rate-limit to what is refreshed. + + # Now we check if the refresh interface is working as expected from data updates. + card_data = None + for i in range(5): + # We need to put sleep statements because: + # 1. the update to cards is run via async processes + # 2. card refreshes are rate-limited by RUNTIME_CARD_MIN_REFRESH_INTERVAL so we can't validate with each update. + # There by there is no consistent way to know from during user-code when a data update + # actually got shiped. + start_arr.append(_create_random_strings(10)) + current.card.refresh({"arr": start_arr}) + # We call the `card.get_data` interface to validate the data is available in the card. + # This is a private interface and should not be used by users but is used by internal services. + card_data = card.get_data() + if card_data is not None: + # Assert that data is atleast subset of what we sent to the datastore. + assert_equals( + _array_is_a_subset(card_data["data"]["user"]["arr"], start_arr), + True, + ) + # The `TestRefreshCard.refresh(task, data)` method returns the `data` object as a pass through. + # This test will also serve a purpose of ensuring that any changes to these keys are + # caught by the test framework. The minimum subset should be present and grown as + # need requires. + # We first check the keys created by the refresh-JSON created in the `card_cli.py` + top_level_keys = set(["data", "reload_token"]) + assert_equals(top_level_keys.issubset(set(card_data.keys())), True) + # We then check the keys returned from the `current.card._get_latest_data` which is the + # `data` parameter in the `MetaflowCard.refresh ` method. + required_data_keys = set( + ["mode", "component_update_ts", "components", "render_seq", "user"] + ) + assert_equals( + required_data_keys.issubset(set(card_data["data"].keys())), True + ) + + time.sleep(sleep_between_refreshes) + + assert_equals(card_data is not None, True) + self.final_data = {"arr": start_arr} + # setting step name here helps us figure out what steps should be validated by the checker + self.step_name = current.step_name + + @steps(1, ["all"]) + def step_all(self): + pass + + def check_results(self, flow, checker): + def _array_is_a_subset(arr1, arr2): + return set(arr1).issubset(set(arr2)) + + if checker.__class__.__name__ != "MetadataCheck": + return + run = checker.get_run() + for step in flow: + meta_check_dict = checker.artifact_dict_if_exists(step.name, "final_data") + # Which ever steps ran the actual card testing code + # contains the `final_data` attribute and the `step_name` attribute. + # If these exist then we can succesfully validate the card data since it is meant to exist. + step_done_check_dict = checker.artifact_dict_if_exists( + step.name, "step_name" + ) + for task_id in step_done_check_dict: + if ( + len(step_done_check_dict[task_id]) == 0 + or step_done_check_dict[task_id]["step_name"] != step.name + ): + print( + "Skipping task pathspec %s" % run[step.name][task_id].pathspec + ) + continue + # If the `step_name` attribute was set then surely `final_data` will also be set; + data_obj = meta_check_dict[task_id]["final_data"] + card_present, card_data = checker.get_card_data( + step.name, task_id, "test_refresh_card", card_id="refresh_card" + ) + assert_equals(card_present, True) + data_has_latest_artifact = _array_is_a_subset( + data_obj["arr"], card_data["data"]["user"]["arr"] + ) + assert_equals(data_has_latest_artifact, True) + print( + "Succesfully validated task pathspec %s" + % run[step.name][task_id].pathspec + ) diff --git a/test_runner b/test_runner index 8de1f3ecdf..2f4bd40b80 100755 --- a/test_runner +++ b/test_runner @@ -16,7 +16,15 @@ install_extensions() { } run_tests() { - cd test/core && PYTHONPATH=`pwd`/../../ python3 run_tests.py --num-parallel 8 + cd test/core && PYTHONPATH=`pwd`/../../ python3 run_tests.py --num-parallel 8 && cd ../../ } -install_deps && install_extensions && run_tests +# We run realtime cards tests separately because there these tests validate the asynchronous updates to the +# information stored in the datastore. So if there are other processes starving resources then these tests will +# surely fail since a lot of checks have timeouts. +run_runtime_card_tests() { + CARD_GRAPHS="small-foreach,small-parallel,nested-branches,single-linear-step,simple-foreach" + cd test/core && PYTHONPATH=`pwd`/../../ python3 run_tests.py --num-parallel 8 --contexts python3-all-local-cards-realtime --graphs $CARD_GRAPHS && cd ../../ +} + +install_deps && install_extensions && run_tests && run_runtime_card_tests