From ec58dd04557b8f58279a8803ae04bc9db23a590d Mon Sep 17 00:00:00 2001 From: Valay Dave Date: Thu, 19 Oct 2023 14:48:04 -0700 Subject: [PATCH] [card][tests] verify the behavior of `.refresh` / `.components` --- metaflow/plugins/__init__.py | 4 + metaflow/plugins/cards/card_cli.py | 7 + .../plugins/cards/card_modules/test_cards.py | 114 +++++++++++++ test/core/graphs/branch.json | 4 +- test/core/metaflow_test/__init__.py | 51 ++++++ test/core/metaflow_test/formatter.py | 2 +- test/core/metaflow_test/metadata_check.py | 18 ++ .../core/tests/card_component_refresh_test.py | 158 ++++++++++++++++++ test/core/tests/card_refresh_test.py | 155 +++++++++++++++++ 9 files changed, 510 insertions(+), 3 deletions(-) create mode 100644 test/core/tests/card_component_refresh_test.py create mode 100644 test/core/tests/card_refresh_test.py diff --git a/metaflow/plugins/__init__.py b/metaflow/plugins/__init__.py index 13c45e0cfe..9f2d4e9289 100644 --- a/metaflow/plugins/__init__.py +++ b/metaflow/plugins/__init__.py @@ -166,6 +166,8 @@ def get_plugin_cli(): TestNonEditableCard, TestPathSpecCard, TestTimeoutCard, + TestRefreshCard, + TestRefreshComponentCard, ) CARDS = [ @@ -182,5 +184,7 @@ def get_plugin_cli(): TestNonEditableCard, BlankCard, DefaultCardJSON, + TestRefreshCard, + TestRefreshComponentCard, ] merge_lists(CARDS, MF_EXTERNAL_CARDS, "type") diff --git a/metaflow/plugins/cards/card_cli.py b/metaflow/plugins/cards/card_cli.py index bd9f02f0c5..139e7e6d81 100644 --- a/metaflow/plugins/cards/card_cli.py +++ b/metaflow/plugins/cards/card_cli.py @@ -703,6 +703,13 @@ def create( ) rendered_content = rendered_info.data except: + rendered_info = CardRenderInfo( + mode=mode, + is_implemented=True, + data=None, + timed_out=False, + timeout_stack_trace=None, + ) if render_error_card: error_stack_trace = str(UnrenderableCardException(type, options)) else: diff --git a/metaflow/plugins/cards/card_modules/test_cards.py b/metaflow/plugins/cards/card_modules/test_cards.py index 
b756fb37db..9a6e2ff7dc 100644 --- a/metaflow/plugins/cards/card_modules/test_cards.py +++ b/metaflow/plugins/cards/card_modules/test_cards.py @@ -1,13 +1,20 @@ +import json from .card import MetaflowCard, MetaflowCardComponent +from .renderer_tools import render_safely class TestStringComponent(MetaflowCardComponent): + REALTIME_UPDATABLE = True + def __init__(self, text): self._text = text def render(self): return str(self._text) + def update(self, text): + self._text = text + class TestPathSpecCard(MetaflowCard): type = "test_pathspec_card" @@ -98,3 +105,110 @@ def render(self, task): time.sleep(self._timeout) return "%s" % task.pathspec + + +REFRESHABLE_HTML_TEMPLATE = """ + + +

[PATHSPEC]

+

[REPLACE_CONTENT_HERE]

+ +""" + + +class TestJSONComponent(MetaflowCardComponent): + + REALTIME_UPDATABLE = True + + def __init__(self, data): + self._data = data + + @render_safely + def render(self): + return self._data + + def update(self, data): + self._data = data + + +class TestRefreshCard(MetaflowCard): + + """ + This card takes no components and helps test the `current.card.refresh(data)` interface. + """ + + HTML_TEMPLATE = REFRESHABLE_HTML_TEMPLATE + + RUNTIME_UPDATABLE = True + + ALLOW_USER_COMPONENTS = True + + # Not implementing Reload Policy here since the reload Policy is set to always + RELOAD_POLICY = MetaflowCard.RELOAD_POLICY_ALWAYS + + type = "test_refresh_card" + + def render(self, task, data) -> str: + return self.HTML_TEMPLATE.replace( + "[REPLACE_CONTENT_HERE]", json.dumps(data["user"]) + ).replace("[PATHSPEC]", task.pathspec) + + def render_runtime(self, task, data): + return self.render(task, data) + + def refresh(self, task, data): + return data + + +import hashlib + + +def _component_values_to_hash(components): + comma_str = ",".join(["".join(x) for v in components.values() for x in v]) + return hashlib.sha256(comma_str.encode("utf-8")).hexdigest() + + +class TestRefreshComponentCard(MetaflowCard): + + """ + This card takes components and helps test the `current.card.components["A"].update()` + interface + """ + + HTML_TEMPLATE = REFRESHABLE_HTML_TEMPLATE + + RUNTIME_UPDATABLE = True + + ALLOW_USER_COMPONENTS = True + + # The reload policy is set to on-change, so `reload_content_token` is implemented below + RELOAD_POLICY = MetaflowCard.RELOAD_POLICY_ONCHANGE + + type = "test_component_refresh_card" + + def __init__(self, options={}, components=[], graph=None): + self._components = components + + def render(self, task, data) -> str: + # Calling `render`/`render_runtime` won't require the `data` object + return self.HTML_TEMPLATE.replace( + "[REPLACE_CONTENT_HERE]", json.dumps(self._components) + ).replace("[PATHSPEC]", task.pathspec) + + def render_runtime(self, task, 
data): + return self.render(task, data) + + def refresh(self, task, data): + # Governs the information passed in the data update + return data["components"] + + def reload_content_token(self, task, data): + if task.finished: + return "final" + return "runtime-%s" % _component_values_to_hash(data["components"]) diff --git a/test/core/graphs/branch.json b/test/core/graphs/branch.json index 763842f64f..4c9db5058c 100644 --- a/test/core/graphs/branch.json +++ b/test/core/graphs/branch.json @@ -2,8 +2,8 @@ "name": "single-and-branch", "graph": { "start": {"branch": ["a", "b"], "quals": ["split-and"]}, - "a": {"linear": "join"}, - "b": {"linear": "join"}, + "a": {"linear": "join", "quals": ["single-branch-split"]}, + "b": {"linear": "join", "quals": ["single-branch-split"]}, "join": {"linear": "end", "join": true, "quals": ["join-and"]}, "end": {} } diff --git a/test/core/metaflow_test/__init__.py b/test/core/metaflow_test/__init__.py index 0c5012f043..1e12d47f18 100644 --- a/test/core/metaflow_test/__init__.py +++ b/test/core/metaflow_test/__init__.py @@ -2,6 +2,8 @@ import os from metaflow.exception import MetaflowException from metaflow import current +from metaflow.cards import get_cards +from metaflow.plugins.cards.exception import CardNotPresentException def steps(prio, quals, required=False): @@ -31,6 +33,39 @@ def truncate(var): return var +def retry_untill_timeout(cb_fn, *args, timeout=4, **kwargs): + """ + Certain operations in metaflow may not be synchronous and may be running fully asynchronously. + This creates a problem in writing tests that verify some behaviour at runtime. This function + is a helper that allows us to wait for a certain amount of time for a callback function to + return a non-False value. 
+ """ + import time + + start = time.time() + while True: + cb_val = cb_fn(*args, **kwargs) + if cb_val is not False: + return cb_val + if time.time() - start > timeout: + raise TimeoutError("Timeout waiting for callback to return non-False value") + time.sleep(1) + + +def try_to_get_card(id=None, timeout=4): + """ + Safetly try to get the card object until a timeout value. + """ + + def _get_card(card_id): + container = get_card_container(id=card_id) + if container is None: + return False + return container[0] + + return retry_untill_timeout(_get_card, id, timeout=timeout) + + class AssertArtifactFailed(Exception): pass @@ -66,6 +101,16 @@ def __init__(self): super(TestRetry, self).__init__("This is not an error. " "Testing retry...") +def get_card_container(id=None): + """ + Safetly try to load the card_container object. + """ + try: + return get_cards(current.pathspec, id=id) + except CardNotPresentException: + return None + + def is_resumed(): return current.origin_run_id is not None @@ -144,6 +189,12 @@ def assert_log(self, step, logtype, value, exact_match=True): def get_card(self, step, task, card_type): raise NotImplementedError() + def get_card_data(self, step, task, card_type, card_id=None): + """ + returns : (card_present, card_data) + """ + raise NotImplementedError() + def list_cards(self, step, task, card_type=None): raise NotImplementedError() diff --git a/test/core/metaflow_test/formatter.py b/test/core/metaflow_test/formatter.py index a68d134092..bbdd9312ed 100644 --- a/test/core/metaflow_test/formatter.py +++ b/test/core/metaflow_test/formatter.py @@ -85,7 +85,7 @@ def _flow_lines(self): yield 0, "# -*- coding: utf-8 -*-" yield 0, "from metaflow import FlowSpec, step, Parameter, project, IncludeFile, JSONType, current, parallel" - yield 0, "from metaflow_test import assert_equals, assert_equals_metadata, assert_exception, ExpectationFailed, is_resumed, ResumeFromHere, TestRetry" + yield 0, "from metaflow_test import assert_equals, 
assert_equals_metadata, assert_exception, ExpectationFailed, is_resumed, ResumeFromHere, TestRetry, try_to_get_card" if tags: yield 0, "from metaflow import %s" % ",".join(tags) diff --git a/test/core/metaflow_test/metadata_check.py b/test/core/metaflow_test/metadata_check.py index 543c6cee76..7767b8f107 100644 --- a/test/core/metaflow_test/metadata_check.py +++ b/test/core/metaflow_test/metadata_check.py @@ -167,6 +167,24 @@ def assert_card( ) return True + def get_card_data(self, step, task, card_type, card_id=None): + """ + returns : (card_present, card_data) + """ + from metaflow.plugins.cards.exception import CardNotPresentException + + try: + card_iter = self.get_card(step, task, card_type, card_id=card_id) + except CardNotPresentException: + return False, None + if card_id is None: + # Return the first piece of card_data we can find. + return True, card_iter[0]._get_data() + for card in card_iter: + if card.id == card_id: + return True, card._get_data() + return False, None + def get_log(self, step, logtype): return "".join(getattr(task, logtype) for task in self.run[step]) diff --git a/test/core/tests/card_component_refresh_test.py b/test/core/tests/card_component_refresh_test.py new file mode 100644 index 0000000000..7a2f30282f --- /dev/null +++ b/test/core/tests/card_component_refresh_test.py @@ -0,0 +1,158 @@ +from metaflow_test import MetaflowTest, ExpectationFailed, steps, tag + + +class CardComponentRefreshTest(MetaflowTest): + """ + This test will validates the card component API based for runtime updates. 
+ """ + + PRIORITY = 3 + + @tag('environment(vars={"METAFLOW_CARD_NO_WARNING": "True"})') + @tag('card(type="test_component_refresh_card", id="refresh_card", save_errors=False)') # fmt: skip + @steps( + 0, + [ + "singleton-start", + "sigleton-end", + "singleton", + "foreach-split-small", + "foreach-inner-small", + "foreach-join-small", + "split-and", + "single-branch-split", + "join-and", + "parallel-step", + ], + ) + def step_start(self): + import random + import string + + def _create_random_strings(char_len): + return "".join(random.choice(string.ascii_letters) for i in range(char_len)) + + def _array_is_a_subset(arr1, arr2): + return set(arr1).issubset(set(arr2)) + + def create_random_string_array(size=10): + return [_create_random_strings(10) for i in range(size)] + + from metaflow import current + from metaflow.plugins.cards.card_client import Card + from metaflow.plugins.cards.card_modules.test_cards import ( + _component_values_to_hash, + TestJSONComponent, + ) + import random + import time + + possible_reload_tokens = [] + make_reload_token = lambda a1, a2: "runtime-%s" % _component_values_to_hash( + {"random_key_1": {"abc": a1}, "random_key_2": {"abc": a2}} + ) + component_1_arr = create_random_string_array(5) + # Calling the first refresh should trigger a render of the card. + current.card.append( + TestJSONComponent({"abc": component_1_arr}), id="component_1" + ) + component_2_arr = create_random_string_array(5) + inscope_component = TestJSONComponent({"abc": component_2_arr}) + current.card.append(inscope_component) + current.card.refresh() + # sleep for a little bit because the card refresh is async. + # This feels a little hacky but need better ideas on how to test this + # when async processes may write cards/data in a "best-effort" manner. + # The `try_to_get_card` function will keep retrying to get a card until a + # timeout value is reached. After which the function will throw a `TimeoutError`. 
+ _reload_tok = make_reload_token(component_1_arr, component_2_arr) + card = try_to_get_card(id="refresh_card", timeout=10) + assert_equals(isinstance(card, Card), True) + + sleep_between_refreshes = 2 # Set based on the RUNTIME_CARD_MIN_REFRESH_INTERVAL which acts as a rate-limit to what is refreshed. + + card_html = card.get() + possible_reload_tokens.append(_reload_tok) + # The reload token for card type `test_component_refresh_card` contains a hash of the component values. + # The first assertion will check if this reload token exists is set to what we expect in the HTML page. + assert_equals(_reload_tok in card_html, True) + + card_data = None + for i in range(5): + # We need to test the following : + # 1. We can call `inscope_component.update()` with new data and it will be reflected in the card. + # 2. `current.card.components["component1"].update()` and it will be reflected in the card. + # How do we test it : + # 1. Add new values to the components that have been created. + # 2. Since the card is calculating the reload token based on the hash of the value, we verify that dataupdates have the same reload token or any of the possible reload tokens. + # 3. 
We also verify that the card_data contains the `data` key that has the latest information updated for `component_1` + component_2_arr.append(_create_random_strings(10)) + component_1_arr.append(_create_random_strings(10)) + + inscope_component.update({"abc": component_2_arr}) + current.card.components["component_1"].update({"abc": component_1_arr}) + _reload_tok = make_reload_token(component_1_arr, component_2_arr) + current.card.refresh() + + possible_reload_tokens.append(_reload_tok) + card_data = card._get_data() + if card_data is not None: + assert_equals(card_data["reload_token"] in possible_reload_tokens, True) + assert_equals( + _array_is_a_subset( + card_data["data"]["component_1"]["abc"], component_1_arr + ), + True, + ) + time.sleep(sleep_between_refreshes) + + assert_equals(card_data is not None, True) + self.final_data = component_1_arr + # setting step name here helps us figure out what steps should be validated by the checker + self.step_name = current.step_name + + @steps(1, ["all"]) + def step_all(self): + pass + + def check_results(self, flow, checker): + def _array_is_a_subset(arr1, arr2): + return set(arr1).issubset(set(arr2)) + + if checker.__class__.__name__ != "MetadataCheck": + return + run = checker.get_run() + for step in flow: + meta_check_dict = checker.artifact_dict_if_exists(step.name, "final_data") + # Whichever steps ran the actual card testing code + # contains the `final_data` attribute and the `step_name` attribute. + # If these exist then we can successfully validate the card data since it is meant to exist. 
+ step_done_check_dict = checker.artifact_dict_if_exists( + step.name, "step_name" + ) + for task_id in step_done_check_dict: + if ( + len(step_done_check_dict[task_id]) == 0 + or step_done_check_dict[task_id]["step_name"] != step.name + ): + print( + "Skipping task pathspec %s" % run[step.name][task_id].pathspec + ) + continue + # If the `step_name` attribute was set then surely `final_data` will also be set; + data_obj = meta_check_dict[task_id]["final_data"] + card_present, card_data = checker.get_card_data( + step.name, + task_id, + "test_component_refresh_card", + card_id="refresh_card", + ) + assert_equals(card_present, True) + data_has_latest_artifact = _array_is_a_subset( + data_obj, card_data["data"]["component_1"]["abc"] + ) + assert_equals(data_has_latest_artifact, True) + print( + "Succesfully validated task pathspec %s" + % run[step.name][task_id].pathspec + ) diff --git a/test/core/tests/card_refresh_test.py b/test/core/tests/card_refresh_test.py new file mode 100644 index 0000000000..28f79a82e1 --- /dev/null +++ b/test/core/tests/card_refresh_test.py @@ -0,0 +1,155 @@ +from metaflow_test import MetaflowTest, ExpectationFailed, steps, tag + + +class CardWithRefreshTest(MetaflowTest): + """ + This test does a few checks that the core user interfaces are working : + 1. It validates we can call `current.card.refresh` without any errors. + 2. It validates if the data updates that are getting shipped are correct. + + How will it do it : + + 1. In step code: + 1. We create a random array of strings. + 2. We call `current.card.refresh` with the array. + 3. We check if the card is present given that we have called refresh and the card + should have reached the backend in some short period of time + 4. Keep adding new data to the array and keep calling refresh. + 5. The data-update that got shipped should *at least* be a subset of the actual data present in the runtime code. + 2. In check_results: + 1. 
We check if the data that got shipped can be accessed post task completion + 2. We check if the data that got shipped is a subset of the actual data created during the runtime code. + """ + + PRIORITY = 3 + + @tag('environment(vars={"METAFLOW_CARD_NO_WARNING": "True"})') + @tag('card(type="test_refresh_card", id="refresh_card")') + @steps( + 0, + [ + "singleton-start", + "sigleton-end", + "singleton", + "foreach-split-small", + "foreach-inner-small", + "foreach-join-small", + "split-and", + "single-branch-split", + "join-and", + "parallel-step", + ], + ) + def step_start(self): + import random + import string + + def _create_random_strings(char_len): + return "".join(random.choice(string.ascii_letters) for i in range(char_len)) + + def _array_is_a_subset(arr1, arr2): + return set(arr1).issubset(set(arr2)) + + from metaflow import current + from metaflow.cards import get_cards + from metaflow.plugins.cards.card_client import Card + import random + import time + + start_arr = [_create_random_strings(10) for i in range(5)] + # Calling the first refresh should trigger a render of the card. + current.card.refresh({"arr": start_arr}) + # sleep for a little bit because the card refresh is async. + # This feels a little hacky but need better ideas on how to test this + # when async processes may write cards/data in a "best-effort" manner. + # The `try_to_get_card` function will keep retrying to get a card until a + # timeout value is reached. After which the function will throw a `TimeoutError`. + card = try_to_get_card(id="refresh_card", timeout=4) + assert_equals(isinstance(card, Card), True) + + sleep_between_refreshes = 4 # Set based on the RUNTIME_CARD_MIN_REFRESH_INTERVAL which acts as a rate-limit to what is refreshed. + + # Now we check if the refresh interface is working as expected from data updates. + card_data = None + for i in range(5): + # We need to put sleep statements because: + # 1. the update to cards is run via async processes + # 2. 
card refreshes are rate-limited by RUNTIME_CARD_MIN_REFRESH_INTERVAL so we can't validate with each update. + # Thereby there is no consistent way to know from within user-code when a data update + # actually got shipped. + start_arr.append(_create_random_strings(10)) + current.card.refresh({"arr": start_arr}) + # We call the `card._get_data` interface to validate the data is available in the card. + # This is a private interface and should not be used by users but is used by internal services. + card_data = card._get_data() + if card_data is not None: + # Assert that data is at least a subset of what we sent to the datastore. + assert_equals( + _array_is_a_subset(card_data["data"]["user"]["arr"], start_arr), + True, + ) + # The `TestRefreshCard.refresh(task, data)` method returns the `data` object as a pass through. + # This test will also serve a purpose of ensuring that any changes to these keys are + # caught by the test framework. The minimum subset should be present and grown as + # need requires. + # We first check the keys created by the refresh-JSON created in the `card_cli.py` + top_level_keys = set(["data", "reload_token"]) + assert_equals(top_level_keys.issubset(set(card_data.keys())), True) + # We then check the keys returned from the `current.card._get_latest_data` which is the + # `data` parameter in the `MetaflowCard.refresh` method. 
+ required_data_keys = set( + ["mode", "component_update_ts", "components", "render_seq", "user"] + ) + assert_equals( + required_data_keys.issubset(set(card_data["data"].keys())), True + ) + + time.sleep(sleep_between_refreshes) + + assert_equals(card_data is not None, True) + self.final_data = {"arr": start_arr} + # setting step name here helps us figure out what steps should be validated by the checker + self.step_name = current.step_name + + @steps(1, ["all"]) + def step_all(self): + pass + + def check_results(self, flow, checker): + def _array_is_a_subset(arr1, arr2): + return set(arr1).issubset(set(arr2)) + + if checker.__class__.__name__ != "MetadataCheck": + return + run = checker.get_run() + for step in flow: + meta_check_dict = checker.artifact_dict_if_exists(step.name, "final_data") + # Whichever steps ran the actual card testing code + # contains the `final_data` attribute and the `step_name` attribute. + # If these exist then we can successfully validate the card data since it is meant to exist. + step_done_check_dict = checker.artifact_dict_if_exists( + step.name, "step_name" + ) + for task_id in step_done_check_dict: + if ( + len(step_done_check_dict[task_id]) == 0 + or step_done_check_dict[task_id]["step_name"] != step.name + ): + print( + "Skipping task pathspec %s" % run[step.name][task_id].pathspec + ) + continue + # If the `step_name` attribute was set then surely `final_data` will also be set; + data_obj = meta_check_dict[task_id]["final_data"] + card_present, card_data = checker.get_card_data( + step.name, task_id, "test_refresh_card", card_id="refresh_card" + ) + assert_equals(card_present, True) + data_has_latest_artifact = _array_is_a_subset( + data_obj["arr"], card_data["data"]["user"]["arr"] + ) + assert_equals(data_has_latest_artifact, True) + print( + "Succesfully validated task pathspec %s" + % run[step.name][task_id].pathspec + )