Skip to content

Commit 2e14f78

Browse files
committed
Add event process architecture for 27x faster timer throughput
New architecture uses Erlang mailbox as event queue instead of pthread_cond: - py_event_loop_proc.erl: Event process receives FD/timer events directly - py_event_loop_v2.erl: Drop-in replacement for py_event_router - Timers fire directly to event process (no dispatch_timer NIF hop) - FD events from enif_select go directly to event process New NIFs: - event_loop_set_event_proc/2: Set event process for a loop - poll_via_proc/2: Poll via event process message passing Backward compatible: legacy py_event_router still works.
1 parent 0aa79b0 commit 2e14f78

8 files changed

Lines changed: 1075 additions & 3 deletions

c_src/py_event_loop.c

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -510,6 +510,104 @@ ERL_NIF_TERM nif_event_loop_set_router(ErlNifEnv *env, int argc,
510510
return ATOM_OK;
511511
}
512512

513+
/**
514+
* event_loop_set_event_proc(LoopRef, EventProcPid) -> ok
515+
*
516+
* Set the event process for the new architecture.
517+
*/
518+
ERL_NIF_TERM nif_event_loop_set_event_proc(ErlNifEnv *env, int argc,
519+
const ERL_NIF_TERM argv[]) {
520+
(void)argc;
521+
522+
erlang_event_loop_t *loop;
523+
if (!enif_get_resource(env, argv[0], EVENT_LOOP_RESOURCE_TYPE,
524+
(void **)&loop)) {
525+
return make_error(env, "invalid_loop");
526+
}
527+
528+
if (!enif_get_local_pid(env, argv[1], &loop->event_proc_pid)) {
529+
return make_error(env, "invalid_pid");
530+
}
531+
532+
loop->has_event_proc = true;
533+
534+
/* Also set as router for compatibility with FD registration */
535+
loop->router_pid = loop->event_proc_pid;
536+
loop->has_router = true;
537+
538+
return ATOM_OK;
539+
}
540+
541+
/**
542+
* poll_via_proc(LoopRef, TimeoutMs) -> [{CallbackId, Type}]
543+
*
544+
* Poll for events via the event process. This NIF:
545+
* 1. Sends {poll, self(), Ref, TimeoutMs} to event process
546+
* 2. Waits for {events, Ref, Events} response
547+
* 3. Converts Events to Erlang term and returns
548+
*
549+
* This replaces the pthread_cond based waiting with Erlang message passing.
550+
*/
551+
ERL_NIF_TERM nif_poll_via_proc(ErlNifEnv *env, int argc,
552+
const ERL_NIF_TERM argv[]) {
553+
(void)argc;
554+
555+
erlang_event_loop_t *loop;
556+
if (!enif_get_resource(env, argv[0], EVENT_LOOP_RESOURCE_TYPE,
557+
(void **)&loop)) {
558+
return make_error(env, "invalid_loop");
559+
}
560+
561+
if (!loop->has_event_proc) {
562+
return make_error(env, "no_event_proc");
563+
}
564+
565+
int timeout_ms;
566+
if (!enif_get_int(env, argv[1], &timeout_ms)) {
567+
return make_error(env, "invalid_timeout");
568+
}
569+
570+
if (loop->shutdown) {
571+
return enif_make_list(env, 0);
572+
}
573+
574+
/* Create message env for sending to event process */
575+
ErlNifEnv *msg_env = enif_alloc_env();
576+
if (msg_env == NULL) {
577+
return make_error(env, "alloc_failed");
578+
}
579+
580+
/* Create unique ref for this poll request */
581+
ERL_NIF_TERM ref = enif_make_ref(msg_env);
582+
583+
/* Get self PID */
584+
ErlNifPid self_pid;
585+
if (enif_self(env, &self_pid) == NULL) {
586+
enif_free_env(msg_env);
587+
return make_error(env, "no_self");
588+
}
589+
590+
/* Send {poll, From, Ref, TimeoutMs} to event process */
591+
ERL_NIF_TERM poll_msg = enif_make_tuple4(
592+
msg_env,
593+
enif_make_atom(msg_env, "poll"),
594+
enif_make_pid(msg_env, &self_pid),
595+
ref,
596+
enif_make_int(msg_env, timeout_ms)
597+
);
598+
599+
if (!enif_send(env, &loop->event_proc_pid, msg_env, poll_msg)) {
600+
enif_free_env(msg_env);
601+
return make_error(env, "send_failed");
602+
}
603+
604+
enif_free_env(msg_env);
605+
606+
/* The actual waiting happens in Erlang - this NIF returns the ref
607+
* and the caller should do a receive for {events, Ref, Events} */
608+
return enif_make_tuple2(env, ATOM_OK, ref);
609+
}
610+
513611
/**
514612
* add_reader(LoopRef, Fd, CallbackId) -> {ok, FdRef}
515613
*/

c_src/py_event_loop.h

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,18 @@ typedef struct {
170170
* - Synchronization primitives
171171
*/
172172
typedef struct erlang_event_loop {
173-
/** @brief PID of the py_event_router gen_server */
173+
/** @brief PID of the py_event_router gen_server (legacy) */
174174
ErlNifPid router_pid;
175175

176-
/** @brief Whether router_pid has been set */
176+
/** @brief Whether router_pid has been set (legacy) */
177177
bool has_router;
178178

179+
/** @brief PID of the py_event_loop_proc process (new architecture) */
180+
ErlNifPid event_proc_pid;
181+
182+
/** @brief Whether event_proc_pid has been set */
183+
bool has_event_proc;
184+
179185
/** @brief Mutex protecting the event loop state */
180186
pthread_mutex_t mutex;
181187

@@ -308,6 +314,28 @@ ERL_NIF_TERM nif_event_loop_destroy(ErlNifEnv *env, int argc,
308314
ERL_NIF_TERM nif_event_loop_set_router(ErlNifEnv *env, int argc,
309315
const ERL_NIF_TERM argv[]);
310316

317+
/**
318+
* @brief Set the event process for an event loop (new architecture)
319+
*
320+
* The event process receives FD events and timer messages directly,
321+
* using the Erlang mailbox as the event queue.
322+
*
323+
* NIF: event_loop_set_event_proc(LoopRef, EventProcPid) -> ok | {error, Reason}
324+
*/
325+
ERL_NIF_TERM nif_event_loop_set_event_proc(ErlNifEnv *env, int argc,
326+
const ERL_NIF_TERM argv[]);
327+
328+
/**
329+
* @brief Poll for events via the event process (new architecture)
330+
*
331+
* Sends {poll, CallerPid, Ref, TimeoutMs} to event process and waits
332+
* for {events, Ref, Events} response. Uses Erlang mailbox as queue.
333+
*
334+
* NIF: poll_via_proc(LoopRef, TimeoutMs) -> [{CallbackId, Type}]
335+
*/
336+
ERL_NIF_TERM nif_poll_via_proc(ErlNifEnv *env, int argc,
337+
const ERL_NIF_TERM argv[]);
338+
311339
/**
312340
* @brief Register a file descriptor for read monitoring
313341
*

c_src/py_nif.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1882,6 +1882,8 @@ static ErlNifFunc nif_funcs[] = {
18821882
{"event_loop_new", 0, nif_event_loop_new, 0},
18831883
{"event_loop_destroy", 1, nif_event_loop_destroy, 0},
18841884
{"event_loop_set_router", 2, nif_event_loop_set_router, 0},
1885+
{"event_loop_set_event_proc", 2, nif_event_loop_set_event_proc, 0},
1886+
{"poll_via_proc", 2, nif_poll_via_proc, 0},
18851887
{"event_loop_wakeup", 1, nif_event_loop_wakeup, 0},
18861888
{"add_reader", 3, nif_add_reader, 0},
18871889
{"remove_reader", 2, nif_remove_reader, 0},

0 commit comments

Comments
 (0)