Bug summary
Affected page
https://docs.prefect.io/integrations/prefect-dask#with-prefect
Summary
The docs suggest using dask.annotate(priority=...) to influence task execution order when running with Prefect's DaskTaskRunner. In Prefect 3.x, this has no effect because Prefect's Dask client submits tasks to Dask with an explicit priority=0, which overrides any annotation context. As a result, annotated priorities are ignored and users see no change in scheduling order.
Reproduction (minimal)
from prefect import flow, task
from prefect_dask import DaskTaskRunner
import dask
@task
def bar(x):
return len(x)
@flow(
task_runner=DaskTaskRunner(
cluster_kwargs={"n_workers": 2, "threads_per_worker": 1, "memory_limit": "2GiB", "silence_logs": False}
),
log_prints=True
)
def foo(a):
futures = []
for x in a:
with dask.annotate(priority=len(x)):
futures.append(bar.submit(x=x))
return [f.result() for f in futures]
if __name__ == "__main__":
# Expectation from docs: longer strings (higher priority) should run first
print(foo(["a", "bbbbbbbb", "cc", "dddd", "eee", "fffffffff", "g"]))
Expected (per docs): higher-priority tasks start first
Actual: no observable prioritization; tasks are scheduled as workers free up
Expected behavior
Annotated tasks with higher priority should be scheduled ahead of lower-priority tasks.
Actual behavior
No prioritization effect observed with DaskTaskRunner; all tasks appear to run with the same priority.
Why this happens (current behavior)
DaskTaskRunner.submit(...) does not expose a priority argument.
PrefectDaskClient.submit(...) passes an explicit priority keyword (default 0) to distributed.Client.submit(...).
- In Dask, an explicit
priority= kwarg overrides any annotate(...) context, so annotations do not apply here.
Proposed doc changes
- Update https://docs.prefect.io/integrations/prefect-dask#with-prefect to state that
dask.annotate(priority=...) (and even distributed.annotate) will not affect task priority when using Prefect's DaskTaskRunner in 3.x.
- Provide workarounds:
- Use dependency gating with
wait_for for strict ordering.
- Stage submissions (enqueue high-priority batch first, then others) for soft prioritization.
- Optionally note a future enhancement: expose a
priority parameter in Task.submit(...) and/or DaskTaskRunner.submit(...) that is forwarded to Dask.
Workarounds today
- Submit higher-priority tasks first; submit the rest after (or gate them with
wait_for).
- For strict order:
low = [bar.submit(x, wait_for=high)].
Related links
Version info
Version: 3.4.25
API version: 0.8.4
Python version: 3.11.9
Git commit: 8a37e7b1
Built: Thu, Oct 23, 2025 07:58 PM
OS/Arch: linux/x86_64
Profile: ephemeral
Server type: cloud
Pydantic version: 2.11.7
Server:
Database: sqlite
SQLite version: 3.34.1
Integrations:
prefect-dask: 0.3.6
prefect-slack: 0.3.1
Additional context
No response
Bug summary
Affected page
https://docs.prefect.io/integrations/prefect-dask#with-prefect
Summary
The docs suggest using
dask.annotate(priority=...)to influence task execution order when running with Prefect's DaskTaskRunner. In Prefect 3.x, this has no effect because Prefect's Dask client submits tasks to Dask with an explicitpriority=0, which overrides any annotation context. As a result, annotated priorities are ignored and users see no change in scheduling order.Reproduction (minimal)
Expected (per docs): higher-priority tasks start first
Actual: no observable prioritization; tasks are scheduled as workers free up
Expected behavior
Annotated tasks with higher
priorityshould be scheduled ahead of lower-priority tasks.Actual behavior
No prioritization effect observed with DaskTaskRunner; all tasks appear to run with the same priority.
Why this happens (current behavior)
DaskTaskRunner.submit(...)does not expose apriorityargument.PrefectDaskClient.submit(...)passes an explicitprioritykeyword (default 0) todistributed.Client.submit(...).priority=kwarg overrides anyannotate(...)context, so annotations do not apply here.Proposed doc changes
dask.annotate(priority=...)(and evendistributed.annotate) will not affect task priority when using Prefect's DaskTaskRunner in 3.x.wait_forfor strict ordering.priorityparameter inTask.submit(...)and/orDaskTaskRunner.submit(...)that is forwarded to Dask.Workarounds today
wait_for).low = [bar.submit(x, wait_for=high)].Related links
Version info
Additional context
No response