Description
There is a common pattern in some large-scale APIs that this project currently does not account for: a two-step API contract where:
- The first step discovers metadata related to API calls or files that should be retrieved from somewhere else.
- The second step issues queries or reads files based on the metadata collected in the first step.
Currently, the first step can happen in two places:
1. At planning time, blocking the start of the second step until the first step has fully completed.
2. At runtime, which allows kickstarting the second step as the discovery step makes progress, in a "streaming" fashion.
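The streaming variant of the two-step contract can be sketched as follows. All names here are illustrative, not part of any existing API, and a blocking `Iterator` stands in for an async `Stream` to keep the example self-contained:

```rust
// Illustrative sketch of the two-step contract; every name is hypothetical.

/// Step one: discover metadata items (e.g. file locations) incrementally.
fn discover_metadata() -> impl Iterator<Item = String> {
    (0..3).map(|i| format!("s3://bucket/file-{i}.parquet"))
}

/// Step two: issue a read based on one metadata item. The body is a
/// stand-in: it just reports the number of bytes in the location string.
fn read_file(location: &str) -> usize {
    location.len()
}

fn main() {
    // The second step starts as soon as the first item is discovered,
    // instead of blocking until discovery has fully completed.
    let total: usize = discover_metadata()
        .map(|loc| read_file(&loc))
        .sum();
    println!("read {total} bytes");
}
```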
Let's assume option 1 is not possible due to performance limitations. Option 2 is still valid in single-node DataFusion; however, it implies an additional challenge in Distributed DataFusion:
- There is no shared memory between leaf stage tasks, so the metadata stream from step one needs to be executed in its own "silo" (a distributed task), missing the opportunity to distribute metadata evenly across leaf stage tasks.
- The task count estimation most likely needs to take this metadata into account to produce a relevant number of tasks. In single-node DataFusion the answer is straightforward: "just use all the CPUs in the machine; sending data across threads is free". In Distributed DataFusion, however, there is a real price to pay for distributing unnecessarily.
Trino handles this pretty well with https://trino.io/docs/current/develop/connectors.html#connectorsplitmanager. There, the coordinator is capable of dynamically generating splits, which are then picked up by workers and executed as they become available.
Example of how this works in the Trino Iceberg connector.
What would this look like in Distributed DataFusion? My guess is something like this:
┌─────────────────────────────────────────┐
│ DistributedExec │
│ │
│ │
│ ┌──────────────────────────────────────┐│
│ │ DataSourceExec Metadata Discovery ││
│ └──────────────────────────────────────┘│
│ ┌────────┐┌────────┐┌───┐┌───┐┌───┐ │
│ │Metadata││Metadata││ ││ ││ │ ... │
│ └───┬────┘└────┬───┘└─┬─┘└─┬─┘└─┬─┘ │
└─────┼──────────┼──────┼────┼────┼───────┘
│ │ │ │ │
┌─────────┘ │ └────┼────┼──────────────┐
│ ┌──────────────────┼───────────┘ │ │
│ │ │ ┌──────────────┘ │
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Task 1 │ │ Task 2 │ │ Task N │
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ │ │
│ ┌──────────────┐│ │┌──────────────┐ │ │ ┌──────────────┐│
│ │DataSourceExec││ ││DataSourceExec│ │ │ │DataSourceExec││
│ └──────────────┘│ │└──────────────┘ │ │ └──────────────┘│
└─────────────────┘ └─────────────────┘ └─────────────────┘
I'm choosing the "metadata" name a bit blindly; there's probably a better name. This metadata should probably just be arbitrary bytes suitable for sending over the wire to worker nodes, and it would be up to the user to decide what those bytes represent.
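One way to make that concrete (the type and trait names here are purely hypothetical): the engine only ever sees opaque bytes, and a user-provided codec is responsible for encoding and decoding them:

```rust
// Hypothetical sketch: the engine treats metadata as opaque bytes; only the
// user-defined data source knows how to interpret them.

/// What the engine ships to workers: just bytes.
type Metadata = Vec<u8>;

/// Hypothetical user-side trait for encoding/decoding metadata items.
trait MetadataCodec {
    type Item;
    fn encode(&self, item: &Self::Item) -> Metadata;
    fn decode(&self, bytes: &Metadata) -> Self::Item;
}

/// Example codec where a metadata item is a file path.
struct PathCodec;

impl MetadataCodec for PathCodec {
    type Item = String;
    fn encode(&self, item: &Self::Item) -> Metadata {
        item.as_bytes().to_vec()
    }
    fn decode(&self, bytes: &Metadata) -> Self::Item {
        String::from_utf8(bytes.clone()).expect("valid utf8")
    }
}

fn main() {
    // Round-trip an item through the opaque-bytes representation.
    let codec = PathCodec;
    let bytes = codec.encode(&"s3://bucket/file.parquet".to_string());
    assert_eq!(codec.decode(&bytes), "s3://bucket/file.parquet");
}
```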
This also means that the relationship between DistributedExec and worker nodes is a bit more complex: it needs to send the stream of metadata along with the plan.
┌─────────────────┐
│ DistributedExec │
└─────────────────┘
│
plan: Vec<u8>
stage_key: StageKey
metadata_stream: impl Stream<Bytes>
│
▼
┌─────────────────┐
│ Worker │
└─────────────────┘
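A rough sketch of what that payload could look like (field and type names are guesses, not the project's actual API); a boxed iterator again stands in for `impl Stream<Bytes>` so the example compiles without async runtime dependencies:

```rust
// Hypothetical sketch of the message DistributedExec sends to a worker.

/// Hypothetical stand-in for the existing StageKey type.
#[derive(Debug, Clone, PartialEq)]
struct StageKey {
    query_id: u64,
    stage: u32,
}

/// The payload: an encoded plan, the stage it belongs to, and a stream of
/// opaque metadata items that the worker consumes as they arrive.
struct WorkerAssignment {
    plan: Vec<u8>,
    stage_key: StageKey,
    metadata_stream: Box<dyn Iterator<Item = Vec<u8>> + Send>,
}

impl WorkerAssignment {
    /// Drain the metadata stream; a real worker would feed each item into
    /// its DataSourceExec instead of collecting them all.
    fn drain_metadata(self) -> Vec<Vec<u8>> {
        self.metadata_stream.collect()
    }
}

fn main() {
    let assignment = WorkerAssignment {
        plan: vec![0u8; 4], // stand-in for an encoded physical plan
        stage_key: StageKey { query_id: 1, stage: 0 },
        metadata_stream: Box::new(vec![b"a".to_vec(), b"b".to_vec()].into_iter()),
    };
    assert_eq!(assignment.drain_metadata().len(), 2);
}
```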
Unfortunately, the Arrow Flight specification was not thought out for that kind of pattern, which is fair, and it's probably the signal we need that we've gone too far squeezing our own protocol into Arrow Flight, so this will need to happen first:
This also implies some extension of the current TaskEstimator API: it will likely need to do more than just choose a number of tasks; it will also need to return the metadata streams. There are more reasons to evolve the TaskEstimator API into something else, like #378, so whatever comes out of that needs to be suitable for holding both things.
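One possible shape for that evolved API, purely speculative (the current TaskEstimator signature may look nothing like this, and all names below are invented): the estimator returns both a task count and one metadata stream per task, so items can be spread across the leaf stage:

```rust
// Speculative sketch of an evolved TaskEstimator-like trait that returns
// both a task count and the metadata streams to ship to those tasks.
// Iterators stand in for async streams; every name is illustrative.

type MetadataStream = Box<dyn Iterator<Item = Vec<u8>> + Send>;

/// The combined result: how many tasks to run, plus one metadata
/// stream per task.
struct TaskPlan {
    task_count: usize,
    metadata_streams: Vec<MetadataStream>,
}

/// Hypothetical replacement for the current TaskEstimator trait.
trait TaskPlanner {
    fn plan_tasks(&self) -> TaskPlan;
}

/// Toy planner that round-robins a fixed set of items over N tasks.
struct RoundRobinPlanner {
    items: Vec<Vec<u8>>,
    tasks: usize,
}

impl TaskPlanner for RoundRobinPlanner {
    fn plan_tasks(&self) -> TaskPlan {
        let mut buckets: Vec<Vec<Vec<u8>>> = vec![Vec::new(); self.tasks];
        for (i, item) in self.items.iter().enumerate() {
            buckets[i % self.tasks].push(item.clone());
        }
        TaskPlan {
            task_count: self.tasks,
            metadata_streams: buckets
                .into_iter()
                .map(|b| Box::new(b.into_iter()) as MetadataStream)
                .collect(),
        }
    }
}

fn main() {
    // Five metadata items spread over two tasks: sizes 3 and 2.
    let planner = RoundRobinPlanner {
        items: (0..5).map(|i| vec![i as u8]).collect(),
        tasks: 2,
    };
    let plan = planner.plan_tasks();
    assert_eq!(plan.task_count, 2);
    let sizes: Vec<usize> = plan
        .metadata_streams
        .into_iter()
        .map(|s| s.count())
        .collect();
    assert_eq!(sizes, vec![3, 2]);
}
```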