This repository has been archived by the owner on Jul 15, 2024. It is now read-only.

Adds PlanetaryComputerTaskRunner #12

Merged: 3 commits, merged on Aug 5, 2023
6 changes: 0 additions & 6 deletions docs/flows.md

This file was deleted.

6 changes: 0 additions & 6 deletions docs/tasks.md

This file was deleted.

2 changes: 0 additions & 2 deletions mkdocs.yml
@@ -77,7 +77,5 @@ nav:
- Examples Catalog: examples_catalog.md
- API Reference:
- Credentials: credentials.md
- Tasks: tasks.md
- Flows: flows.md


15 changes: 11 additions & 4 deletions prefect_planetary_computer/credentials.py
@@ -26,6 +26,10 @@ class PlanetaryComputerCredentials(Block):
[JupyterHub API token](https://planetarycomputer.microsoft.com/docs/concepts/computing/#request-a-token-from-jupyterhub)
to instantiate clusters through Dask Gateway.

Args:
subscription_key: A subscription key to access the full PC data catalog.
hub_api_token: The JupyterHub API token to instantiate clusters through Dask Gateway.

Example:
Load stored Planetary Computer credentials:
```python
@@ -51,11 +55,13 @@ class PlanetaryComputerCredentials(Block):

def get_stac_catalog(self, **pystac_kwargs) -> pystac_client.Client:
"""
Provides a PySTAC client for the PC data catalog, automatically signing items as they are retrieved.
Provides a [PySTAC client](https://pystac-client.readthedocs.io/en/stable/api.html#client) for the PC data catalog,
automatically signing items as they are retrieved.

For more information about PC signing, refer to the [docs](https://planetarycomputer.microsoft.com/docs/concepts/sas).

Args:
pystac_kwargs: Additional keyword arguments to pass to the [pystac_client.Client.open](https://pystac-client.readthedocs.io/en/stable/api.html#pystac_client.Client.open) method.
pystac_kwargs: Additional keyword arguments to pass to the [`pystac_client.Client.open`](https://pystac-client.readthedocs.io/en/stable/api.html#pystac_client.Client.open) method.

Returns:
A PySTAC client for the PC Catalog.
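
As a usage sketch of the method documented above (illustrative values, not part of this PR: the block name, collection id, and bounding box are placeholders), the signed catalog could be queried like this:

```python
from prefect_planetary_computer.credentials import PlanetaryComputerCredentials

credentials = PlanetaryComputerCredentials.load("BLOCK_NAME")  # hypothetical block name

# Items are signed automatically as they are retrieved (per the docstring above),
# so asset HREFs can be opened directly.
catalog = credentials.get_stac_catalog()
search = catalog.search(
    collections=["sentinel-2-l2a"],       # illustrative collection id
    bbox=[12.35, 41.80, 12.65, 42.00],    # illustrative bounding box
    datetime="2023-06-01/2023-06-30",
)
items = list(search.items())
```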
@@ -160,8 +166,9 @@ def new_dask_gateway_cluster(
Args:
worker_cores: Number of cores per worker, in the 0.1-8 range. Defaults to 1.
worker_memory: Amount of memory per worker (in GiB) in the 1-64 range. Defaults to 8.
image: The Docker image to be used for the workers. Defaults to [pangeo/pangeo-notebook:latest](https://hub.docker.com/layers/pangeo/pangeo-notebook/latest/images/sha256-94e97e24adf14e72c01f18c782b8c4e0efb1e05950a5f2d2e86e67adcbf547f8)
To use the PC official images, refer to the [planetary-computer-containers](https://github.com/Microsoft/planetary-computer-containers) repo.
image: The Docker image to be used for the workers.
Defaults to [`pangeo/pangeo-notebook:latest`](https://hub.docker.com/layers/pangeo/pangeo-notebook/latest/images/sha256-94e97e24adf14e72c01f18c782b8c4e0efb1e05950a5f2d2e86e67adcbf547f8).
To use the official PC images, refer to the [`planetary-computer-containers`](https://github.com/Microsoft/planetary-computer-containers) repo.
gpu: Whether to use GPU workers. Defaults to False.
environment: Environment variables to set on the workers. Defaults to the GDAL and PYGEOS-related variables set in the PC Hub.
gateway_cluster_kwargs: Additional keyword arguments to pass to [`dask_gateway.GatewayCluster`](https://gateway.dask.org/api-client.html#dask_gateway.GatewayCluster) constructor.
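To illustrate the arguments documented above, a minimal sketch of creating the block and a Gateway cluster (the key, token, and block name are placeholders, not values from this PR):

```python
from prefect_planetary_computer.credentials import PlanetaryComputerCredentials

credentials = PlanetaryComputerCredentials(
    subscription_key="<pc-subscription-key>",   # placeholder
    hub_api_token="<jupyterhub-api-token>",     # placeholder
)
credentials.save("pc-credentials", overwrite=True)  # optionally persist as a Prefect block

# Create a Dask Gateway cluster with the documented defaults made explicit.
cluster = credentials.new_dask_gateway_cluster(
    worker_cores=1,
    worker_memory=8,
    image="pangeo/pangeo-notebook:latest",
    gpu=False,
)
cluster.adapt(minimum=1, maximum=10)
client = cluster.get_client()
```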
21 changes: 0 additions & 21 deletions prefect_planetary_computer/flows.py

This file was deleted.

117 changes: 117 additions & 0 deletions prefect_planetary_computer/task_runners.py
@@ -0,0 +1,117 @@
"""
Interface and implementations of the Planetary Computer Task Runner,
inheriting from [`prefect_dask.DaskTaskRunner`](https://prefecthq.github.io/prefect-dask/task_runners/#prefect_dask.task_runners.DaskTaskRunner).
""" # noqa E501

from dask_gateway.auth import JupyterHubAuth
from prefect_dask import DaskTaskRunner

from prefect_planetary_computer.credentials import PlanetaryComputerCredentials

GATEWAY_ADDRESS = "https://pccompute.westeurope.cloudapp.azure.com/compute/services/dask-gateway" # noqa E501
GATEWAY_PROXY_ADDRESS = "gateway://pccompute-dask.westeurope.cloudapp.azure.com:80"


class PlanetaryComputerTaskRunner(DaskTaskRunner):
"""
A parallel task runner based on
[`prefect_dask.DaskTaskRunner`](https://prefecthq.github.io/prefect-dask/task_runners/#prefect_dask.task_runners.DaskTaskRunner),
providing PC-specific configuration.

It uses [`dask_gateway.GatewayCluster`](https://gateway.dask.org/api-client.html#gatewaycluster)
to create a Dask cluster on the Planetary Computer, enabling submission of both Prefect
tasks and Dask collections to the cluster.

!!! warning "Multiprocessing safety"
Note that, because the `PlanetaryComputerTaskRunner` uses multiprocessing, calls to flows
in scripts must be guarded with `if __name__ == "__main__":` or warnings will
be displayed.

Args:
cluster_kwargs: Additional kwargs to pass to
[`dask_gateway.GatewayCluster`](https://gateway.dask.org/api-client.html#gatewaycluster)
when creating a temporary Dask cluster.
adapt_kwargs: Additional kwargs to pass to
[`dask_gateway.Gateway.adapt_cluster`](https://gateway.dask.org/api-client.html#dask_gateway.Gateway.adapt_cluster)
when creating a temporary cluster.
Note that adaptive scaling is only enabled if `adapt_kwargs` are provided.
client_kwargs: Additional kwargs to use when creating a
[`dask.distributed.Client`](https://distributed.dask.org/en/latest/api.html#client).

Examples:
Using a temporary PC Dask Gateway cluster:
```python
from prefect import flow
from prefect_planetary_computer.credentials import PlanetaryComputerCredentials
from prefect_planetary_computer.task_runners import PlanetaryComputerTaskRunner

pc_credentials = PlanetaryComputerCredentials.load("BLOCK_NAME")

pc_runner = PlanetaryComputerTaskRunner(
credentials=pc_credentials
)

@flow(task_runner=pc_runner)
def my_flow():
...
```

Providing additional kwargs to the PC Dask Gateway cluster:
```python
PlanetaryComputerTaskRunner(
credentials=PlanetaryComputerCredentials.load("BLOCK_NAME"),
cluster_kwargs={
"image": "mcr.microsoft.com/planetary-computer/python:latest",
},
adapt_kwargs={'minimum': 1, 'maximum': 10, 'active': True}
)
```

Connecting to an existing PC Dask cluster (use the base `DaskTaskRunner` for this):
```python
DaskTaskRunner(address="gateway://pccompute-dask.westeurope.cloudapp.azure.com:80/prod.93548eccd90042078bbf124f932acd6d")
```

""" # noqa: E501

def __init__(
self,
credentials: PlanetaryComputerCredentials,
cluster_kwargs: dict = None,
adapt_kwargs: dict = None,
client_kwargs: dict = None,
):
self.credentials = credentials
        if credentials.hub_api_token is None:
            raise ValueError("JupyterHub API Token hasn't been provided.")

        self.hub_api_token = credentials.hub_api_token.get_secret_value()

if cluster_kwargs is None:
cluster_kwargs = {}

cluster_kwargs.update(
{
"address": GATEWAY_ADDRESS,
"proxy_address": GATEWAY_PROXY_ADDRESS,
"auth": JupyterHubAuth(api_token=self.hub_api_token),
}
)

super().__init__(
cluster_class="dask_gateway.GatewayCluster",
cluster_kwargs=cluster_kwargs,
adapt_kwargs=adapt_kwargs,
client_kwargs=client_kwargs,
)

def duplicate(self):
"""
Return a new task runner instance with the same options.
Overrides `prefect.task_runners.BaseTaskRunner.duplicate`.
"""
return type(self)(
credentials=self.credentials,
cluster_kwargs=self.cluster_kwargs,
adapt_kwargs=self.adapt_kwargs,
client_kwargs=self.client_kwargs,
)
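
For orientation, the constructor above amounts to the following hand-rolled `DaskTaskRunner` configuration; this is a sketch, with a placeholder token and illustrative adaptive-scaling values:

```python
from dask_gateway.auth import JupyterHubAuth
from prefect_dask import DaskTaskRunner

from prefect_planetary_computer.task_runners import GATEWAY_ADDRESS, GATEWAY_PROXY_ADDRESS

# Equivalent manual wiring of what PlanetaryComputerTaskRunner.__init__ sets up.
runner = DaskTaskRunner(
    cluster_class="dask_gateway.GatewayCluster",
    cluster_kwargs={
        "address": GATEWAY_ADDRESS,
        "proxy_address": GATEWAY_PROXY_ADDRESS,
        "auth": JupyterHubAuth(api_token="<jupyterhub-api-token>"),  # placeholder
    },
    adapt_kwargs={"minimum": 1, "maximum": 10, "active": True},  # illustrative
)
```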
24 changes: 0 additions & 24 deletions prefect_planetary_computer/tasks.py

This file was deleted.

3 changes: 2 additions & 1 deletion requirements-dev.txt
@@ -14,4 +14,5 @@ interrogate
coverage
pillow
requests_mock
importlib_resources
numpy
1 change: 1 addition & 0 deletions requirements.txt
@@ -1,4 +1,5 @@
prefect>=2.0.0
prefect-dask
planetary-computer
dask-gateway
pystac-client
30 changes: 23 additions & 7 deletions tests/conftest.py
@@ -1,16 +1,18 @@
import json
from unittest.mock import MagicMock

import pytest
import requests_mock
from dask_gateway import Gateway, GatewayCluster
from dask.distributed import LocalCluster
from dask_gateway import Gateway
from importlib_resources import files
from prefect.testing.utilities import prefect_test_harness
from prefect.utilities.importtools import from_qualified_name

from prefect_planetary_computer.credentials import (
CATALOG_URL,
PlanetaryComputerCredentials,
)
from prefect_planetary_computer.task_runners import PlanetaryComputerTaskRunner


@pytest.fixture(scope="session", autouse=True)
@@ -51,18 +53,32 @@ def mock_pc_stac_responses():


@pytest.fixture
async def mock_gateway_cluster(monkeypatch):
def mock_gateway_new_cluster(monkeypatch):
def mock_new_cluster(*args, **kwargs): # noqa
cluster = MagicMock(spec=GatewayCluster)
cluster.name = "test-cluster"
cluster.status = "running"
cluster = LocalCluster(name="test-cluster")
return cluster

monkeypatch.setattr(Gateway, "new_cluster", mock_new_cluster)


@pytest.fixture
def pc_credentials_block(mock_pc_stac_responses, mock_gateway_cluster): # noqa
def mock_pc_credentials_block(mock_pc_stac_responses, mock_gateway_new_cluster): # noqa
return PlanetaryComputerCredentials(
hub_api_token="fake-token", subscription_key="fake-key"
)


@pytest.fixture
def mock_pc_task_runner(mock_pc_credentials_block):
def new_basic_mock_pc_runner():
basic_pc_runner = PlanetaryComputerTaskRunner(mock_pc_credentials_block)
basic_pc_runner.cluster_class = from_qualified_name(
"dask.distributed.LocalCluster"
)
basic_pc_runner.cluster_kwargs = {"name": "test-cluster"}
return basic_pc_runner

pc_runner = new_basic_mock_pc_runner()
pc_runner.duplicate = new_basic_mock_pc_runner

return pc_runner
15 changes: 7 additions & 8 deletions tests/test_credentials.py
@@ -1,28 +1,27 @@
from dask_gateway import Gateway, GatewayCluster
from dask_gateway import Gateway
from pystac_client import Client

from prefect_planetary_computer.credentials import GATEWAY_ADDRESS


def test_get_dask_gateway(pc_credentials_block):
gateway_client = pc_credentials_block.get_dask_gateway()
def test_get_dask_gateway(mock_pc_credentials_block):
gateway_client = mock_pc_credentials_block.get_dask_gateway()
assert isinstance(gateway_client, Gateway)
assert gateway_client.address == GATEWAY_ADDRESS


def test_new_dask_gateway_cluster(pc_credentials_block):
gateway_cluster = pc_credentials_block.new_dask_gateway_cluster(
def test_new_dask_gateway_cluster(mock_pc_credentials_block):
gateway_cluster = mock_pc_credentials_block.new_dask_gateway_cluster(
worker_cores=1.0,
worker_memory=8.0,
image="pangeo/pangeo-notebook:latest",
gpu=False,
environment={"GDAL_DISABLE_READDIR_ON_OPEN": "EMPTY_DIR"},
)
assert isinstance(gateway_cluster, GatewayCluster)
assert gateway_cluster.name == "test-cluster"


def test_get_stac_catalog(pc_credentials_block):
stac_catalog = pc_credentials_block.get_stac_catalog()
def test_get_stac_catalog(mock_pc_credentials_block):
stac_catalog = mock_pc_credentials_block.get_stac_catalog()
assert isinstance(stac_catalog, Client)
assert stac_catalog.id == "microsoft-pc"
6 changes: 0 additions & 6 deletions tests/test_flows.py

This file was deleted.

23 changes: 23 additions & 0 deletions tests/test_task_runners.py
@@ -0,0 +1,23 @@
import dask.array
from prefect import flow, task
from prefect_dask import get_async_dask_client


async def test_planetary_computer_task_runner(mock_pc_task_runner):
@task
async def compute_dask_sum():
"""Declares two dask arrays and returns sum"""
a = dask.array.random.normal(size=(1000, 1000), chunks=(100, 100))
b = dask.array.random.normal(size=(1000, 1000), chunks=(100, 100))

async with get_async_dask_client(timeout="120s") as client:
sum = await client.compute(a + b)
return sum

@flow(task_runner=mock_pc_task_runner)
async def dask_flow():
prefect_future = await compute_dask_sum.submit()
return await prefect_future.result()

result = await dask_flow()
assert result.shape == (1000, 1000)
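
The test above drives the runner through the async Dask client; a synchronous sketch of the same pattern could use `prefect_dask.get_dask_client` instead (the block name is a placeholder, and the `__main__` guard follows the multiprocessing warning in the runner's docstring):

```python
import dask.array
from prefect import flow, task
from prefect_dask import get_dask_client

from prefect_planetary_computer.credentials import PlanetaryComputerCredentials
from prefect_planetary_computer.task_runners import PlanetaryComputerTaskRunner


@task
def compute_dask_sum():
    """Declare two Dask arrays and compute their sum on the flow's cluster."""
    a = dask.array.random.normal(size=(1000, 1000), chunks=(100, 100))
    b = dask.array.random.normal(size=(1000, 1000), chunks=(100, 100))
    with get_dask_client(timeout="120s") as client:
        return client.compute(a + b).result()


@flow(
    task_runner=PlanetaryComputerTaskRunner(
        credentials=PlanetaryComputerCredentials.load("BLOCK_NAME")  # placeholder
    )
)
def dask_flow():
    return compute_dask_sum.submit().result()


if __name__ == "__main__":
    result = dask_flow()
    assert result.shape == (1000, 1000)
```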
24 changes: 0 additions & 24 deletions tests/test_tasks.py

This file was deleted.