Pierre Glaser, Gatsby Computational Neuroscience Unit
In [1]: import pandas as pd
...: df = pd.read_csv('neuron_activities.csv')  # eagerly loads the whole CSV into RAM
...: firing_rates = df['firing_rate'].dropna()
...: print(firing_rates.mean())
In [1]: import dask.array as da
...: a = da.random.random((4000, 1000)) # lazy!
...: m = a.mean() # lazy!
...: print(m.compute()) # make dask execute the task graph
Each node in this graph represents a single Python function call with a RAM-friendly memory footprint
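To see that graph for yourself, Dask collections can report and render their task graph. A minimal sketch (.__dask_graph__() and .visualize() are standard Dask APIs; .visualize() needs the optional graphviz dependency):

In [1]: import dask.array as da
...: a = da.random.random((4000, 1000))  # lazy!
...: m = a.mean()  # lazy!
...: print(len(dict(m.__dask_graph__())))  # number of tasks (nodes) in the graph
...: m.visualize(filename='mean_graph.png')  # render the graph to an image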
In [1]: import dask.array as da
...: a = da.random.random((4000, 4000), chunks=(1000, 1000))  # 4 x 4 grid of 1000 x 1000 blocks
...: b = a @ a  # blocked matrix multiplication, lazy!
...: c = b + b.T  # lazy!
In [1]: import dask.array as da
...: import dask.dataframe as dd
...: a = da.random.random((int(1e10), 6))  # ~480 GB of float64, far larger than RAM
...: df = dd.from_dask_array(a)  # lazy dataframe backed by the dask array
In [2]: df.head(n=2)
Out[2]:
          0         1         2         3         4         5
0  0.199389  0.099026  0.816139  0.885564  0.935145  0.156040
1  0.671736  0.142255  0.863120  0.855204  0.084645  0.565667
In [28]: %time df.head()
CPU times: user 84.5 ms, sys: 17.5 ms, total: 102 ms
Wall time: 101 ms
In [0]: from dask_ml.model_selection import HyperbandSearchCV
In [1]: search = HyperbandSearchCV(clf, params, max_iter=81, random_state=0)
In [2]: search.fit(X_train, y_train, classes=[0, 1]);
In [3]: search.best_params_
Out[3]: {'alpha': 0.7612985385451453, 'l1_ratio': 0.18850458399555325}
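The estimator clf, the parameter distributions params and the training data are assumed to be defined beforehand; a minimal, hypothetical setup compatible with the search above (HyperbandSearchCV needs an estimator implementing partial_fit) could look like:

In [0]: from scipy.stats import loguniform, uniform
...: from sklearn.datasets import make_classification
...: from sklearn.linear_model import SGDClassifier
...: from sklearn.model_selection import train_test_split
...: X, y = make_classification(n_samples=10_000, random_state=0)
...: X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)
...: clf = SGDClassifier(penalty='elasticnet', tol=1e-3)  # implements partial_fit
...: params = {'alpha': loguniform(1e-2, 1e0), 'l1_ratio': uniform(0, 1)}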
results = {}
for a in A:
    for b in B:
        if a < b:
            results[a, b] = f(a, b)
        else:
            results[a, b] = g(a, b)
print(results)
from dask import delayed, compute

results = {}
for a in A:
    for b in B:
        if a < b:
            results[a, b] = delayed(f)(a, b)  # lazily construct the graph
        else:
            results[a, b] = delayed(g)(a, b)
print(compute(results))  # trigger graph computation
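By default such a graph runs on one of Dask's local schedulers (threads, processes, or synchronous); the scheduler= keyword of dask.compute is the standard way to pick one explicitly:

from dask import compute
# execute the same graph with the local multiprocessing scheduler instead of the default
print(compute(results, scheduler="processes"))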
The dask.distributed package exposes a more sophisticated scheduler designed to send tasks to workers living on different machines:
The dask scheduler and the dask workers are now separate processes that live on different machines of a compute cluster!
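In practice you point a dask.distributed Client at the scheduler's address, after which computations are dispatched to the remote workers. A minimal sketch (the scheduler address below is made up; Client, submit and result are standard dask.distributed APIs):

from dask.distributed import Client

client = Client("tcp://192.168.1.42:8786")  # connect to a running scheduler (hypothetical address)

# submit individual function calls as futures...
future = client.submit(sum, [1, 2, 3])
print(future.result())

# ...and once a Client exists, dask collections use it by default
import dask.array as da
print(da.random.random((4000, 1000)).mean().compute())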
from dask_jobqueue import SLURMCluster  # but also PBSCluster, MoabCluster...
# SLURMCluster wraps the scheduler and a set of workers, and makes it easy to
# tear everything down quickly (it takes lots of other arguments: queue,
# memory, walltime, ...)
cluster = SLURMCluster(processes=1, cores=1)
# the following command sends 8 sbatch requests that will each launch a dask worker
# each dask worker will then connect to the scheduler process
# each dask worker will be a single process running a single thread
cluster.scale(8)
from dask_jobqueue import SLURMCluster  # but also PBSCluster, MoabCluster...
# same wrapper as before; cores is the total number of cores per job, and each
# worker process gets cores / processes threads
cluster = SLURMCluster(processes=2, cores=4)
# the following command sends 4 sbatch requests that will each launch 2 dask workers
# each dask worker will then connect to the scheduler process
# each dask worker will be a single process running 2 threads
cluster.scale(8)
from dask_jobqueue import SLURMCluster  # but also PBSCluster, MoabCluster...
# same wrapper as before; cores is the total number of cores per job, and each
# worker process gets cores / processes threads
cluster = SLURMCluster(processes=4, cores=8)
# the following command sends 2 sbatch requests that will each launch 4 dask workers
# each dask worker will then connect to the scheduler process
# each dask worker will be a single process running 2 threads
cluster.scale(8)
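Once the jobs are running, connecting a Client to the cluster object makes it the default scheduler, so the array / dataframe / delayed examples from earlier execute on the SLURM-allocated workers. A minimal sketch, reusing the cluster object defined above:

from dask.distributed import Client
import dask.array as da

client = Client(cluster)  # connect to the SLURMCluster's scheduler

a = da.random.random((4000, 4000), chunks=(1000, 1000))
print((a @ a).mean().compute())  # computed by the SLURM-launched workers

client.close()
cluster.close()  # tear down the scheduler and all the sbatch jobs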