Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 55 additions & 11 deletions nicegui/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from contextlib import suppress
from functools import partial
from pickle import PicklingError
from typing import Any, TypeVar
from typing import Any, Literal, TypeVar, overload

from typing_extensions import ParamSpec

Expand All @@ -20,6 +20,15 @@
R = TypeVar('R')


class CancelledError(asyncio.CancelledError):
"""Raised by ``cpu_bound`` / ``io_bound`` when *strict=True* and the
background work is cancelled or the app is shutting down.

A subclass of :class:`asyncio.CancelledError` so that existing
``except CancelledError`` handlers continue to work.
"""
Comment on lines +23 to +29
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a strong reason to prefer a subclass vs just re-raising the existing exception... I suppose the one weird issue would be creating one in the first if strict pre-check.

But I'm generally hesitant to make new exceptions, and especially so when they have the same name as an underlying one. 🤔

Copy link
Copy Markdown
Author

@calebgregory calebgregory Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I appreciate the hesitancy to make new exceptions. I think that's why I had the impulse to name this one nicegui.run.CancelledError to match asyncio's name. But maybe it's better to give it a distinct name anyway, to prevent confusion.

I agree that raising a new asyncio.CancelledError() when the app is shutting down would be strange.

Defining the exception in nicegui.run offers some code-as-documentation + some consumer-convenience:

import asyncio
from nicegui import run

try:
    run.cpu_bound(..., strict=True)
except asyncio.CancelledError:
    ...

vs.

from nicegui import run

try:
    run.cpu_bound(..., strict=True)
except run.CancelledError:
    ...



def setup() -> None:
"""Setup the process pool. (For internal use only.)"""
global process_pool # pylint: disable=global-statement # noqa: PLW0603
Expand Down Expand Up @@ -57,35 +66,55 @@ def safe_callback(callback: Callable, *args, **kwargs) -> Any:
raise SubprocessException(type(e).__name__, str(e), traceback.format_exc()) from None


async def _run(executor: Any, callback: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> R:
async def _run(executor: Any, callback: Callable[P, R], *args: P.args,
strict: bool = False, **kwargs: P.kwargs) -> R | None:
if core.app.is_stopping:
return # type: ignore # the assumption is that the user's code no longer cares about this value
if strict:
raise CancelledError('app is stopping')
return None
try:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(executor, partial(callback, *args, **kwargs))
except RuntimeError as e:
if 'cannot schedule new futures after shutdown' not in str(e):
raise
except asyncio.CancelledError:
pass
return # type: ignore # the assumption is that the user's code no longer cares about this value
if strict:
raise CancelledError('cannot schedule new futures after shutdown') from e
except asyncio.CancelledError as e:
if strict:
raise CancelledError('task was cancelled') from e
return None


@overload
async def cpu_bound(callback: Callable[P, R], *args: P.args, strict: Literal[True],
**kwargs: P.kwargs) -> R: ...


@overload
async def cpu_bound(callback: Callable[P, R], *args: P.args, strict: Literal[False] = ...,
**kwargs: P.kwargs) -> R | None: ...


async def cpu_bound(callback: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> R:
async def cpu_bound(callback: Callable[P, R], *args: P.args, strict: bool = False,
**kwargs: P.kwargs) -> R | None:
"""Run a CPU-bound function in a separate process.

`run.cpu_bound` needs to execute the function in a separate process.
For this it needs to transfer the whole state of the passed function to the process (which is done with pickle).
It is encouraged to create static methods (or free functions) which get all the data as simple parameters (eg. no class/ui logic)
and return the result (instead of writing it in class properties or global variables).

If *strict* is ``True``, a :class:`run.CancelledError` is raised instead of
silently returning ``None`` when the task is cancelled or the app is shutting down.
"""
global process_pool # pylint: disable=global-statement # noqa: PLW0603

if process_pool is None:
raise RuntimeError('Process pool not set up.')

try:
return await _run(process_pool, safe_callback, callback, *args, **kwargs)
return await _run(process_pool, safe_callback, callback, *args, strict=strict, **kwargs)
except PicklingError as e:
if core.script_mode:
raise RuntimeError('Unable to run CPU-bound in script mode. Use a `@ui.page` function instead.') from e
Expand All @@ -99,9 +128,24 @@ async def cpu_bound(callback: Callable[P, R], *args: P.args, **kwargs: P.kwargs)
raise e


async def io_bound(callback: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> R:
"""Run an I/O-bound function in a separate thread."""
return await _run(thread_pool, callback, *args, **kwargs)
@overload
async def io_bound(callback: Callable[P, R], *args: P.args, strict: Literal[True],
**kwargs: P.kwargs) -> R: ...


@overload
async def io_bound(callback: Callable[P, R], *args: P.args, strict: Literal[False] = ...,
**kwargs: P.kwargs) -> R | None: ...


async def io_bound(callback: Callable[P, R], *args: P.args, strict: bool = False,
**kwargs: P.kwargs) -> R | None:
"""Run an I/O-bound function in a separate thread.

If *strict* is ``True``, a :class:`run.CancelledError` is raised instead of
silently returning ``None`` when the task is cancelled or the app is shutting down.
"""
return await _run(thread_pool, callback, *args, strict=strict, **kwargs)


def reset() -> None:
Expand Down
34 changes: 34 additions & 0 deletions tests/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pytest

from nicegui import app, run, ui
from nicegui.app.app import State
from nicegui.testing import User


Expand Down Expand Up @@ -93,3 +94,36 @@ async def index():

await user.open('/')
await user.should_see('excellent')


@pytest.mark.parametrize('func', [run.cpu_bound, run.io_bound])
async def test_returns_none_when_app_is_stopping(user: User, func: Callable):
@ui.page('/')
async def index():
original_state = app._state
app._state = State.STOPPING
try:
result = await func(delayed_hello)
ui.label(f'result={result}')
finally:
app._state = original_state

await user.open('/')
await user.should_see('result=None')


@pytest.mark.parametrize('func', [run.cpu_bound, run.io_bound])
async def test_strict_raises_when_app_is_stopping(user: User, func: Callable):
@ui.page('/')
async def index():
original_state = app._state
app._state = State.STOPPING
try:
with pytest.raises(run.CancelledError, match='app is stopping'):
await func(delayed_hello, strict=True)
ui.label('raised')
finally:
app._state = original_state

await user.open('/')
await user.should_see('raised')
Loading