Scaling up scientific computing and analysis using Dask

Pierre Glaser, Gatsby Computational Neuroscience Unit

Who am I?

  • A Gatsby first-year PhD student working on generative models and kernel methods
  • Previous job: improving the performance of scientific computing in Python in a distributed setting
  • Exposition to computational neuroscience: part of the Parietal INRIA team, that constructs numerical methods for neuroimaging data analysis.

What is this talk about:

dask, a dynamic task scheduler written in Python, which aims at porting both:

  • the Python scientific ecosystem
  • programming workflows (data analysis, running experiments)


to out-of-core workloads and cluster of resources

Goals and Non Goals

  • NON GOAL: convince you to use tool X/language Y/workflow Z.
  • Actual Goal: helping field experts (like you) scale up their experiments through an in-depth description of one successful distributed computing library.
Let's get into it!

Distributed Computing: Python's next challenge?

Python has taken over a large part of scientific computing. Famous scientific success stories:

  • The Mars helicopter mission
  • The M87 black hole imaging
  • Powering most of the AI ecosystem

Distributed Computing: Python's next challenge?

The key reasons for Python's success in science are:
  • Extreme interactivity, combined with
  • Bare metal performance, through it's C-extensibility
  • Python is also extremely polyvalent (example: wide set of networking primitives)

Computing in the 21st Century

  • Amount of data is drastically increasing, often above RAM limits
  • Computing resources are expanding their capacity, often through computing clusters
  • On the other hand, traditional scientific computing libraries in python do not handle either out-of-core data, or distributed computing resources.

Some classic python code first...


In [1]: import pandas as pd
   ...: df = pd.read_csv('neuron_activities.csv')
   ...: firing_rates = df['firing_rate'].dropna()
   ...: print(firing_rates.mean())
					

In [1]: import pandas as pd
   ...: df = pd.read_csv('neuron_activities.csv')
   ...: firing_rates = df['firing_rate'].dropna()
   ...: print(firing_rates.mean())
					

In [1]: import pandas as pd
   ...: df = pd.read_csv('neuron_activities.csv')
   ...: firing_rates = df['firing_rate'].dropna()
   ...: print(firing_rates.mean())
					
What happens if your dataframe cannot fit in memory?
  • Dask breaks down "large computations" into a set of smaller ones, arranging them into a task graph.
  • Dask never evaluates this task graph unless explictly asked to ("lazy evaluation")
A dask module example: dask.array

In [1]: import dask.array as da
   ...: a = da.random.random((4000, 1000))  # lazy!
					

In [1]: import dask.array as da
   ...: a = da.random.random((4000, 1000))  # lazy!
   ...: m = a.mean()  # lazy!
					

In [1]: import dask.array as da
   ...: a = da.random.random((4000, 1000))  # lazy!
   ...: m = a.mean()  # lazy!
   ...: print(m.compute())  # make dask execute the task graph
					

Each node in this graph represents a unitary python function call with a RAM-friendly memory footprint

Dask provides two complementary features:


In [1]: import dask.array as da
   ...: a = da.random.random((4000, 1000))  # lazy!
   ...: m = a.mean()  # lazy!
					

In [1]: import dask.array as da
   ...: a = da.random.random((4000, 1000))  # lazy!
   ...: m = a.mean()  # lazy!
   ...: print(m.compute())  # make dask execute the task graph
					
  • The generation of task graphs for a set of supported operations (array.mean(), array.std(), array.matmul())

  • The execution task graphs coordinated by the dask scheduler and executed by a threadpool
Wrapping Up

  • The dask scheduler is application agnostic: it is the core low-level object of dask

  • The dask.array class builds on top of the dask scheduler and aims to be a drop-in replacement of the np.ndarray class


That's it! We have at our disposal an out-of-core version of numpy
A more complex task graph


In [1]: import dask.array as da
   ...: a = da.random.random((4000, 4000), chunks=(1000, 1000))
   ...: b = a @ a
   ...: c = b + b.T
					



yet, only 3 lines of standard numpy!

Going beyond numpy

The python scientific ecosystem is a wide collection of packages building on top of each other:

Going beyond numpy

Extensions for other well known packages : dask.dataframe, dask-ml, dask-lightgbm

In [1]: import dask.array as da
   ...: import dask.dataframe as dd
   ...: a = da.random.random((int(1e10), 5))

In [2]: df.head(n=2)
Out[2]:
          0         1         2         3         4         5
0  0.199389  0.099026  0.816139  0.885564  0.935145  0.156040
1  0.671736  0.142255  0.863120  0.855204  0.084645  0.565667

In [28]: %time df.head()
CPU times: user 84.5 ms, sys: 17.5 ms, total: 102 ms
Wall time: 101 ms

In [0]: from dask_ml.model_selection import HyperbandSearchCV

In [1]: search = HyperbandSearchCV(clf, params, max_iter=81, random_state=0)

In [2]: search.fit(X_train, y_train, classes=[0, 1]);

In [3]: search.best_params_
Out[3]: {'alpha': 0.7612985385451453, 'l1_ratio': 0.18850458399555325}

  • dask extensions such as dask.array, dask.dataframe, dask-ml aim to be drop in replacements of the libraries they extend

  • The memory layout of data structures, or their out of core aspects are implementation details, and should not change the user level API

Going beyond numpy

Dask can also create task graphs from arbitrary code logic using the delayed API

  
  
results = {}


for a in A:
    for b in B:
	if a < b:
	    results[a, b] = f(a, b)
	else:
	    results[a, b] = g(a, b)
print(results)

results = {}
from dask import delayed, compute

for a in A:
    for b in B:
	if a < b:
	    results[a, b] = delayed(f)(a, b)  # lazily construct graph
	else:
	    results[a, b] = delayed(g)(a, b)
print(compute(results))  # trigger graph computation

Towards an out of core scientific Python

  • dask.dataframe, dask-ml, dask-lightgbm...
  • dask.delayed(f)(a, b)

  • A significant part of the Python scientific stack has now out-of-core extensions/replacements. Day to day out of core work is now possible!
  • As application levels library developpers, we can take part in this effort by making our own projects out-of-core compatible

Solving the 21st century challenges using dask?

  • Out of core scientific computing: check
  • How does dask scale to multiple machines?

The local dask scheduler

The default dask scheduler is a lightweight scheduler designed to run on a single machine

The dask.distributed scheduler

The dask.distributed package exposes a more sophisticated scheduler designed to send tasks to workers living on different machines:

The dask scheduler and the dask workers are now separates processes that live on various machines of a compute cluster!

Advantages of the dask.distributed scheduler

  • Feature rich monitoring dashboard (!!)
  • Can run in asynchronous mode (!!)
  • Direct interaction with the scheduler for a fine-grain control (less important: in practice, replaces thedask.delayed API)

Integration with HPC and job-scheduling systems

  • Launching the dask workers process is now a cluster-specific action
  • Dask exposes a set of helper functions that will submit worker creation tasks to a specific cluster manager

from dask_jobqueue import SLURMCluster  # but also PBSCluster, MoabCluster...
# wrapper around a set of workers and scheduler, useful to tear everything
# down quickly
# lots of other arguments
cluster = SLURMCluster(processes=1, cores=1)

# the following command sends 8 sbatch requests that will each launch a dask Worker
# each dask worker will then connect to the scheduler process
# each dask worker will have only 1 process and 1 thread
cluster.scale(8)

from dask_jobqueue import SLURMCluster  # but also PBSCluster, MoabCluster...
# wrapper around a set of workers and scheduler, useful to tear everything
# down quickly
# lots of other arguments
cluster = SLURMCluster(processes=2, cores=1)

# the following command sends 4 sbatch requests that will each launch a dask Worker
# each dask worker will then connect to the scheduler process
# each dask worker will have only 1 process and 2 thread
cluster.scale(8)

from dask_jobqueue import SLURMCluster  # but also PBSCluster, MoabCluster...
# wrapper around a set of workers and scheduler, useful to tear everything
# down quickly
# lots of other arguments
cluster = SLURMCluster(processes=4, cores=2)

# the following command sends 2 sbatch requests that will each launch a dask Worker
# each dask worker will then connect to the scheduler process
# each dask worker will have only 2 process and 2 threads per process
cluster.scale(8)
  • Once launched and connected to the scheduler, the dask worker are immediately responsive, and will not terminate after one task only.
  • Arbitrary python tasks can be submitted to the scheduler using the Client interface

Running tasks on a cluster can thus be done entirely in an interactive Python session (setup instructions to come)!

Solving the 21st century computing challenges using dask?

  • Out of core scientific computing: check
  • Transparently distributing computations over a cluster of machines: check
  • Launching experiments natively from a Jupyter notebook instead of using the slurm command line interface: check

A few gotchas

  • Setting up jupyter notebooks on a cluster is a bit tedious. Here to help: Instructions, Me (among others!)
  • Since workers are persistent, it is easy to hold on to unused resources! Remember to scale down your cluster, or set the cluster to the adaptative mode!
  • You should not run CPU/Memory intensive code on the login node! Jupyter notebooks themselves should be launched from a slurm node