From 0bfedf30667f831bf019718af07a42c45414a33a Mon Sep 17 00:00:00 2001 From: Giorgio Basile Date: Sat, 5 Aug 2023 10:05:04 +0200 Subject: [PATCH 1/3] Remove sample flows and tasks --- docs/flows.md | 6 ------ docs/tasks.md | 6 ------ mkdocs.yml | 2 -- prefect_planetary_computer/flows.py | 21 --------------------- prefect_planetary_computer/tasks.py | 24 ------------------------ tests/test_flows.py | 6 ------ tests/test_tasks.py | 24 ------------------------ 7 files changed, 89 deletions(-) delete mode 100644 docs/flows.md delete mode 100644 docs/tasks.md delete mode 100644 prefect_planetary_computer/flows.py delete mode 100644 prefect_planetary_computer/tasks.py delete mode 100644 tests/test_flows.py delete mode 100644 tests/test_tasks.py diff --git a/docs/flows.md b/docs/flows.md deleted file mode 100644 index 628dd10..0000000 --- a/docs/flows.md +++ /dev/null @@ -1,6 +0,0 @@ ---- -description: -notes: This documentation page is generated from source file docstrings. ---- - -::: prefect_planetary_computer.flows \ No newline at end of file diff --git a/docs/tasks.md b/docs/tasks.md deleted file mode 100644 index 8ad582e..0000000 --- a/docs/tasks.md +++ /dev/null @@ -1,6 +0,0 @@ ---- -description: -notes: This documentation page is generated from source file docstrings. ---- - -::: prefect_planetary_computer.tasks \ No newline at end of file diff --git a/mkdocs.yml b/mkdocs.yml index 18deff6..761d785 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -77,7 +77,5 @@ nav: - Examples Catalog: examples_catalog.md - API Reference: - Credentials: credentials.md - - Tasks: tasks.md - - Flows: flows.md diff --git a/prefect_planetary_computer/flows.py b/prefect_planetary_computer/flows.py deleted file mode 100644 index a3ba17b..0000000 --- a/prefect_planetary_computer/flows.py +++ /dev/null @@ -1,21 +0,0 @@ -"""This is an example flows module""" -from prefect import flow - -from prefect_planetary_computer.tasks import ( - goodbye_prefect_planetary_computer, - hello_prefect_planetary_computer, -) - - -@flow -def hello_and_goodbye(): - """ - Sample flow that says hello and goodbye! - """ - print(hello_prefect_planetary_computer()) - print(goodbye_prefect_planetary_computer()) - return "Done" - - -if __name__ == "__main__": - hello_and_goodbye() diff --git a/prefect_planetary_computer/tasks.py b/prefect_planetary_computer/tasks.py deleted file mode 100644 index 0697194..0000000 --- a/prefect_planetary_computer/tasks.py +++ /dev/null @@ -1,24 +0,0 @@ -"""This is an example tasks module""" -from prefect import task - - -@task -def hello_prefect_planetary_computer() -> str: - """ - Sample task that says hello! - - Returns: - A greeting for your collection - """ - return "Hello, prefect-planetary-computer!" - - -@task -def goodbye_prefect_planetary_computer() -> str: - """ - Sample task that says goodbye! - - Returns: - A farewell for your collection - """ - return "Goodbye, prefect-planetary-computer!" 
diff --git a/tests/test_flows.py b/tests/test_flows.py deleted file mode 100644 index 2bca7b8..0000000 --- a/tests/test_flows.py +++ /dev/null @@ -1,6 +0,0 @@ -from prefect_planetary_computer.flows import hello_and_goodbye - - -def test_hello_and_goodbye_flow(): - result = hello_and_goodbye() - assert result == "Done" diff --git a/tests/test_tasks.py b/tests/test_tasks.py deleted file mode 100644 index 524e96d..0000000 --- a/tests/test_tasks.py +++ /dev/null @@ -1,24 +0,0 @@ -from prefect import flow - -from prefect_planetary_computer.tasks import ( - goodbye_prefect_planetary_computer, - hello_prefect_planetary_computer, -) - - -def test_hello_prefect_planetary_computer(): - @flow - def test_flow(): - return hello_prefect_planetary_computer() - - result = test_flow() - assert result == "Hello, prefect-planetary-computer!" - - -def goodbye_hello_prefect_planetary_computer(): - @flow - def test_flow(): - return goodbye_prefect_planetary_computer() - - result = test_flow() - assert result == "Goodbye, prefect-planetary-computer!" From 527f91bd8da823da95e11218ff6554c47ec14e56 Mon Sep 17 00:00:00 2001 From: Giorgio Basile Date: Sat, 5 Aug 2023 13:50:57 +0200 Subject: [PATCH 2/3] Improve credentials docstrings --- prefect_planetary_computer/credentials.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/prefect_planetary_computer/credentials.py b/prefect_planetary_computer/credentials.py index adf1116..d78883b 100644 --- a/prefect_planetary_computer/credentials.py +++ b/prefect_planetary_computer/credentials.py @@ -26,6 +26,10 @@ class PlanetaryComputerCredentials(Block): [JupyterHub API token](https://planetarycomputer.microsoft.com/docs/concepts/computing/#request-a-token-from-jupyterhub) to instantiate clusters through Dask Gateway. + Args: + subscription_key: A subscription key to access the full PC data catalog. + hub_api_token: The JupyterHub API token to instantiate clusters through Dask Gateway. + Example: Load stored Planetary Computer credentials: ```python @@ -51,11 +55,13 @@ class PlanetaryComputerCredentials(Block): def get_stac_catalog(self, **pystac_kwargs) -> pystac_client.Client: """ - Provides a PySTAC client for the PC data catalog, automatically signing items as they are retrieved. + Provides a [PySTAC client](https://pystac-client.readthedocs.io/en/stable/api.html#client) for the PC data catalog, + automatically signing items as they are retrieved. + For more information about PC signing, refer to the [docs](https://planetarycomputer.microsoft.com/docs/concepts/sas). Args: - pystac_kwargs: Additional keyword arguments to pass to the [pystac_client.Client.open](https://pystac-client.readthedocs.io/en/stable/api.html#pystac_client.Client.open) method. + pystac_kwargs: Additional keyword arguments to pass to the [`pystac_client.Client.open`](https://pystac-client.readthedocs.io/en/stable/api.html#pystac_client.Client.open) method. Returns: A PySTAC client for the PC Catalog. @@ -160,8 +166,9 @@ def new_dask_gateway_cluster( Args: worker_cores: Number of cores per worker, in the 0.1-8 range. Defaults to 1. worker_memory: Amount of memory per worker (in GiB) in the 1-64 range. Defaults to 8. - image: The Docker image to be used for the workers. 
Defaults to [pangeo/pangeo-notebook:latest](https://hub.docker.com/layers/pangeo/pangeo-notebook/latest/images/sha256-94e97e24adf14e72c01f18c782b8c4e0efb1e05950a5f2d2e86e67adcbf547f8)
-                To use the PC official images, refer to the [planetary-computer-containers](https://github.com/Microsoft/planetary-computer-containers) repo.
+            image: The Docker image to be used for the workers.
+                Defaults to [`pangeo/pangeo-notebook:latest`](https://hub.docker.com/layers/pangeo/pangeo-notebook/latest/images/sha256-94e97e24adf14e72c01f18c782b8c4e0efb1e05950a5f2d2e86e67adcbf547f8).
+                To use the PC official images, refer to the [`planetary-computer-containers`](https://github.com/Microsoft/planetary-computer-containers) repo.
+            gpu: Whether to use GPU workers. Defaults to False.
+            environment: Environment variables to set on the workers. Defaults to the GDAL and PYGEOS-related variables set in the PC Hub.
+            gateway_cluster_kwargs: Additional keyword arguments to pass to the [`dask_gateway.GatewayCluster`](https://gateway.dask.org/api-client.html#dask_gateway.GatewayCluster) constructor.

From e3cfaf4de15e076e251feeb6f6a02d2eff8a1579 Mon Sep 17 00:00:00 2001
From: Giorgio Basile
Date: Sat, 5 Aug 2023 13:53:23 +0200
Subject: [PATCH 3/3] Add PC task runner based on dask task runner

---
 prefect_planetary_computer/task_runners.py | 117 +++++++++++++++++++++
 requirements-dev.txt                       |   3 +-
 requirements.txt                           |   1 +
 tests/conftest.py                          |  30 ++++--
 tests/test_credentials.py                  |  15 ++-
 tests/test_task_runners.py                 |  23 ++++
 6 files changed, 173 insertions(+), 16 deletions(-)
 create mode 100644 prefect_planetary_computer/task_runners.py
 create mode 100644 tests/test_task_runners.py

diff --git a/prefect_planetary_computer/task_runners.py b/prefect_planetary_computer/task_runners.py
new file mode 100644
index 0000000..938c0c3
--- /dev/null
+++ b/prefect_planetary_computer/task_runners.py
@@ -0,0 +1,117 @@
+"""
+Interface and implementations of the Planetary Computer Task Runner,
+inheriting from [`prefect_dask.DaskTaskRunner`](https://prefecthq.github.io/prefect-dask/task_runners/#prefect_dask.task_runners.DaskTaskRunner).
+"""  # noqa: E501
+
+from dask_gateway.auth import JupyterHubAuth
+from prefect_dask import DaskTaskRunner
+
+from prefect_planetary_computer.credentials import PlanetaryComputerCredentials
+
+GATEWAY_ADDRESS = "https://pccompute.westeurope.cloudapp.azure.com/compute/services/dask-gateway"  # noqa: E501
+GATEWAY_PROXY_ADDRESS = "gateway://pccompute-dask.westeurope.cloudapp.azure.com:80"
+
+
+class PlanetaryComputerTaskRunner(DaskTaskRunner):
+    """
+    A parallel task runner based on
+    [`prefect_dask.DaskTaskRunner`](https://prefecthq.github.io/prefect-dask/task_runners/#prefect_dask.task_runners.DaskTaskRunner),
+    providing PC-specific configuration.
+
+    It uses [`dask_gateway.GatewayCluster`](https://gateway.dask.org/api-client.html#gatewaycluster)
+    to create a Dask cluster on the Planetary Computer, enabling submission of both
+    Prefect tasks and Dask collections to the cluster.
+
+    !!! warning "Multiprocessing safety"
+        Note that, because the `PlanetaryComputerTaskRunner` uses multiprocessing, calls to flows
+        in scripts must be guarded with `if __name__ == "__main__":` or warnings will
+        be displayed.
+
+    Args:
+        cluster_kwargs: Additional kwargs to pass to
+            [`dask_gateway.GatewayCluster`](https://gateway.dask.org/api-client.html#gatewaycluster)
+            when creating a temporary Dask cluster.
+        adapt_kwargs: Additional kwargs to pass to
+            [`dask_gateway.Gateway.adapt_cluster`](https://gateway.dask.org/api-client.html#dask_gateway.Gateway.adapt_cluster)
+            when creating a temporary cluster.
+            Note that adaptive scaling is only enabled if `adapt_kwargs` is provided.
+        client_kwargs: Additional kwargs to use when creating a
+            [`dask.distributed.Client`](https://distributed.dask.org/en/latest/api.html#client).
+
+    Examples:
+        Using a temporary PC Dask Gateway cluster:
+        ```python
+        from prefect import flow
+        from prefect_planetary_computer import PlanetaryComputerCredentials
+        from prefect_planetary_computer.task_runners import PlanetaryComputerTaskRunner
+
+        pc_credentials = PlanetaryComputerCredentials.load("BLOCK_NAME")
+        pc_runner = PlanetaryComputerTaskRunner(
+            credentials=pc_credentials
+        )
+
+        @flow(task_runner=pc_runner)
+        def my_flow():
+            ...
+        ```
+
+        Providing additional kwargs to the PC Dask Gateway cluster:
+        ```python
+        PlanetaryComputerTaskRunner(
+            credentials=PlanetaryComputerCredentials.load("BLOCK_NAME"),
+            cluster_kwargs={
+                "image": "mcr.microsoft.com/planetary-computer/python:latest",
+            },
+            adapt_kwargs={"minimum": 1, "maximum": 10, "active": True}
+        )
+        ```
+
+        Connecting to an existing PC Dask cluster (use the base `DaskTaskRunner` for this):
+        ```python
+        DaskTaskRunner(address="gateway://pccompute-dask.westeurope.cloudapp.azure.com:80/prod.93548eccd90042078bbf124f932acd6d")
+        ```
+
+    """  # noqa: E501
+
+    def __init__(
+        self,
+        credentials: PlanetaryComputerCredentials,
+        cluster_kwargs: dict = None,
+        adapt_kwargs: dict = None,
+        client_kwargs: dict = None,
+    ):
+        self.credentials = credentials
+
+        if credentials.hub_api_token is None:
+            raise ValueError("JupyterHub API Token hasn't been provided.")
+        self.hub_api_token = credentials.hub_api_token.get_secret_value()
+
+        if cluster_kwargs is None:
+            cluster_kwargs = {}
+
+        cluster_kwargs.update(
+            {
+                "address": GATEWAY_ADDRESS,
+                "proxy_address": GATEWAY_PROXY_ADDRESS,
+                "auth": JupyterHubAuth(api_token=self.hub_api_token),
+            }
+        )
+
+        super().__init__(
+            cluster_class="dask_gateway.GatewayCluster",
+            cluster_kwargs=cluster_kwargs,
+            adapt_kwargs=adapt_kwargs,
+            client_kwargs=client_kwargs,
+        )
+
+    def duplicate(self):
+        """
+        Return a new task runner instance with the same options.
+        Overrides `prefect.task_runners.BaseTaskRunner`.
+ """ + return type(self)( + credentials=self.credentials, + cluster_kwargs=self.cluster_kwargs, + adapt_kwargs=self.adapt_kwargs, + client_kwargs=self.client_kwargs, + ) diff --git a/requirements-dev.txt b/requirements-dev.txt index b540d56..f696d2d 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -14,4 +14,5 @@ interrogate coverage pillow requests_mock -importlib_resources \ No newline at end of file +importlib_resources +numpy \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 4010b04..52e7dcd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ prefect>=2.0.0 +prefect-dask planetary-computer dask-gateway pystac-client diff --git a/tests/conftest.py b/tests/conftest.py index 39749e5..81cb19d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,16 +1,18 @@ import json -from unittest.mock import MagicMock import pytest import requests_mock -from dask_gateway import Gateway, GatewayCluster +from dask.distributed import LocalCluster +from dask_gateway import Gateway from importlib_resources import files from prefect.testing.utilities import prefect_test_harness +from prefect.utilities.importtools import from_qualified_name from prefect_planetary_computer.credentials import ( CATALOG_URL, PlanetaryComputerCredentials, ) +from prefect_planetary_computer.task_runners import PlanetaryComputerTaskRunner @pytest.fixture(scope="session", autouse=True) @@ -51,18 +53,32 @@ def mock_pc_stac_responses(): @pytest.fixture -async def mock_gateway_cluster(monkeypatch): +def mock_gateway_new_cluster(monkeypatch): def mock_new_cluster(*args, **kwargs): # noqa - cluster = MagicMock(spec=GatewayCluster) - cluster.name = "test-cluster" - cluster.status = "running" + cluster = LocalCluster(name="test-cluster") return cluster monkeypatch.setattr(Gateway, "new_cluster", mock_new_cluster) @pytest.fixture -def pc_credentials_block(mock_pc_stac_responses, mock_gateway_cluster): # noqa +def mock_pc_credentials_block(mock_pc_stac_responses, mock_gateway_new_cluster): # noqa return PlanetaryComputerCredentials( hub_api_token="fake-token", subscription_key="fake-key" ) + + +@pytest.fixture +def mock_pc_task_runner(mock_pc_credentials_block): + def new_basic_mock_pc_runner(): + basic_pc_runner = PlanetaryComputerTaskRunner(mock_pc_credentials_block) + basic_pc_runner.cluster_class = from_qualified_name( + "dask.distributed.LocalCluster" + ) + basic_pc_runner.cluster_kwargs = {"name": "test-cluster"} + return basic_pc_runner + + pc_runner = new_basic_mock_pc_runner() + pc_runner.duplicate = new_basic_mock_pc_runner + + return pc_runner diff --git a/tests/test_credentials.py b/tests/test_credentials.py index f1fdb45..4c80e17 100644 --- a/tests/test_credentials.py +++ b/tests/test_credentials.py @@ -1,28 +1,27 @@ -from dask_gateway import Gateway, GatewayCluster +from dask_gateway import Gateway from pystac_client import Client from prefect_planetary_computer.credentials import GATEWAY_ADDRESS -def test_get_dask_gateway(pc_credentials_block): - gateway_client = pc_credentials_block.get_dask_gateway() +def test_get_dask_gateway(mock_pc_credentials_block): + gateway_client = mock_pc_credentials_block.get_dask_gateway() assert isinstance(gateway_client, Gateway) assert gateway_client.address == GATEWAY_ADDRESS -def test_new_dask_gateway_cluster(pc_credentials_block): - gateway_cluster = pc_credentials_block.new_dask_gateway_cluster( +def test_new_dask_gateway_cluster(mock_pc_credentials_block): + gateway_cluster = 
mock_pc_credentials_block.new_dask_gateway_cluster( worker_cores=1.0, worker_memory=8.0, image="pangeo/pangeo-notebook:latest", gpu=False, environment={"GDAL_DISABLE_READDIR_ON_OPEN": "EMPTY_DIR"}, ) - assert isinstance(gateway_cluster, GatewayCluster) assert gateway_cluster.name == "test-cluster" -def test_get_stac_catalog(pc_credentials_block): - stac_catalog = pc_credentials_block.get_stac_catalog() +def test_get_stac_catalog(mock_pc_credentials_block): + stac_catalog = mock_pc_credentials_block.get_stac_catalog() assert isinstance(stac_catalog, Client) assert stac_catalog.id == "microsoft-pc" diff --git a/tests/test_task_runners.py b/tests/test_task_runners.py new file mode 100644 index 0000000..5a42ed9 --- /dev/null +++ b/tests/test_task_runners.py @@ -0,0 +1,23 @@ +import dask.array +from prefect import flow, task +from prefect_dask import get_async_dask_client + + +async def test_planetary_computer_task_runner(mock_pc_task_runner): + @task + async def compute_dask_sum(): + """Declares two dask arrays and returns sum""" + a = dask.array.random.normal(size=(1000, 1000), chunks=(100, 100)) + b = dask.array.random.normal(size=(1000, 1000), chunks=(100, 100)) + + async with get_async_dask_client(timeout="120s") as client: + sum = await client.compute(a + b) + return sum + + @flow(task_runner=mock_pc_task_runner) + async def dask_flow(): + prefect_future = await compute_dask_sum.submit() + return await prefect_future.result() + + result = await dask_flow() + assert result.shape == (1000, 1000)
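
A minimal end-to-end usage sketch for this series, tying the `PlanetaryComputerCredentials` block (PATCH 2/3) to the new `PlanetaryComputerTaskRunner` (PATCH 3/3). It assumes a credentials block has already been saved under the hypothetical name `pc-credentials` with a valid JupyterHub API token; the Sentinel-2 collection ID and bounding box are illustrative placeholders, not part of the patches.

```python
# Usage sketch only: "pc-credentials", the collection ID and the bbox are
# illustrative assumptions, not defined by this patch series.
from prefect import flow, task

from prefect_planetary_computer import PlanetaryComputerCredentials
from prefect_planetary_computer.task_runners import PlanetaryComputerTaskRunner

pc_credentials = PlanetaryComputerCredentials.load("pc-credentials")


@task
def count_sentinel2_items(bbox: list) -> int:
    # The block returns a PySTAC client that signs assets automatically (PATCH 2/3).
    catalog = pc_credentials.get_stac_catalog()
    search = catalog.search(collections=["sentinel-2-l2a"], bbox=bbox, max_items=50)
    return len(list(search.items()))


@flow(
    task_runner=PlanetaryComputerTaskRunner(
        credentials=pc_credentials,
        # Adaptive scaling is only enabled when adapt_kwargs is provided.
        adapt_kwargs={"minimum": 1, "maximum": 4},
    )
)
def sentinel2_count_flow() -> int:
    # Submitted tasks run on the temporary PC Dask Gateway cluster.
    return count_sentinel2_items.submit([12.3, 41.8, 12.6, 42.0]).result()


if __name__ == "__main__":
    # Guard required because the Dask-based task runner uses multiprocessing.
    print(sentinel2_count_flow())
```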