@milenkovicm
Contributor

@milenkovicm milenkovicm commented Oct 30, 2025

Which issue does this PR close?

Closes #1142

Rationale for this change

For quite some time, we have wanted to provide a Ballista Python interface that is an extension of DataFusion Python. For the reasons mentioned in #1142, we haven't been able to do so. The main issue was a class mismatch that prevented us from using DataFrame, so something like

from pyballista import BallistaBuilder
from datafusion import SessionContext
from datafusion import functions as f

# %%
ctx: SessionContext = BallistaBuilder()\
    .config("ballista.job.name", "example ballista")\
    .config("ballista.shuffle.partitions", "16")\
    .standalone()
    
df = ctx.sql("SELECT 1 as r").aggregate(
    [f.col("r")], [f.count_star()]
)
df.show()

was not possible because of the FFI boundary between datafusion python and ballista python.

We also want to reuse and rely on datafusion python as much as possible, as we do not have the bandwidth to maintain duplicated code.

What changes are included in this PR?

This PR relies on Python duck typing to "fake" the DataFrame interface, with a DistributedDataFrame extension that executes the query on a Ballista cluster.

from ballista import BallistaSessionContext
from datafusion import col, lit, DataFrame
from datafusion import functions as f

# we replace 
# ctx = SessionContext()
# with
ctx = BallistaSessionContext(address="df://127.0.0.1:50050")

df: DataFrame = ctx.table("t")
df.filter(col("id") > lit(4)).show()

df0 = ctx.sql("SELECT 1 as r")

# aggregate returns a new DataFrame, so keep the result before showing it
df0 = df0.aggregate(
    [f.col("r")], [f.count_star()]
)

df0.show()

There is a slight inefficiency: to cross the FFI boundary, the original logical plan is serialised in datafusion python and deserialised in ballista python, and the BallistaContext is re-created.

Also, we would need to override a few methods to make it work with Ballista.
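
A minimal sketch of the round trip described above, using stand-in classes rather than the real datafusion or ballista APIs. `LocalDataFrame`, `RemoteExecutor`, and the JSON encoding are all hypothetical and exist only for illustration; the PR itself serialises the logical plan to protobuf.

```python
import json


class LocalDataFrame:
    """Stand-in for datafusion's DataFrame (hypothetical, not the real class)."""

    def __init__(self, plan: dict):
        self.plan = plan

    def filter(self, predicate: str) -> "LocalDataFrame":
        # Building a plan is lazy: nothing is executed here.
        return LocalDataFrame(
            {"op": "filter", "predicate": predicate, "input": self.plan}
        )

    def collect(self) -> str:
        return f"executed {self.plan['op']} locally"


class RemoteExecutor:
    """Stand-in for the Ballista side of the FFI boundary (hypothetical)."""

    def __init__(self, address: str):
        self.address = address

    def execute(self, blob: bytes) -> str:
        # Deserialise the plan on the "other side" before running it.
        plan = json.loads(blob.decode("utf-8"))
        return f"executed {plan['op']} on {self.address}"


class DistributedDataFrame(LocalDataFrame):
    """Same interface as LocalDataFrame, but execution goes to the cluster."""

    def __init__(self, plan: dict, address: str):
        super().__init__(plan)
        self.address = address

    def filter(self, predicate: str) -> "DistributedDataFrame":
        # Re-wrap the result so the chain does not fall back to the local class.
        local = super().filter(predicate)
        return DistributedDataFrame(local.plan, self.address)

    def collect(self) -> str:
        # Serialise the plan, cross the boundary, execute remotely.
        blob = json.dumps(self.plan).encode("utf-8")
        return RemoteExecutor(self.address).execute(blob)


df = DistributedDataFrame({"op": "scan", "table": "t"}, "df://127.0.0.1:50050")
result = df.filter("id > 4").collect()
print(result)
```

The serialise/deserialise step runs once per execution call, which is where the inefficiency noted above comes from.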

Things left to do / address

  • write method support
  • printing in Jupyter
  • collect_partitioned
  • standalone support
  • propagate SessionConfig across
  • keep session_id between calls
  • create test cluster
  • add test
  • deprecate/remove old interface
  • ...

Note of caution: there are still too many sharp edges, but so far this looks like the most promising approach.

Are there any user-facing changes?

There will be changes to the interface, but it is too early to say what they will be.

@milenkovicm
Contributor Author

@timsaucer you may be able to advise: this is a new approach to adding ballista support to datafusion python without changing the datafusion python code directly.

It is still an early POC, but I wanted to share it early, as Rust/Python is not my strongest suit 😀

@milenkovicm milenkovicm changed the title from "initial prototype of new ballista python interface" to "feat: initial prototype of new ballista python interface" Oct 30, 2025
@milenkovicm milenkovicm force-pushed the feat_python_new_interface branch from 3473489 to 3750034 Compare November 1, 2025 09:44
Comment on lines 59 to 70
class DistributedDataFrame(DataFrame, metaclass=RedefiningMeta):
    def __init__(self, df: DataFrame, session_id: str, address: str):
        super().__init__(df.df)
        self.address = address
        self.session_id = session_id

    def _to_internal_df(self):
        blob_plan = self.logical_plan().to_proto()
        df = PyBallistaRemoteExecutor.create_data_frame(
            blob_plan, self.address, self.session_id
        )
        return df
Member

This is the main idea, right? That you expose to the user a python class that uses the datafusion class as a Base and then extend on top of it. Did I get that right?

I think this is a good approach. The one concern I have is users who accidentally use a mixture of DataFusion objects and Ballista extensions. The way I could imagine this happening is that someone has some portion of their code base that they've copied over from their single node work.

The other approach, and the one that I think is better in the long term, would be to make it possible for Ballista to just replace the execution plan via a physical plan optimizer.

Member

+1 for replacing the physical planning/optimizer path to use Ballista instead of DataFusion

Contributor Author

A physical planner would be the best option; I'm not sure whether @timsaucer's FFI effort brings us closer to getting there.

@milenkovicm
Contributor Author

The long-term plan, and the best one, would be to simply plug ballista's physical planner into datafusion-python.
Ideally, we do not want to maintain many Python classes in ballista; we should really rely on pydf.

The main idea of this approach is to extend DataFrame and SessionContext to intercept the methods that create a DataFrame and the methods that actually execute the plan, such as show, collect, write ...
When these 'execute' methods are invoked, we create a BallistaSessionContext and a BallistaDataFrame (which internally has a Ballista physical planner) and execute those methods on the ballista context.
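
One way the interception described above could be sketched is with a metaclass that re-wraps whatever the inherited creation methods return, while execution methods are overridden by hand. The PR's own `RedefiningMeta` is not shown in this thread, so `RewrapMeta`, `BaseFrame`, and `RemoteFrame` below are hypothetical stand-ins, not its implementation.

```python
import functools


class BaseFrame:
    """Stand-in for datafusion's DataFrame (hypothetical)."""

    def __init__(self, plan: list):
        self.plan = plan

    def filter(self, expr: str) -> "BaseFrame":
        return BaseFrame(self.plan + [f"filter({expr})"])

    def limit(self, n: int) -> "BaseFrame":
        return BaseFrame(self.plan + [f"limit({n})"])

    def collect(self) -> str:
        return "local: " + " -> ".join(self.plan)


class RewrapMeta(type):
    """Hypothetical metaclass: wraps public methods of the direct bases so
    that any BaseFrame they return is converted back into the subclass."""

    def __new__(mcls, name, bases, namespace):
        cls = super().__new__(mcls, name, bases, namespace)
        for base in bases:
            for attr, value in vars(base).items():
                # Skip private names, explicit overrides, and non-callables.
                if attr.startswith("_") or attr in namespace or not callable(value):
                    continue
                setattr(cls, attr, mcls._rewrap(cls, value))
        return cls

    @staticmethod
    def _rewrap(cls, method):
        @functools.wraps(method)
        def wrapper(self, *args, **kwargs):
            result = method(self, *args, **kwargs)
            if isinstance(result, BaseFrame) and not isinstance(result, cls):
                # Creation method: carry the cluster address along.
                return cls(result.plan, self.address)
            return result

        return wrapper


class RemoteFrame(BaseFrame, metaclass=RewrapMeta):
    def __init__(self, plan: list, address: str):
        super().__init__(plan)
        self.address = address

    def collect(self) -> str:
        # Execution method: overridden explicitly to run remotely.
        return f"remote({self.address}): " + " -> ".join(self.plan)


df = RemoteFrame(["scan(t)"], "df://127.0.0.1:50050")
out = df.filter("id > 4").limit(10)
print(type(out).__name__, out.collect())
```

In this sketch only the execution methods need hand-written overrides; a creation method added to the base class later is wrapped automatically, as long as it is defined on a direct base.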

Regarding your concern @timsaucer, I'm not sure that I fully understand it. The current goal is for ballista to expose just BallistaSessionContext and nothing else; hopefully all other classes can be reused from "single node" work. So full portability of the code is the target (well, if we can get UDFs serialised).

Current risks I see:

  • we have two session contexts (the pydf one, and a ballista session context re-created on 'plan execution')
  • we miss some of the DataFrame creation methods and the plan executes on the single node context
  • too much code duplicated from pydf
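
The second risk above (a missed method silently running on the single node context) could be caught with a small audit in the test suite. The sketch below is an assumption about how such a check might look; `BaseFrame`, `RemoteFrame`, and `missing_overrides` are hypothetical names and not part of datafusion or ballista.

```python
import inspect


class BaseFrame:
    """Stand-in for datafusion's DataFrame (hypothetical)."""

    def filter(self, expr): ...
    def join(self, other): ...
    def collect(self): ...
    def show(self): ...


class RemoteFrame(BaseFrame):
    """Stand-in for the Ballista subclass; join and show are deliberately missed."""

    def filter(self, expr): ...
    def collect(self): ...


def missing_overrides(base: type, sub: type) -> list:
    """Return public methods of `base` that `sub` inherits unchanged."""
    missing = []
    for name, member in inspect.getmembers(base, inspect.isfunction):
        if name.startswith("_"):
            continue
        # An inherited method is the very same function object as on the base.
        if getattr(sub, name) is member:
            missing.append(name)
    return missing


missed = missing_overrides(BaseFrame, RemoteFrame)
print(missed)
```

A test could compare the returned list against an allow-list of methods that are known to be safe to inherit, and fail when a new base method appears that has not been reviewed.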

@milenkovicm milenkovicm marked this pull request as ready for review November 7, 2025 16:31
@milenkovicm milenkovicm changed the title from "feat: initial prototype of new ballista python interface" to "poc: New ballista python interface" Jan 5, 2026
Development

Successfully merging this pull request may close these issues.

Ballista Python Issue(s)

3 participants