Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 21 additions & 15 deletions python/pyspark/sql/connect/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1396,14 +1396,21 @@ def _analyze(self, method: str, **kwargs: Any) -> AnalyzeResult:
except Exception as error:
self._handle_error(error)

def _execute(self, req: pb2.ExecutePlanRequest) -> None:
def _execute(
self,
req: pb2.ExecutePlanRequest,
reattachable: Optional[bool] = None,
) -> None:
"""
Execute the passed request `req` and drop all results.

Parameters
----------
req : pb2.ExecutePlanRequest
Proto representation of the plan.
reattachable : bool, option
Whether to enable reattachable execution for this specific request.
If None, fallback to 'self._use_reattachable_execute'.

"""
logger.debug("Execute")
Expand All @@ -1414,8 +1421,11 @@ def _execute(self, req: pb2.ExecutePlanRequest) -> None:
def handle_response(b: pb2.ExecutePlanResponse) -> None:
self._verify_response_integrity(b)

if reattachable is None:
reattachable = self._use_reattachable_execute

try:
if self._use_reattachable_execute:
if reattachable:
# Don't use retryHandler - own retry handling is inside.
generator = ExecutePlanResponseReattachableIterator(
req, self._stub, self._retrying, self._builder.metadata()
Expand Down Expand Up @@ -2125,26 +2135,22 @@ def _create_profile(self, profile: pb2.ResourceProfile) -> int:
profile_id = properties["create_resource_profile_command_result"]
return profile_id

def _delete_ml_cache(self, cache_ids: List[str], evict_only: bool = False) -> List[str]:
# try best to delete the cache
try:
if len(cache_ids) > 0:
def _delete_ml_cache(self, cache_ids: List[str], evict_only: bool = False) -> None:
if len(cache_ids) > 0:
try:
command = pb2.Command()
command.ml_command.delete.obj_refs.extend(
[pb2.ObjectRef(id=cache_id) for cache_id in cache_ids]
)
command.ml_command.delete.evict_only = evict_only
(_, properties, _) = self.execute_command(command)

assert properties is not None
# construct the request
req = self._execute_plan_request_with_metadata()
req.plan.command.CopyFrom(command)

if properties is not None and "ml_command_result" in properties:
ml_command_result = properties["ml_command_result"]
deleted = ml_command_result.operator_info.obj_ref.id.split(",")
return cast(List[str], deleted)
return []
except Exception:
return []
self._execute(req, reattachable=False)
except Exception:
pass

def _cleanup_ml_cache(self) -> None:
try:
Expand Down