# Dask

### Chunks
See [here](https://docs.dask.org/en/latest/array-chunks.html).

Dask arrays are composed of many NumPy (or NumPy-like) arrays. How these arrays are arranged can significantly affect performance. For example, for a square array you might arrange your chunks along rows, along columns, or in a more square-like fashion. Different arrangements of NumPy arrays will be faster or slower for different algorithms.

Note that from a terminology perspective, **blocks** and **chunks** refer to the same thing.

### Scheduling
All of the large-scale Dask collections like [Dask Array](https://docs.dask.org/en/latest/array.html), [Dask DataFrame](https://docs.dask.org/en/latest/dataframe.html), and [Dask Bag](https://docs.dask.org/en/latest/bag.html) and the fine-grained APIs like [delayed](https://docs.dask.org/en/latest/delayed.html) and [futures](https://docs.dask.org/en/latest/futures.html) generate task graphs where each node in the graph is a normal Python function and edges between nodes are normal Python objects that are created by one task as outputs and used as inputs in another task.

After Dask generates these task graphs, it needs to execute them on parallel hardware. This is the job of a _task scheduler_. Different task schedulers exist, and each will consume a task graph and compute the same result, but with different performance characteristics.

![](Screen%20Shot%202021-07-29%20at%207.16.44%20AM.png)

### Ideas
* Groupby aggregate custom functions
* What are partitions doing under the hood
* memory reqs
* cpu
* threads
* replicas
* Diagrams of the above
* numpy arrays?
* pandas frames
* running in parallel (delayed approaches I have used)
* datatypes in underlying partitions
* shuffle
* Example use cases
  * parse surveys
  * bluestem groupby apply (custom func, need to set index, then persist)
  * without setting the index the groupby apply fails...

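
To make the task graph idea from the Scheduling section concrete, here is a minimal sketch in plain Python. It uses the dict-of-tuples layout described in Dask's graph specification. `toy_get` is a hypothetical single-threaded stand-in for a scheduler, not Dask's implementation, and the key names are made up for illustration.

```python
from operator import add, mul

# Toy task graph: keys name results, a tuple (func, arg, ...) is a task,
# and anything else is plain data. Key names here are hypothetical.
graph = {
    "x": 1,
    "y": 2,
    "sum": (add, "x", "y"),      # node that depends on x and y
    "result": (mul, "sum", 10),  # node that depends on sum
}


def toy_get(graph, key, cache=None):
    """Hypothetical scheduler: compute `key` by walking its dependencies."""
    cache = {} if cache is None else cache
    if key in cache:
        return cache[key]
    task = graph[key]
    if isinstance(task, tuple) and callable(task[0]):
        func, *args = task
        # An argument that names another key is an edge in the graph.
        resolved = [
            toy_get(graph, a, cache) if isinstance(a, str) and a in graph else a
            for a in args
        ]
        value = func(*resolved)
    else:
        value = task
    cache[key] = value
    return value


print(toy_get(graph, "result"))  # 30
```

A real scheduler computes the same answer from the same graph, but decides where and in what order the tasks run, which is where the different performance characteristics come from.
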
### Solution to bluestem issue
* The parquet file had 3 partitions
* I had 100 workers, each with 20 GB of RAM
* When trying to load this parquet file, a single partition seemed to take over 20 GB of RAM, so 3 workers kept dying while all the others sat idle
* The solution was to move to a single large worker
* Then, we load in our 3 partitions
* Then, we repartition (say, to 50) and write to parquet
* Now we are in a good spot to move forward with our *old* topology
* We can switch our topology back and redo our load (say, 100 workers, 20 GB of RAM each?)
* If we ever need to, we can repartition to a higher number of partitions

### Indexes
* Note: Dask does not try to keep track of a globally unique index
* A pandas df will have an index, but each partition will have its own default range index

### Joins
* If you put the thing you want to join on into the index of the right DF
* and then sort the index of the right frame
* the join gets faster
* No optimizations at first
* If you are joining on the index, make sure the indexes have the same name
* But again, don't do that at first

### Compute functions
* Compute says: build the task graph, send it to the scheduler, wait for the response, then pull the result and load it into memory (this is `dask.dataframe.compute()`)
* Dask `client.compute()` acts like persist.
* `persist()` says: hold a reference (futures, i.e. async I/O)

### Len functions
* Every `record batch` of parquet essentially has a meta file

### Memory vs parquet data size
* Note: snappy is compression
* We have seen 2 GB compressed blow up to 800 GB
* Number of rows and dtype are a better representation of in-memory size
* When loaded into pandas, it can blow up to 8 TB
* TODO: Give a brief overview of datatypes and memory sizes

### Overhead with many partitions
* Overhead will only start to matter with hundreds of partitions
* And multiple groupbys in series
* Overhead for Dask kicks in around 200,000 to 1 million tasks
* When you call `.shape`, we check that this df that I have has 3 partitions
* A Dask worker runs two processes (the main worker loop)
* And the nanny, which polls RAM use and total objects
* When you drop out of the GIL, the nanny process's trust goes down significantly
* Inspecting live processes is hard

### Group by apply splitout
* In order to avoid pulling into memory as a pandas frame, use `split_out`
* Note: `groupby.apply` returns a ddf
* If the result made a frame bigger than 20 GB, it would still crash the worker
* Dask opinion: if you do a groupby, you are intending to get the answer in memory
* Auto split_out in fabric does this
* Example kwarg to `groupby.apply(split_out=50, ) # groupby take_one would have split_out as input partitions (4 - 5 at most)`

### Load data with one massive worker, then repartition
* Load in with one massive worker
* Repartition to x partitions (where x is)

### Size to repartition to
* What is the size of the worker I intend to work with in my cluster?
* Then we want to ensure that an individual partition in memory is about 10% (if using pandas) of an individual worker's memory
* Will get between 10-20 GB per worker
* So shoot for 10 million - 100 million rows per partition
* 15 partitions may be a good option
* Every 1 million rows per column is roughly 10 MB (about 10 bytes per value)

### Resource requests
* We ask for a certain amount of resources for Dask.
  That is what is meant by `cpu_request` and `memory_request`
* Then, we ask for a certain number of Dask workers. The memory and CPU are divided amongst these Dask workers!
* Ex:
  * We request 1000 GB and 100 CPUs
  * We then ask for 50 workers
  * Each worker gets 20 GB of memory and 2 CPUs
* Each worker is a "machine"
* A worker can get more processes (if we have good out-of-GIL code)

### NFS vs S3
* app work is between 10 and 100x slower than S3
* NFS is super slow and is bottlenecked globally (painful serial reads/writes)
* Slowest option that we have
* Offered because it is easy

### Dask Scheduler Optimization
It can be helpful to think of Dask as consisting of three main parts:
1. An **architect** (the **client**, us, writing code)
2. A **foreman** (the **scheduler**, telling individual workers what to do)
3. **Workers** (the actual **worker nodes**, performing computation)

##### The Client
On the **client side**, we write code. For instance, below we create a Dask array:

![](Screen%20Shot%202021-07-29%20at%206.54.34%20AM.png)

When we write that code, Dask will (for us, under the hood) generate a **high level graph**. Every node in that graph corresponds to roughly one operation in your code (every `+`, transpose, sum, etc. will be a node).

Then, Dask (again, under the hood) will translate that high level graph into a **low level graph**, where every node corresponds to one task for one worker. So, as our dataset size increases, the size of our low level graph increases. If we have a petabyte array, this graph will end up being huge, which is a real pain.

Finally we **optimize** the low level graph. For instance, we see above in the low level graph that there are many chains that can all be done at once, so we combine those into a single node.

So, on the client side we can envision this as the **architect**. We build up the plans and then ship them off to the scheduler.

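
The three client-side steps above (high level graph, low level expansion, chain fusing) can be sketched with a toy model in plain Python. This is only an illustration under simplifying assumptions: every operation is elementwise, each task depends on the same chunk of the previous operation, and the names `expand`, `fuse_linear_chains`, and the operation labels are all hypothetical, not Dask APIs.

```python
def expand(ops, n_chunks):
    """Toy low level graph: one task per (operation, chunk).

    Maps each task key to its single dependency (or None for inputs).
    """
    graph = {}
    for i, op in enumerate(ops):
        for c in range(n_chunks):
            graph[(op, c)] = (ops[i - 1], c) if i > 0 else None
    return graph


def fuse_linear_chains(graph):
    """Toy optimization: merge each chain of one-to-one tasks into one node."""
    dependents = {}
    for key, dep in graph.items():
        if dep is not None:
            dependents.setdefault(dep, []).append(key)
    fused = {}
    for key in graph:
        if len(dependents.get(key, [])) == 1:
            continue  # absorbed into the chain of its single dependent
        chain = [key]
        dep = graph[key]
        while dep is not None and len(dependents[dep]) == 1:
            chain.append(dep)
            dep = graph[dep]
        fused[key] = chain[::-1]  # execution order, first task first
    return fused


ops = ["ones", "add_one", "double"]  # three high level nodes (made-up names)
for n_chunks in (4, 400):
    low = expand(ops, n_chunks)
    fused = fuse_linear_chains(low)
    print(n_chunks, len(ops), len(low), len(fused))
```

The high level graph stays at three nodes no matter how large the data is, while the low level graph grows with the number of chunks (12 tasks for 4 chunks, 1200 for 400). Fusing brings it back down to one node per chunk, but the expansion still has to be built first.
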
![](Screen%20Shot%202021-07-29%20at%207.01.19%20AM.png)

##### Scheduler
The scheduler takes these plans and pushes them into its internal state machine (which is a complex mess of data structures). The state machine then talks to all of the workers on the fly and tells them what they need to do at all times.

So, there are costs up until now at every point in this pipeline:
* **Building high level graph**: pretty cheap, not that complex
* **Convert that to low level graph**: can be expensive
* **Optimize low level graph**: can be expensive
* **Serialize, push over network to scheduler**: can also be expensive based on network speed
* **Scheduler converts graph into internal state machine representation**: can also be expensive
* **Conversation between scheduler and workers**: as we add more workers this can be more expensive

#### Optimize the Client
![](Screen%20Shot%202021-07-29%20at%207.22.10%20AM.png)

![](Screen%20Shot%202021-07-29%20at%207.22.39%20AM.png)

If we ever call `compute` or `persist` and it feels as if Dask is lagging a bit in the background, this is what is happening! Dask is:
* Making graphs
* Optimizing graphs
* Serializing graphs
* Shipping them to the scheduler

This delay is fine when you have a task graph of 40,000 tasks. However, when you start hitting larger workloads it can start taking minutes. This is half of what we need to think about in terms of optimizing the scheduler.

What is a good solution to this? We could just ship the high level graph!

![](Screen%20Shot%202021-07-29%20at%207.26.08%20AM.png)

But what happened to the low level expansion? Optimization? Serialization? They cannot be skipped! But there are (very complicated) ways of baking them into the Dask scheduler itself.

#### Optimize Scheduler & Workers
See [this point](https://youtu.be/vZ3R1DxTwbA?t=877) in the video