Source code for dea_tools.dask
## dea_dask.py
'''
Tools for simplifying the creation of Dask clusters for parallelised
computing.

License: The code in this notebook is licensed under the Apache License,
Version 2.0 (https://www.apache.org/licenses/LICENSE-2.0). Digital Earth
Australia data is licensed under the Creative Commons by Attribution 4.0
license (https://creativecommons.org/licenses/by/4.0/).

Contact: If you need assistance, please post a question on the Open Data
Cube Discord chat (https://discord.com/invite/4hhBQVas5U) or on the GIS Stack
Exchange (https://gis.stackexchange.com/questions/ask?tags=open-data-cube)
using the `open-data-cube` tag (you can view previously asked questions
here: https://gis.stackexchange.com/questions/tagged/open-data-cube).

If you would like to report an issue with this script, you can file one on
GitHub (https://github.com/GeoscienceAustralia/dea-notebooks/issues/new).

Last modified: June 2022
'''
from importlib.util import find_spec
import os

import dask
from aiohttp import ClientConnectionError
from datacube.utils.dask import start_local_dask
from datacube.utils.rio import configure_s3_access
_HAVE_PROXY = bool(find_spec('jupyter_server_proxy'))
_IS_AWS = ('AWS_ACCESS_KEY_ID' in os.environ or
           'AWS_DEFAULT_REGION' in os.environ)
def create_local_dask_cluster(spare_mem='3Gb', display_client=True,
                              return_client=False):
    """
    Using the datacube utils function `start_local_dask`, generate
    a local dask cluster. Automatically detects whether the code is
    running on AWS or the NCI and configures the cluster accordingly.

    Example use:

        from dea_tools.dask import create_local_dask_cluster

        create_local_dask_cluster(spare_mem='4Gb')

    Parameters
    ----------
    spare_mem : str, optional
        The amount of memory, in Gb, to leave for the notebook to run.
        This memory will not be used by the cluster, e.g. '3Gb'.
    display_client : bool, optional
        An optional boolean indicating whether to display a summary of
        the dask client, including a link to monitor the progress of
        the analysis. Set to False to hide this display.
    return_client : bool, optional
        An optional boolean indicating whether to return the dask
        client object.
    """
    if _HAVE_PROXY:
        # Configure the dashboard link to go via the jupyter-server-proxy
        prefix = os.environ.get('JUPYTERHUB_SERVICE_PREFIX', '/')
        dask.config.set({"distributed.dashboard.link":
                         prefix + "proxy/{port}/status"})

    # Start up a local cluster
    client = start_local_dask(mem_safety_margin=spare_mem)

    if _IS_AWS:
        # Configure GDAL for S3 access
        configure_s3_access(aws_unsigned=True, client=client)

    # Show the dask cluster settings
    if display_client:
        from IPython.display import display
        display(client)

    # Return the client as an object
    if return_client:
        return client
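
# A minimal usage sketch (an illustration only, assuming a Jupyter
# environment with datacube installed): keeping a handle to the client
# lets the cluster be shut down once the analysis is complete.
#
#     client = create_local_dask_cluster(spare_mem='4Gb',
#                                        display_client=False,
#                                        return_client=True)
#     ...  # run dask-backed analysis here
#     client.close()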
try:
    from dask_gateway import Gateway

    def create_dask_gateway_cluster(profile='r5_L', workers=2):
        """
        Create a cluster in our internal dask gateway.

        Parameters
        ----------
        profile : str
            Possible values are:
                - r5_L (2 cores, 15GB memory)
                - r5_XL (4 cores, 31GB memory)
                - r5_2XL (8 cores, 63GB memory)
                - r5_4XL (16 cores, 127GB memory)
        workers : int
            Number of workers in the cluster.
        """
        try:
            gateway = Gateway()

            # Close any existing clusters to free up resources
            cluster_names = gateway.list_clusters()
            if len(cluster_names) > 0:
                print("Cluster(s) still running:", cluster_names)
                for n in cluster_names:
                    cluster = gateway.connect(n.name)
                    cluster.shutdown()

            options = gateway.cluster_options()
            options['profile'] = profile

            # Limit the username to alphanumeric characters; Kubernetes
            # pods won't launch if labels contain anything other than
            # alphanumerics, '-', '_' or '.'
            options['jupyterhub_user'] = ''.join(
                c if c.isalnum() else '-'
                for c in os.getenv('JUPYTERHUB_USER'))

            cluster = gateway.new_cluster(options)
            cluster.scale(workers)
            return cluster

        except ClientConnectionError:
            raise ConnectionError(
                "Access to the dask gateway cluster is unauthorised")
except ImportError:
    # dask_gateway is not installed; provide a stub that fails loudly
    def create_dask_gateway_cluster(*args, **kwargs):
        raise NotImplementedError(
            "dask_gateway is required to create a gateway cluster")
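
# An illustrative usage sketch (an assumption, not part of this module's
# tested API: it requires a reachable dask_gateway deployment and the
# JUPYTERHUB_USER environment variable to be set):
#
#     cluster = create_dask_gateway_cluster(profile='r5_XL', workers=4)
#     client = cluster.get_client()
#     ...  # run dask-backed analysis here
#     cluster.shutdown()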