Config
application:set_env(ekaf, ekaf_partition_strategy, [{ekaf_partition_strategy, custom}])
All messages are produced with a key.
What happens:
- ekaf_server.erl:439
handle_info({worker, up, WorkerUp, WorkerUpStateName, WorkerUpState, _}, StateName, #ekaf_server { topic = Topic, messages = OfflineMessages } = State)
- ekaf_server_lib.erl:118
send_messages(StateName, #ekaf_server{ topic = Topic } = State, Messages)
- ekaf.erl:66
produce_async_batched(Topic, Data)
- ekaf_lib.erl:79
common_async(Event, Topic, [{Key,Data}|Rest])
At this point the code waits for a reply from the TopicWorker, but the current process is itself the topic worker. With gen_fsm:sync_send_all_state_event, as in #50, this would deadlock. Here, instead, we send {pick, {Key,Data}, self()} to self(), immediately receive it in the catch-all clause _E, and the message is discarded.
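To make the failure mode concrete, here is a minimal, self-contained sketch of the pattern described above. It is not ekaf's actual code: the module name `self_pick_demo`, the functions `pick/2`, `pick_from_self/1` and `pick_from_other/1`, and the return atoms are hypothetical, and the only thing borrowed from the report is the shape of the `{pick, {Key,Data}, self()}` message and the catch-all `_E` clause.

```erlang
-module(self_pick_demo).
-export([pick_from_self/1, pick_from_other/1]).

%% Hypothetical stand-in for the pick step, NOT ekaf's implementation.
%% Sends a pick request to TopicWorker and waits for an answer.
pick(TopicWorker, KeyData) ->
    TopicWorker ! {pick, KeyData, self()},
    receive
        {ok, Worker} ->
            {ok, Worker};
        _E ->
            %% Catch-all: matches whatever is first in the mailbox.
            %% If TopicWorker =:= self(), that is our own pick request,
            %% so the request is consumed here and never answered.
            discarded
    after 1000 ->
        timeout
    end.

%% The caller is also the addressee, as in the worker-up path.
pick_from_self(KeyData) ->
    pick(self(), KeyData).

%% For contrast: a separate process answers the pick request.
pick_from_other(KeyData) ->
    Responder = spawn(fun() ->
        receive
            {pick, _KeyData, From} -> From ! {ok, self()}
        end
    end),
    pick(Responder, KeyData).
```

Called from a process with an empty mailbox, `pick_from_self/1` returns `discarded` without blocking, while `pick_from_other/1` returns `{ok, Pid}`. A synchronous call to self() in the same position would block instead, which is the deadlock case mentioned for #50.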