Bug summary
When a flow using a custom Serializer subclass fails, the worker crashes with a KeyError and cannot mark the flow run as CRASHED, leaving a confusing error message for users investigating the crash.
Minimal reproducible example
my_serializer.py
import json
from typing import Any, Literal
from prefect.serializers import Serializer
class MyCustomSerializer(Serializer):
type: Literal["my_custom_serializer"] = "my_custom_serializer"
def dumps(self, obj: Any) -> bytes:
return json.dumps(obj).encode()
def loads(self, blob: bytes) -> Any:
return json.loads(blob.decode())
my_flow.py
from prefect import flow
from my_serializer import MyCustomSerializer
@flow(persist_result=True, result_serializer=MyCustomSerializer())
def failing_flow():
raise RuntimeError("intentional failure")
prefect work-pool create my-pool --type process
# deploy failing_flow to my-pool, then:
prefect worker start --pool my-pool &
prefect deployment run failing-flow/test
Stack traces
Two errors fire simultaneously, crashing the worker process entirely:
1. Worker cannot propose CRASHED state:
Failed to update state of flow run '...'
Traceback (most recent call last):
File ".../prefect/runner/_state_proposer.py", line 115, in propose_crashed
state = await propose_state(
File ".../prefect/utilities/engine.py", line 390, in propose_state
response = await set_state_and_handle_waits(set_state)
File ".../prefect/client/orchestration/_flow_runs/client.py", line 914, in set_flow_run_state
result: OrchestrationResult[T] = OrchestrationResult.model_validate(
File ".../prefect/serializers.py", line 131, in __new__
subcls = lookup_type(cls, dispatch_key=type_)
File ".../prefect/utilities/dispatch.py", line 208, in lookup_type
raise KeyError(
KeyError: "No class found for dispatch key 'my_custom_serializer' in registry for type 'Serializer'."
During handling of the above exception, another exception occurred:
KeyError: 'Invalid error type: \'"No class found for dispatch key \'my_custom_serializer\' in registry for type \'Serializer\'."\''
2. Worker cannot read the completed flow run (for on_crashed hook check):
File ".../prefect/runner/runner.py", line 1278, in _submit_run_and_capture_errors
api_flow_run = await self._client.read_flow_run(flow_run_id=flow_run.id)
File ".../prefect/client/orchestration/_flow_runs/client.py", line 728, in read_flow_run
return FlowRun.model_validate(response.json())
File ".../prefect/serializers.py", line 133, in __new__
raise ValidationError.from_exception_data(
KeyError: 'Invalid error type: \'"No class found for dispatch key \'my_custom_serializer\' in registry for type \'Serializer\'."\''
Version info
Version: 3.6.26
Python version: 3.12
Additional context
The flow and the worker run in separate processes. The flow process imports user code and registers MyCustomSerializer. The worker is infrastructure — it never imports user code and has no mechanism to know which serializer classes a given flow uses.
The fix should be in lookup_type: return a safe opaque placeholder instead of raising when a serializer key is not found. The worker has no need to call .dumps() or .loads() — it only needs to handle state transitions. A RuntimeError should only surface if code actually attempts to deserialize a result.
Bug summary
When a flow using a custom
Serializersubclass fails, the worker crashes with aKeyErrorand cannot mark the flow run asCRASHED, leaving a confusing error message for users investigating the crash.Minimal reproducible example
my_serializer.pymy_flow.pyStack traces
Two errors fire simultaneously, crashing the worker process entirely:
1. Worker cannot propose CRASHED state:
2. Worker cannot read the completed flow run (for
on_crashedhook check):Version info
Additional context
The flow and the worker run in separate processes. The flow process imports user code and registers
MyCustomSerializer. The worker is infrastructure — it never imports user code and has no mechanism to know which serializer classes a given flow uses.The fix should be in
lookup_type: return a safe opaque placeholder instead of raising when a serializer key is not found. The worker has no need to call.dumps()or.loads()— it only needs to handle state transitions. ARuntimeErrorshould only surface if code actually attempts to deserialize a result.