
Leverage PartialReduce AggregationExec mode to drastically reduce shuffle size #360

@fmonjalet


Summary

When DataFusion 53 is released, we should use the PartialReduce mode of AggregateExec (apache/datafusion#20019, kudos to @njsmith) for maximum local data reduction before shuffling.

The expected benefit is a reduction in the amount of data exchanged by up to a factor of n_partition_per_task (for example, 16x).

Details

Very schematically, here is how a shuffle happens:

[Figure: current plan — partial aggregates from each partition are shuffled directly]

My understanding is that we will send duplicate aggregate keys over the network, when we could have reduced them to a single row per key locally before the shuffle.

We should leverage PartialReduce to further reduce data before shuffling:

[Figure: proposed plan — AggregateExec(PartialReduce) merges local partitions before the shuffle]

In this example, we'd reduce data exchanged by 4x (because there are 4 partitions per task).
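The 4x figure can be checked with a small stdlib-only simulation (hypothetical data, not DataFusion code): one task holds 4 partitions of partially aggregated (key, count) pairs, and in the worst case for the current plan the same group keys appear in every partition.

```rust
use std::collections::HashMap;

// Without PartialReduce, every partition ships its own copy of each key.
fn rows_shuffled(partitions: &[HashMap<&'static str, u64>]) -> usize {
    partitions.iter().map(|p| p.len()).sum()
}

// With PartialReduce, local partitions are merged first, so each key
// leaves the task at most once.
fn partial_reduce(
    partitions: &[HashMap<&'static str, u64>],
) -> HashMap<&'static str, u64> {
    let mut merged = HashMap::new();
    for p in partitions {
        for (k, v) in p {
            *merged.entry(*k).or_insert(0u64) += v;
        }
    }
    merged
}

fn main() {
    // 4 local partitions, each holding the same 3 group keys.
    let keys = ["a", "b", "c"];
    let partitions: Vec<HashMap<&'static str, u64>> = (0..4)
        .map(|_| keys.iter().map(|k| (*k, 1u64)).collect())
        .collect();

    let before = rows_shuffled(&partitions); // 4 partitions x 3 keys = 12 rows
    let after = partial_reduce(&partitions).len(); // 3 rows, one per key
    println!("{before} rows shuffled naively, {after} after PartialReduce"); // 12 vs 3
}
```

When keys fully overlap across local partitions, the reduction equals the number of partitions per task; when keys are disjoint, PartialReduce saves nothing, so the factor is an upper bound.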

I have no strong opinion on whether we should add another RepartitionExec layer as in this graph, or run the AggregateExec(PartialReduce) directly on the "ready to shuffle" partitions. Since shuffles can have hundreds of partitions, I don't know the comparative cost of:

  • RepartitionExec(Hash, output_partitions=16) + AggregateExec(PartialReduce) on 16 partitions
  • vs AggregateExec(PartialReduce) on potentially hundreds of partitions
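One way to see part of this trade-off (a stdlib-only sketch with made-up sizes, not DataFusion code): the number of rows shuffled is identical under both plans, because each key hashes to exactly one shuffle partition either way; what differs is how many hash tables each task maintains, plus the cost of the extra RepartitionExec pass itself.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

// Assign a key to one of `n` hash partitions.
fn bucket(key: &str, n: u64) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() % n) as usize
}

// Hash-partition rows into `n` buckets and reduce each bucket.
// Returns (rows sent over the network, non-empty hash tables maintained).
fn reduce_into(rows: &[(String, u64)], n: u64) -> (usize, usize) {
    let mut tables: Vec<HashMap<&str, u64>> = vec![HashMap::new(); n as usize];
    for (k, v) in rows {
        *tables[bucket(k, n)].entry(k.as_str()).or_insert(0) += v;
    }
    let sent = tables.iter().map(|t| t.len()).sum();
    let nonempty = tables.iter().filter(|t| !t.is_empty()).count();
    (sent, nonempty)
}

fn main() {
    // Hypothetical task: 10_000 input rows over 500 distinct keys.
    let rows: Vec<(String, u64)> = (0..10_000)
        .map(|i| (format!("k{}", i % 500), 1u64))
        .collect();

    let (sent_16, tables_16) = reduce_into(&rows, 16);
    let (sent_256, tables_256) = reduce_into(&rows, 256);

    // Each key lands in exactly one bucket, so both plans shuffle the
    // same 500 rows; only the per-table overhead differs.
    assert_eq!(sent_16, sent_256);
    println!("16 partitions: {sent_16} rows, {tables_16} tables");
    println!("256 partitions: {sent_256} rows, {tables_256} tables");
}
```

So the comparison likely comes down to per-hash-table overhead and batching (many tiny tables and output batches with hundreds of partitions, versus fewer larger ones plus a repartition pass), rather than the volume of data shuffled.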

Labels: enhancement (New feature or request)