
Worker crashes with KeyError on custom serializer when proposing CRASHED state after flow failure #21609

@mikeoconnor0308

Description


Bug summary

When a flow using a custom Serializer subclass fails, the worker crashes with a KeyError and cannot mark the flow run as CRASHED, leaving a confusing error message for users investigating the crash.

Minimal reproducible example

my_serializer.py

import json
from typing import Any, Literal
from prefect.serializers import Serializer

class MyCustomSerializer(Serializer):
    type: Literal["my_custom_serializer"] = "my_custom_serializer"

    def dumps(self, obj: Any) -> bytes:
        return json.dumps(obj).encode()

    def loads(self, blob: bytes) -> Any:
        return json.loads(blob.decode())

my_flow.py

from prefect import flow
from my_serializer import MyCustomSerializer

@flow(persist_result=True, result_serializer=MyCustomSerializer())
def failing_flow():
    raise RuntimeError("intentional failure")

Then, from a shell:

prefect work-pool create my-pool --type process
# deploy failing_flow to my-pool, then:
prefect worker start --pool my-pool &
prefect deployment run failing-flow/test
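The repro depends on which process registers MyCustomSerializer. The sketch below is a simplified model, not Prefect's actual `prefect.utilities.dispatch` code. `SerializerBase`, `_REGISTRY`, and this `lookup_type` are hypothetical stand-ins. In this model, each subclass registers itself under its dispatch key when its class statement runs, which happens when its module is imported. A process that never imports `my_serializer` therefore has no entry for `my_custom_serializer`.

```python
from typing import Any, ClassVar, Dict, Type

# Hypothetical, simplified model of a dispatch registry; not Prefect's code.
_REGISTRY: Dict[str, Type["SerializerBase"]] = {}


class SerializerBase:
    # Dispatch key, playing the role of the `type` field default.
    type: ClassVar[str] = "base"

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Runs when a subclass's class statement executes, i.e. on import.
        _REGISTRY[cls.type] = cls


def lookup_type(dispatch_key: str) -> Type[SerializerBase]:
    try:
        return _REGISTRY[dispatch_key]
    except KeyError:
        raise KeyError(
            f"No class found for dispatch key {dispatch_key!r} "
            "in registry for type 'Serializer'."
        ) from None


# A serializer this process has imported (hypothetical).
class JSONSerializer(SerializerBase):
    type = "json"


# The worker's situation: my_serializer was never imported here.
try:
    lookup_type("my_custom_serializer")
except KeyError as exc:
    print(f"KeyError: {exc}")
```

Defining a subclass with `type = "my_custom_serializer"` in the same process makes the lookup succeed, which is the in-model equivalent of importing `my_serializer`. That is exactly what the flow process does and the worker does not.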

Stack traces

Two errors fire simultaneously, crashing the worker process entirely:

1. Worker cannot propose CRASHED state:

Failed to update state of flow run '...'
Traceback (most recent call last):
  File ".../prefect/runner/_state_proposer.py", line 115, in propose_crashed
    state = await propose_state(
  File ".../prefect/utilities/engine.py", line 390, in propose_state
    response = await set_state_and_handle_waits(set_state)
  File ".../prefect/client/orchestration/_flow_runs/client.py", line 914, in set_flow_run_state
    result: OrchestrationResult[T] = OrchestrationResult.model_validate(
  File ".../prefect/serializers.py", line 131, in __new__
    subcls = lookup_type(cls, dispatch_key=type_)
  File ".../prefect/utilities/dispatch.py", line 208, in lookup_type
    raise KeyError(
KeyError: "No class found for dispatch key 'my_custom_serializer' in registry for type 'Serializer'."

During handling of the above exception, another exception occurred:

KeyError: 'Invalid error type: \'"No class found for dispatch key \'my_custom_serializer\' in registry for type \'Serializer\'."\''

2. Worker cannot read the finished flow run (needed for the on_crashed hook check):

  File ".../prefect/runner/runner.py", line 1278, in _submit_run_and_capture_errors
    api_flow_run = await self._client.read_flow_run(flow_run_id=flow_run.id)
  File ".../prefect/client/orchestration/_flow_runs/client.py", line 728, in read_flow_run
    return FlowRun.model_validate(response.json())
  File ".../prefect/serializers.py", line 133, in __new__
    raise ValidationError.from_exception_data(
KeyError: 'Invalid error type: \'"No class found for dispatch key \'my_custom_serializer\' in registry for type \'Serializer\'."\''
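In both traces, the second KeyError wraps the first. Its nested quoting fits one explanation: the original KeyError's string form was passed where pydantic expects the name of a built-in error type. pydantic-core appears to reject unknown names with `KeyError("Invalid error type: '...'")`. The sketch below reproduces the message using only the standard library. `KNOWN_ERROR_TYPES` and `build_line_error` are hypothetical stand-ins for pydantic-core internals. The claim that `str(exc)` is what gets passed is inferred from the message text, not taken from Prefect's source.

```python
# Hypothetical stand-in for pydantic-core's table of built-in error types.
KNOWN_ERROR_TYPES = {"missing", "value_error", "union_tag_invalid"}


def build_line_error(error_type: str) -> dict:
    # Mimics the lookup that rejects an unknown error-type name.
    if error_type not in KNOWN_ERROR_TYPES:
        raise KeyError(f"Invalid error type: '{error_type}'")
    return {"type": error_type}


original = KeyError(
    "No class found for dispatch key 'my_custom_serializer' "
    "in registry for type 'Serializer'."
)

try:
    # str() of a KeyError is the repr of its argument, so the dispatch
    # message arrives already wrapped in an extra pair of double quotes.
    build_line_error(str(original))
except KeyError as wrapped:
    # A traceback prints the exception as "KeyError: <str(wrapped)>".
    message = f"KeyError: {wrapped}"

print(message)
```

Each `str()` on a KeyError adds one more layer of repr quoting. That layering is why the user sees escaped quotes inside quotes and not the original dispatch error.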

Version info

Version:             3.6.26
Python version:      3.12

Additional context

The flow and the worker run in separate processes. The flow process imports user code and registers MyCustomSerializer. The worker is infrastructure — it never imports user code and has no mechanism to know which serializer classes a given flow uses.

The fix should be in lookup_type: return a safe opaque placeholder instead of raising when a serializer key is not found. The worker has no need to call .dumps() or .loads() — it only needs to handle state transitions. A RuntimeError should only surface if code actually attempts to deserialize a result.
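Below is a minimal sketch of the proposed behavior. It assumes a simplified registry and does not reproduce Prefect's actual `lookup_type` signature or its pydantic-based Serializer. `BaseSerializer`, `UnknownSerializer`, `lookup_serializer`, `serializer_from_dict`, and `_REGISTRY` are all hypothetical names. An unknown key resolves to a placeholder that keeps the original `type` value. The placeholder raises RuntimeError only if `dumps()` or `loads()` is actually called.

```python
import json
from typing import Any, Dict, NoReturn, Type


class BaseSerializer:
    """Simplified stand-in for prefect.serializers.Serializer (hypothetical)."""

    def __init__(self, type: str) -> None:
        self.type = type

    def dumps(self, obj: Any) -> bytes:
        raise NotImplementedError

    def loads(self, blob: bytes) -> Any:
        raise NotImplementedError


class JSONSerializer(BaseSerializer):
    def __init__(self, type: str = "json") -> None:
        super().__init__(type)

    def dumps(self, obj: Any) -> bytes:
        return json.dumps(obj).encode()

    def loads(self, blob: bytes) -> Any:
        return json.loads(blob.decode())


class UnknownSerializer(BaseSerializer):
    """Opaque placeholder for a serializer whose class is not importable here."""

    def _fail(self) -> NoReturn:
        raise RuntimeError(
            f"Serializer {self.type!r} is not registered in this process; "
            "import the module that defines it before reading results."
        )

    def dumps(self, obj: Any) -> bytes:
        self._fail()

    def loads(self, blob: bytes) -> Any:
        self._fail()


_REGISTRY: Dict[str, Type[BaseSerializer]] = {"json": JSONSerializer}


def lookup_serializer(dispatch_key: str) -> Type[BaseSerializer]:
    # Fall back to the placeholder instead of raising KeyError.
    return _REGISTRY.get(dispatch_key, UnknownSerializer)


def serializer_from_dict(data: Dict[str, Any]) -> BaseSerializer:
    # Roughly what validating an API payload like {"type": "..."} must do.
    cls = lookup_serializer(data["type"])
    return cls(type=data["type"])


placeholder = serializer_from_dict({"type": "my_custom_serializer"})
print(type(placeholder).__name__, placeholder.type)
```

Under this design, the worker can still parse and forward state payloads that name the custom serializer, and the original key survives for any process that later imports the real class. The error is deferred to the one operation that genuinely needs the class: reading or writing a result.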

Metadata


Assignees: No one assigned
Labels: bug (Something isn't working)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
