Skip to content

dask.annotate(priority=...) has no effect with DaskTaskRunner in Prefect 3.x #21608

@m-lal

Description

@m-lal

Bug summary

Affected page

https://docs.prefect.io/integrations/prefect-dask#with-prefect

Summary

The docs suggest using dask.annotate(priority=...) to influence task execution order when running with Prefect's DaskTaskRunner. In Prefect 3.x, this has no effect because Prefect's Dask client submits tasks to Dask with an explicit priority=0, which overrides any annotation context. As a result, annotated priorities are ignored and users see no change in scheduling order.

Reproduction (minimal)

from prefect import flow, task
from prefect_dask import DaskTaskRunner
import dask

@task
def bar(x):
    return len(x)

@flow(
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 2, "threads_per_worker": 1, "memory_limit": "2GiB", "silence_logs": False}
    ),
    log_prints=True
)
def foo(a):
    futures = []
    for x in a:
        with dask.annotate(priority=len(x)):
            futures.append(bar.submit(x=x))
    return [f.result() for f in futures]

if __name__ == "__main__":
    # Expectation from docs: longer strings (higher priority) should run first
    print(foo(["a", "bbbbbbbb", "cc", "dddd", "eee", "fffffffff", "g"]))

Expected (per docs): higher-priority tasks start first

Actual: no observable prioritization; tasks are scheduled as workers free up

Expected behavior

Annotated tasks with higher priority should be scheduled ahead of lower-priority tasks.

Actual behavior

No prioritization effect observed with DaskTaskRunner; all tasks appear to run with the same priority.

Why this happens (current behavior)

  • DaskTaskRunner.submit(...) does not expose a priority argument.
  • PrefectDaskClient.submit(...) passes an explicit priority keyword (default 0) to distributed.Client.submit(...).
  • In Dask, an explicit priority= kwarg overrides any annotate(...) context, so annotations do not apply here.

Proposed doc changes

  • Update https://docs.prefect.io/integrations/prefect-dask#with-prefect to state that dask.annotate(priority=...) (and even distributed.annotate) will not affect task priority when using Prefect's DaskTaskRunner in 3.x.
  • Provide workarounds:
    • Use dependency gating with wait_for for strict ordering.
    • Stage submissions (enqueue high-priority batch first, then others) for soft prioritization.
  • Optionally note a future enhancement: expose a priority parameter in Task.submit(...) and/or DaskTaskRunner.submit(...) that is forwarded to Dask.

Workarounds today

  • Submit higher-priority tasks first; submit the rest after (or gate them with wait_for).
  • For strict order: low = [bar.submit(x, wait_for=high)].

Related links

Version info

Version:              3.4.25
API version:          0.8.4
Python version:       3.11.9
Git commit:           8a37e7b1
Built:                Thu, Oct 23, 2025 07:58 PM
OS/Arch:              linux/x86_64
Profile:              ephemeral
Server type:          cloud
Pydantic version:     2.11.7
Server:
  Database:           sqlite
  SQLite version:     3.34.1
Integrations:
  prefect-dask:       0.3.6
  prefect-slack:      0.3.1

Additional context

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions