Install and run Dask on a Kubernetes cluster in ESA HPC cloud
Dask enables scaling computation tasks either as multiple processes on a single machine, or on Dask clusters that consist of multiple worker machines. Dask provides a scalable alternative to popular Python libraries e.g. Numpy, Pandas or SciKit Learn, but still using a compact and very similar API.
Dask scheduler, once presented with a computation task, splits it into smaller tasks that can be executed in parallel on the worker nodes/processes.
In this article you will install a Dask cluster on Kubernetes and run Dask worker nodes as Kubernetes pods. As part of the installation, you will get access to a Jupyter instance, where you can run the sample code.
What We Are Going To Cover
Install Dask on Kubernetes
Access Jupyter and Dask Scheduler dashboard
Run a sample computing task
Configure Dask cluster on Kubernetes from Python
Resolving errors
Prerequisites
No. 1 Hosting
You need a ESA HPC hosting account with Horizon interface https://horizon.eohpc.net/auth/login/?next=/.
No. 2 Kubernetes cluster on CloudFerro cloud
To create Kubernetes cluster on EOHPC cloud refer to this guide: How to Create a Kubernetes Cluster Using ESA HPC OpenStack Magnum
No. 3 Access to kubectl command line
The instructions for activation of kubectl are provided in: How To Access Kubernetes Cluster Post Deployment Using Kubectl On ESA HPC OpenStack Magnum
No. 4 Familiarity with Helm
For more information on using Helm and installing apps with Helm on Kubernetes, refer to Deploying Helm Charts on Magnum Kubernetes Clusters on ESA HPC EOHPC Cloud
No. 5 Python3 available on your machine
Python3 preinstalled on the working machine.
No. 6 Basic familiarity with Jupyter and Python scientific libraries
We will use Pandas as an example.
Step 1 Install Dask on Kubernetes
To install Dask as a Helm chart, first download the Dask Helm repository:
helm repo add dask https://helm.dask.org/
Instead of installing the chart out of the box, let us customize the configuration for convenience. To view all possible configurations and their defaults run:
helm show dask/dask
Prepare file dask-values.yaml to override some of the defaults:
dask-values.yaml
scheduler:
serviceType: LoadBalancer
jupyter:
serviceType: LoadBalancer
worker:
replicas: 4
This changes the default service type for Jupyter and Scheduler to LoadBalancer, so that they get exposed publicly. Also, the default number of Dask workers is 3 but is now changed to 4. Each Dask worker pod will get allocated 3GB RAM and 1CPU, we keep it at this default.
To deploy the chart, create the namespace dask and install to it:
helm install dask dask/dask -n dask --create-namespace -f dask-values.yaml
Step 2 Access Jupyter and Dask Scheduler dashboard
After the installation step, you can access Dask services:
kubectl get services -n dask
There are two services, for Jupyter and Dask Scheduler dashboard. Populating external IPs will take few minutes:
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
dask-jupyter LoadBalancer 10.254.230.230 64.225.128.91 80:32437/TCP 6m49s
dask-scheduler LoadBalancer 10.254.41.250 64.225.128.236 8786:31707/TCP,80:31668/TCP 6m49s
We can paste the external IPs to the browser to view the services. To access Jupyter, you will first need to pass the login screen, the default password is dask. Then you can view the Jupyter instance:
Similarly, with the Scheduler Dashboard, paste the floating IP to the browser to view it. If you then click on the “Workers” tab above, you can see that 4 workers are running on our Dask cluster:
Step 3 Run a sample computing task
The installed Jupyter instance already contains Dask and other useful Python libraries installed. To run a sample job, first activate the notebook by clicking on icon named NoteBook → Python3(ipykernel) on the right hand side of the Jupyter instance browser screen.
The sample job performs calculation on table (dataframe) of 100k rows, and just one column. Each record will be filled with a random integer from 1 to 100,000 and the task is to calculate the sum of all records.
The code will run the same example for Pandas (single process) and Dask (parallelized on our cluster) and we will be able to inspect the results.
Copy the following code and paste to the cell in Jupyter notebook:
import dask.dataframe as dd
import pandas as pd
import numpy as np
import time
data = {'A': np.random.randint(1, 100_000_000, 100_000_000)}
df_pandas = pd.DataFrame(data)
df_dask = dd.from_pandas(df_pandas, npartitions=4)
# Pandas
start_time_pandas = time.time()
result_pandas = df_pandas['A'].sum()
end_time_pandas = time.time()
print(f"Result Pandas: {result_pandas}")
print(f"Computation time Pandas: {end_time_pandas - start_time_pandas:.2f} seconds.")
# Dask
start_time_dask = time.time()
result_dask = df_dask['A'].sum().compute()
end_time_dask = time.time()
print(f"Result Dask: {result_dask}")
print(f"Computation time Dask: {end_time_dask - start_time_dask:.2f} seconds.")
Hit play or use option Run from the main menu to execute the code. After a few seconds, the result will appear below the cell with code.
Some of the results we could observe for this example:
Result Pandas: 4999822570722943
Computation time Pandas: 0.15 seconds.
Result Dask: 4999822570722943
Computation time Dask: 0.07 seconds.
Note these results are not deterministic and simple Pandas could also perform better case by case. The overhead to distribute and collect results from Dask workers needs to be also taken into account. Further tuning the performance of Dask is beyond the scope of this article.
Step 4 Configure Dask cluster on Kubernetes from Python
For managing the Dask cluster on Kubernetes we can use a dedicated Python library dask-kubernetes. Using this library, we can reconfigure certain parameters of our Dask cluster.
One way to run dask-kubernetes would be from the Jupyter instance but then we would have to provide reference to kubeconfig of our cluster. Instead, we install dask-kubernetes in our local environment, with the following command:
pip install dask-kubernetes
Once this is done, we can manage the Dask cluster from Python. As an example, let us upscale it to 5 Dask nodes. Use nano to create file scale-cluster.py:
nano scale-cluster.py
then insert the following commands:
scale-cluster.py
from dask_kubernetes import HelmCluster
cluster = HelmCluster(release_name="dask", namespace="dask")
cluster.scale(5)
Apply with:
python3 scale-cluster.py
Using the command
kubectl get pods -n dask
you can see that the number of workers now is 5:
Or, you can see the current number of worker nodes in the Dask Scheduler dashboard (refresh the screen):
Note that the functionalities of dask-kubernetes should be possible to achieve using just Kubernetes API directly, the choice will depend on your personal preference.
Resolving errors
When running command
python3 scale-cluster.py
on WSL version 1, error messages such as these may appear:
The code will work properly, that is, it will increase the number of workers to 5, as required. The error should not appear on WSL version 2 and other Ubuntu distros.