
[WIP][SPARK-54986][PYTHON][ML][CONNECT] Disable reattachable in _delete_ml_cache#53749

Closed
zhengruifeng wants to merge 3 commits into apache:master from zhengruifeng:simplify_ml_del

Conversation

@zhengruifeng
Contributor

@zhengruifeng zhengruifeng commented Jan 9, 2026

What changes were proposed in this pull request?

Do not retry _delete_ml_cache and set a timeout

Why are the changes needed?

The test_parity_clustering suite still hangs, even though the probability is much lower than before.

The all-threads dump at https://github.com/gaogaotiantian/spark/actions/runs/20841838810/job/59877763802 shows it gets stuck in _delete_ml_cache.

It looks like there is still a deadlock in gRPC on the Python side:

Traceback for thread 24397 (python3.11) [] (most recent call last):
    (Python) File "/usr/lib/python3.11/threading.py", line 1002, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
        self.run()
    (Python) File "/usr/lib/python3.11/threading.py", line 982, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/lib/python3.11/concurrent/futures/thread.py", line 83, in _worker
        work_item.run()
    (Python) File "/usr/lib/python3.11/concurrent/futures/thread.py", line 58, in run
        result = self.fn(*self.args, **self.kwargs)
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 238, in target
        self._stub.ReleaseExecute(request, metadata=self._metadata)
    (Python) File "/usr/local/lib/python3.11/dist-packages/grpc/_channel.py", line 1163, in __call__
        state, call = self._blocking(
    (Python) File "/usr/local/lib/python3.11/dist-packages/grpc/_channel.py", line 1134, in _blocking
        call = self._channel.segregated_call(
    (Python) File "/usr/lib/python3.11/threading.py", line 272, in __enter__
        return self._lock.__enter__()


Traceback for thread 24268 (python3.11) [] (most recent call last):
    (Python) File "<frozen runpy>", line 198, in _run_module_as_main
    (Python) File "<frozen runpy>", line 88, in _run_code
    ...
    (Python) File "/__w/spark/spark/python/pyspark/ml/tests/test_clustering.py", line 466, in test_distributed_lda
        self.assertEqual(str(model), str(model2))
    (Python) File "/__w/spark/spark/python/pyspark/ml/wrapper.py", line 474, in __repr__
        return self._call_java("toString")
    (Python) File "/__w/spark/spark/python/pyspark/ml/util.py", line 322, in wrapped
        return remote_call()
    (Python) File "/__w/spark/spark/python/pyspark/ml/util.py", line 308, in remote_call
        (_, properties, _) = session.client.execute_command(command)
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1162, in execute_command
        data, _, metrics, observed_metrics, properties = self._execute_and_fetch(
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1664, in _execute_and_fetch
        for response in self._execute_and_fetch_as_iterator(
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1621, in _execute_and_fetch_as_iterator
        generator = ExecutePlanResponseReattachableIterator(
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 127, in __init__
        self._stub.ExecutePlan(self._initial_request, metadata=metadata)
    (Python) File "/usr/local/lib/python3.11/dist-packages/grpc/_channel.py", line 1396, in __call__
        call = self._managed_call(
    (Python) File "/usr/local/lib/python3.11/dist-packages/grpc/_channel.py", line 1785, in create
        call = state.channel.integrated_call(
    (Python) File "/__w/spark/spark/python/pyspark/ml/util.py", line 379, in wrapped
        self._remote_model_obj.release_ref()
    (Python) File "/__w/spark/spark/python/pyspark/ml/util.py", line 162, in release_ref
        del_remote_cache(self.ref_id)
    (Python) File "/__w/spark/spark/python/pyspark/ml/util.py", line 358, in del_remote_cache
        session.client._delete_ml_cache([ref_id])
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 2137, in _delete_ml_cache
        (_, properties, _) = self.execute_command(command)
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1162, in execute_command
        data, _, metrics, observed_metrics, properties = self._execute_and_fetch(
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1664, in _execute_and_fetch
        for response in self._execute_and_fetch_as_iterator(
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1621, in _execute_and_fetch_as_iterator
        generator = ExecutePlanResponseReattachableIterator(
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 127, in __init__
        self._stub.ExecutePlan(self._initial_request, metadata=metadata)
    (Python) File "/usr/local/lib/python3.11/dist-packages/grpc/_channel.py", line 1396, in __call__
        call = self._managed_call(
    (Python) File "/usr/local/lib/python3.11/dist-packages/grpc/_channel.py", line 1784, in create
        with state.lock:

The ReleaseExecute call in thread 24397 is part of the retry/reattach mechanism:

"""
Retryable iterator of ExecutePlanResponses to an ExecutePlan call.
It can handle situations when:
- the ExecutePlanResponse stream was broken by retryable network error (governed by
retryPolicy)
- the ExecutePlanResponse was gracefully ended by the server without a ResultComplete
message; this tells the client that there is more, and it should reattach to continue.
Initial iterator is the result of an ExecutePlan on the request, but it can be reattached with
ReattachExecute request. ReattachExecute request is provided the responseId of last returned
ExecutePlanResponse on the iterator to return a new iterator from server that continues after
that. If the initial ExecutePlan did not even reach the server, and hence reattach fails with
INVALID_HANDLE.OPERATION_NOT_FOUND, we attempt to retry ExecutePlan.
In reattachable execute the server does buffer some responses in case the client needs to
backtrack. To let server release this buffer sooner, this iterator asynchronously sends
ReleaseExecute RPCs that instruct the server to release responses that it already processed.
"""

It is not clear how this affects _delete_ml_cache, but the _delete_ml_cache command itself doesn't need retries, so I rewrote it without retry and set a timeout of 3 seconds.
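
As a rough sketch of the intended shape of the change (assuming a gRPC SparkConnectService stub `stub` and a pre-built ExecutePlanRequest `request`; these names are placeholders, not the exact PR diff):

```python
import grpc

def delete_ml_cache_once(stub, request, metadata, timeout_sec=3.0):
    """Best-effort cache deletion: one plain ExecutePlan call, no reattach."""
    try:
        # A plain server-streaming call instead of going through
        # ExecutePlanResponseReattachableIterator: no background ReleaseExecute
        # RPCs, and the gRPC deadline bounds how long this can block.
        for _ in stub.ExecutePlan(request, metadata=metadata, timeout=timeout_sec):
            pass  # drain the stream; the result of the delete is not needed
    except grpc.RpcError:
        # A failed or timed-out delete only delays eviction on the server,
        # so swallow the error.
        pass
```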

Does this PR introduce any user-facing change?

No

How was this patch tested?

Will monitor the CI

Was this patch authored or co-authored using generative AI tooling?

No

fix
@github-actions

github-actions bot commented Jan 9, 2026

JIRA Issue Information

=== Improvement SPARK-54986 ===
Summary: Do not retry _delete_ml_cache
Assignee: None
Status: Open
Affected: ["4.2.0"]


This comment was automatically generated by GitHub Actions

def _delete_ml_cache(self, cache_ids: List[str], evict_only: bool = False) -> None:
    if len(cache_ids) > 0:
        try:
Contributor Author
It doesn't matter too much even if this command fails.

    [pb2.ObjectRef(id=cache_id) for cache_id in cache_ids]
)
command.ml_command.delete.evict_only = evict_only
(_, properties, _) = self.execute_command(command)
Contributor Author

@zhengruifeng zhengruifeng Jan 9, 2026


execute_command is based on reattachable execution.

@zhengruifeng zhengruifeng marked this pull request as draft January 9, 2026 13:36
@zhengruifeng zhengruifeng changed the title [SPARK-54986][PYTHON][ML][CONNECT] Do not retry _delete_ml_cache [WIP][SPARK-54986][PYTHON][ML][CONNECT] Do not retry _delete_ml_cache Jan 9, 2026
@zhengruifeng zhengruifeng changed the title [WIP][SPARK-54986][PYTHON][ML][CONNECT] Do not retry _delete_ml_cache [WIP][SPARK-54986][PYTHON][ML][CONNECT] Disable reattachable in _delete_ml_cache Jan 12, 2026
@zhengruifeng zhengruifeng deleted the simplify_ml_del branch January 28, 2026 01:30
