There are several approaches on the table for choosing an appropriate number of tasks for the different stages of a query:
- Cardinality effect: the current approach. If a stage contains a node that reduces cardinality, a scale factor is applied to the task count of the stage above.
- Cost-based estimation: the approach attempted in Cost based planning #311. It attributes a compute cost to the different nodes and, based on statistics propagation, estimates how much compute power will be needed for the different stages.
The big limitation of those approaches is that they rely on two big assumptions:
1. The size of the data pulled by leaf nodes can be inferred at planning time.
2. The size of the data flowing through stages can be calculated based on stats propagation.
Unfortunately, 1) holds only in specific cases, like reading from Parquet files, and 2) is mostly not implemented upstream yet, and even if it were, it would only produce estimates.
Some engines overcome this by implementing adaptive query execution: based on the results of intermediate stages, they are capable of appropriately sizing the next stage, or even adding/removing workers involved in the query on the fly. Some systems, like Spark, can even change the shape of the plan on the fly.
This is difficult to do in this project, as no materialization occurs between stages, so there's really nothing that can be looked at for sizing them. And even if there were, once a number of tasks has been established for a stage, it's set in stone: the stage cannot be resized. However, there is something we can do: choose the task count as late as possible, lazily, as data comes in.
Here's an example plan:
┌──────────────────┐
│CoalescePartitions│
└──────────────────┘
┌──────────────────┐
│ Aggregate(final) │
└──────────────────┘
┌──────────────────┐
│ Repartition │
└──────────────────┘
┌──────────────────┐
│Aggregate(partial)│
└──────────────────┘
┌──────────────────┐
│ Filter │
└──────────────────┘
┌──────────────────┐
│ DataSource │
└──────────────────┘
The same plan with network boundaries would look like this:
┏━━━━━━━━━━━━━━━━━━┓
┃ DistributedExec ┃
┗━━━━━━━━━━━━━━━━━━┛
┌──────────────────┐
│CoalescePartitions│
└──────────────────┘
┏━━━━━━━━━━━━━━━━━━┓
┃ NetworkCoalesce ┃
┗━━━━━━━━━━━━━━━━━━┛
┌──────────────────┐
│ Aggregate(final) │
└──────────────────┘
┏━━━━━━━━━━━━━━━━━━┓
┃ NetworkShuffle ┃
┗━━━━━━━━━━━━━━━━━━┛
┌──────────────────┐
│ Repartition │
└──────────────────┘
┌──────────────────┐
│Aggregate(partial)│
└──────────────────┘
┌──────────────────┐
│ Filter │
└──────────────────┘
┌──────────────────┐
│ DataSource │
└──────────────────┘
The current state of the project assigns an input task count to the different network boundaries at planning time, but let's imagine we don't, and the task count is left as an unknown to be decided later.
The first step would be to choose an appropriate number of tasks for the leaf stage, which needs to be done before any data flows in. #374 explains how this can be done at runtime. Let's imagine that 3 tasks are chosen:
┌──────────────────┐┌──────────────────┐┌──────────────────┐
│Aggregate(partial)││Aggregate(partial)││Aggregate(partial)│
└──────────────────┘└──────────────────┘└──────────────────┘
┌──────────────────┐┌──────────────────┐┌──────────────────┐
│ Filter ││ Filter ││ Filter │
└──────────────────┘└──────────────────┘└──────────────────┘
┌──────────────────┐┌──────────────────┐┌──────────────────┐
│ DataSource ││ DataSource ││ DataSource │
└──────────────────┘└──────────────────┘└──────────────────┘
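As a rough sketch of the kind of sizing rule that could be applied at runtime for the leaf stage (the per-task byte budget and the clamping bounds below are made up for illustration, not actual project constants):

```rust
/// Hypothetical rule for sizing the leaf stage: one task per
/// `target_bytes_per_task` of estimated input, clamped to the worker pool.
/// Both parameters are illustrative, not real project configuration.
fn leaf_task_count(
    estimated_input_bytes: u64,
    target_bytes_per_task: u64,
    max_workers: usize,
) -> usize {
    let tasks = estimated_input_bytes.div_ceil(target_bytes_per_task) as usize;
    // Always run at least one task; never exceed the available workers.
    tasks.clamp(1, max_workers)
}
```

With a 1 GiB-per-task budget, a 3 GiB input would get 3 tasks, matching the example above.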
We have not drawn the RepartitionExec because we still don't know its output partition count: it needs to be set based on the number of tasks in the stage above, which we don't know yet.
Now, we need a way to inform the DistributedExec node about how many tasks should be spawned for the stage above. For that, we can look at a sample of the data that flowed in during the first 100 ms or so (pretty arbitrary, just for explanation purposes):
┌──────────────────┐┌──────────────────┐┌──────────────────┐
│ Sampler ││ Sampler ││ Sampler │
└──────────────────┘└──────────────────┘└──────────────────┘
┌──────────────────┐┌──────────────────┐┌──────────────────┐
│Aggregate(partial)││Aggregate(partial)││Aggregate(partial)│
└──────────────────┘└──────────────────┘└──────────────────┘
┌──────────────────┐┌──────────────────┐┌──────────────────┐
│ Filter ││ Filter ││ Filter │
└──────────────────┘└──────────────────┘└──────────────────┘
┌──────────────────┐┌──────────────────┐┌──────────────────┐
│ DataSource ││ DataSource ││ DataSource │
└──────────────────┘└──────────────────┘└──────────────────┘
The sampler communicates to the DistributedExec an estimate of the amount of data that will flow through it, so that the DistributedExec can lazily size the next stage. This will have a small runtime cost.
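A minimal sketch of what the time-boxed sampling could look like (the names and the report shape are hypothetical, and batches are modeled as plain byte counts rather than real RecordBatches):

```rust
use std::time::{Duration, Instant};

/// Hypothetical summary a Sampler could send back to DistributedExec.
#[derive(Debug)]
struct SampleReport {
    bytes_seen: u64,
    batches_seen: usize,
    /// true if the input stream ended within the sampling window,
    /// meaning `bytes_seen` is exact rather than a partial sample.
    stream_finished: bool,
}

/// Observe the stream for at most `window`, then report what flowed in.
/// `batches` stands in for the partial-aggregate output, one byte count per batch.
fn sample_stream(batches: impl IntoIterator<Item = u64>, window: Duration) -> SampleReport {
    let start = Instant::now();
    let mut report = SampleReport { bytes_seen: 0, batches_seen: 0, stream_finished: true };
    for batch_bytes in batches {
        report.bytes_seen += batch_bytes;
        report.batches_seen += 1;
        if start.elapsed() >= window {
            // Window expired with data still flowing: this is only a sample.
            report.stream_finished = false;
            break;
        }
    }
    report
}
```

The `stream_finished` flag matters: if the stream ended inside the window, the report is an exact measurement rather than an extrapolation.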
Let's imagine this situation:
- the `Filter` node was very selective
- the `Aggregate(partial)` was aggregating over a column with very low NDV
This will translate into very little data reaching the Sampler, which might even receive a single RecordBatch after batching up to `datafusion.execution.batch_size`. At that point, the Sampler can communicate this to the DistributedExec, which might decide to not distribute any further and run the whole rest of the plan on a single node, avoiding unnecessary network hops:
┏━━━━━━━━━━━━━━━━━━┓
┃ DistributedExec ┃
┗━━━━━━━━━━━━━━━━━━┛
┌──────────────────┐
│CoalescePartitions│
└──────────────────┘
┌──────────────────┐
│ Aggregate(final) │
└──────────────────┘
┏━━━━━━━━━━━━━━━━━━┓
┃ NetworkShuffle ┃
┗━━━━━━━━━━━━━━━━━━┛
┌──────────────────┐┌──────────────────┐┌──────────────────┐
│ Repartition ││ Repartition ││ Repartition │
└──────────────────┘└──────────────────┘└──────────────────┘
┌──────────────────┐┌──────────────────┐┌──────────────────┐
│ Sampler ││ Sampler ││ Sampler │
└──────────────────┘└──────────────────┘└──────────────────┘
┌──────────────────┐┌──────────────────┐┌──────────────────┐
│Aggregate(partial)││Aggregate(partial)││Aggregate(partial)│
└──────────────────┘└──────────────────┘└──────────────────┘
┌──────────────────┐┌──────────────────┐┌──────────────────┐
│ Filter ││ Filter ││ Filter │
└──────────────────┘└──────────────────┘└──────────────────┘
┌──────────────────┐┌──────────────────┐┌──────────────────┐
│ DataSource ││ DataSource ││ DataSource │
└──────────────────┘└──────────────────┘└──────────────────┘
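The single-node decision described above could be as simple as a threshold on the estimated bytes. A sketch (the thresholds are invented for illustration and are not project constants):

```rust
/// Hypothetical sizing decision for the next stage, driven by the Sampler's
/// estimate. Below a small threshold, collapse to a single task and skip the
/// extra network hops; otherwise scale task count with the estimated volume.
fn next_stage_tasks(estimated_bytes: u64, max_workers: usize) -> usize {
    const SINGLE_NODE_THRESHOLD: u64 = 64 * 1024 * 1024; // 64 MiB, illustrative
    const BYTES_PER_TASK: u64 = 256 * 1024 * 1024; // 256 MiB, illustrative
    if estimated_bytes <= SINGLE_NODE_THRESHOLD {
        1 // run the rest of the plan on a single node
    } else {
        (estimated_bytes.div_ceil(BYTES_PER_TASK) as usize).clamp(2, max_workers)
    }
}
```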
Of course, the logic in the Sampler can get arbitrarily complex:
- buffer for a fixed amount of time and see how many bytes flowed in
- buffer N `RecordBatch`es and see how much time passed between their arrivals
- buffer until the stream finishes, paying a high memory and latency cost
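These trade-offs could be captured in a small configuration type. A sketch (none of these names exist in the project):

```rust
use std::time::Duration;

/// Hypothetical knob for the Sampler's buffering strategy.
enum SamplingStrategy {
    /// Buffer for a fixed window and measure how many bytes flowed in.
    TimeWindow(Duration),
    /// Buffer N RecordBatches and measure the time between their arrivals.
    BatchCount(usize),
    /// Buffer the whole stream: exact sizing, at a high memory/latency cost.
    UntilStreamEnd,
}

impl SamplingStrategy {
    /// Only full buffering yields an exact size; the others produce estimates.
    fn is_exact(&self) -> bool {
        matches!(self, SamplingStrategy::UntilStreamEnd)
    }
}
```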