Skip to content

Commit 7cfd82c

Browse files
committed
Fix race conditions, deadlocks, and add debug logging
- Fix TOCTOU race in async_callback_init() with mutex protection - Add 30s timeout to OWN_GIL dispatch functions to prevent deadlock - Add log_and_clear_python_error() helper for debugging - Document intentional leak-vs-crash tradeoff in destructors
1 parent d8fb49a commit 7cfd82c

File tree

5 files changed

+212
-23
lines changed

5 files changed

+212
-23
lines changed

c_src/py_callback.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2059,18 +2059,26 @@ static erlang_module_state_t *get_erlang_module_state(void) {
20592059
* Initialize async callback system for the current interpreter.
20602060
* Creates the response pipe and pending futures dict.
20612061
* Uses per-interpreter module state.
2062+
*
2063+
* Thread-safe: uses async_futures_mutex to prevent race conditions
2064+
* when multiple threads call this concurrently.
20622065
*/
20632066
static int async_callback_init(void) {
20642067
erlang_module_state_t *state = get_erlang_module_state();
20652068
if (state == NULL) {
20662069
return -1;
20672070
}
20682071

2072+
/* Lock to prevent TOCTOU race condition on pipe_initialized check */
2073+
pthread_mutex_lock(&state->async_futures_mutex);
2074+
20692075
if (state->pipe_initialized) {
2076+
pthread_mutex_unlock(&state->async_futures_mutex);
20702077
return 0; /* Already initialized for this interpreter */
20712078
}
20722079

20732080
if (pipe(state->async_callback_pipe) < 0) {
2081+
pthread_mutex_unlock(&state->async_futures_mutex);
20742082
return -1;
20752083
}
20762084

@@ -2086,10 +2094,12 @@ static int async_callback_init(void) {
20862094
close(state->async_callback_pipe[1]);
20872095
state->async_callback_pipe[0] = -1;
20882096
state->async_callback_pipe[1] = -1;
2097+
pthread_mutex_unlock(&state->async_futures_mutex);
20892098
return -1;
20902099
}
20912100

20922101
state->pipe_initialized = true;
2102+
pthread_mutex_unlock(&state->async_futures_mutex);
20932103
return 0;
20942104
}
20952105

c_src/py_event_loop.c

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,28 @@ int create_default_event_loop(ErlNifEnv *env);
352352

353353
/**
354354
* @brief Destructor for event loop resources
355+
*
356+
* Memory/Resource Management Note:
357+
* This destructor intentionally skips Python object cleanup (Py_DECREF) in
358+
* certain scenarios to avoid crashes:
359+
*
360+
* 1. Subinterpreter event loops (interp_id > 0): The subinterpreter may have
361+
* been destroyed by Py_EndInterpreter before this destructor runs (which
362+
* runs on the Erlang GC thread). Calling PyGILState_Ensure would crash.
363+
*
364+
* 2. Runtime shutdown: If runtime_is_running() returns false, Python is
365+
* shutting down or stopped. Calling Python C API would crash.
366+
*
367+
* 3. Thread state issues: If PyGILState_Check() returns true, we already
368+
* hold the GIL from somewhere else - calling PyGILState_Ensure would
369+
* deadlock or corrupt thread state.
370+
*
371+
* In all these cases, we accept a small memory leak (the Python objects)
372+
* rather than risking a crash. This is the standard Python embedding pattern
373+
* for destructor-time cleanup from non-Python threads.
374+
*
375+
* The leaked Python objects will be reclaimed when the Python runtime fully
376+
* shuts down via Py_FinalizeEx().
355377
*/
356378
void event_loop_destructor(ErlNifEnv *env, void *obj) {
357379
(void)env;
@@ -443,7 +465,10 @@ void event_loop_destructor(ErlNifEnv *env, void *obj) {
443465
loop->msg_env = NULL;
444466
}
445467

446-
/* Clean up per-process namespaces */
468+
/* Clean up per-process namespaces.
469+
* Note: Same leak-vs-crash tradeoff as above. If we can't safely
470+
* acquire the GIL, we skip Py_XDECREF and accept leaking the Python
471+
* dict objects. The native namespace struct is always freed. */
447472
pthread_mutex_lock(&loop->namespaces_mutex);
448473
process_namespace_t *ns = loop->namespaces_head;
449474
while (ns != NULL) {

c_src/py_nif.c

Lines changed: 124 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3262,7 +3262,8 @@ static void *owngil_context_thread_main(void *arg) {
32623262
PyDict_SetItemString(ctx->globals, "erlang", erlang_module);
32633263
Py_DECREF(erlang_module);
32643264
} else {
3265-
PyErr_Clear(); /* Non-fatal - basic operations still work */
3265+
/* Non-fatal - basic operations still work, but log for debugging */
3266+
log_and_clear_python_error("OWN_GIL erlang module import");
32663267
}
32673268

32683269
/* Release our OWN_GIL (we'll reacquire when processing requests) */
@@ -3323,12 +3324,21 @@ static void *owngil_context_thread_main(void *arg) {
33233324
return NULL;
33243325
}
33253326

3327+
/**
3328+
* Timeout for OWN_GIL dispatch in seconds.
3329+
* If worker thread doesn't respond within this time, assume it's dead.
3330+
*/
3331+
#define OWNGIL_DISPATCH_TIMEOUT_SECS 30
3332+
33263333
/**
33273334
* @brief Dispatch a request to the OWN_GIL thread and wait for response
33283335
*
33293336
* Called from dirty schedulers. Copies the request term to the shared env,
33303337
* signals the worker thread, and waits for the response.
33313338
*
3339+
* Uses pthread_cond_timedwait to prevent indefinite blocking if the worker
3340+
* thread dies or becomes unresponsive.
3341+
*
33323342
* @param env Caller's NIF environment
33333343
* @param ctx Context with OWN_GIL
33343344
* @param req_type Request type (CTX_REQ_CALL, CTX_REQ_EVAL, CTX_REQ_EXEC)
@@ -3355,9 +3365,21 @@ static ERL_NIF_TERM dispatch_to_owngil_thread(
33553365
/* Signal the worker thread */
33563366
pthread_cond_signal(&ctx->request_ready);
33573367

3358-
/* Wait for response */
3368+
/* Wait for response with timeout to prevent deadlock on worker death */
3369+
struct timespec deadline;
3370+
clock_gettime(CLOCK_REALTIME, &deadline);
3371+
deadline.tv_sec += OWNGIL_DISPATCH_TIMEOUT_SECS;
3372+
33593373
while (ctx->request_type != CTX_REQ_NONE) {
3360-
pthread_cond_wait(&ctx->response_ready, &ctx->request_mutex);
3374+
int rc = pthread_cond_timedwait(&ctx->response_ready, &ctx->request_mutex, &deadline);
3375+
if (rc == ETIMEDOUT) {
3376+
/* Worker thread is unresponsive - mark it as not running */
3377+
atomic_store(&ctx->thread_running, false);
3378+
pthread_mutex_unlock(&ctx->request_mutex);
3379+
fprintf(stderr, "OWN_GIL dispatch timeout: worker thread unresponsive after %d seconds\n",
3380+
OWNGIL_DISPATCH_TIMEOUT_SECS);
3381+
return make_error(env, "worker_timeout");
3382+
}
33613383
}
33623384

33633385
/* Copy response back to caller's env */
@@ -3372,6 +3394,7 @@ static ERL_NIF_TERM dispatch_to_owngil_thread(
33723394
* @brief Dispatch reactor on_read_ready to OWN_GIL thread
33733395
*
33743396
* Similar to dispatch_to_owngil_thread but also passes buffer pointer.
3397+
* Uses timeout to prevent deadlock if worker thread dies.
33753398
*/
33763399
ERL_NIF_TERM dispatch_reactor_read_to_owngil(ErlNifEnv *env, py_context_t *ctx,
33773400
int fd, void *buffer_ptr) {
@@ -3391,9 +3414,25 @@ ERL_NIF_TERM dispatch_reactor_read_to_owngil(ErlNifEnv *env, py_context_t *ctx,
33913414
/* Signal the worker thread */
33923415
pthread_cond_signal(&ctx->request_ready);
33933416

3394-
/* Wait for response */
3417+
/* Wait for response with timeout to prevent deadlock */
3418+
struct timespec deadline;
3419+
clock_gettime(CLOCK_REALTIME, &deadline);
3420+
deadline.tv_sec += OWNGIL_DISPATCH_TIMEOUT_SECS;
3421+
33953422
while (ctx->request_type != CTX_REQ_NONE) {
3396-
pthread_cond_wait(&ctx->response_ready, &ctx->request_mutex);
3423+
int rc = pthread_cond_timedwait(&ctx->response_ready, &ctx->request_mutex, &deadline);
3424+
if (rc == ETIMEDOUT) {
3425+
/* Worker thread is unresponsive - clean up buffer and mark dead */
3426+
atomic_store(&ctx->thread_running, false);
3427+
/* Buffer ownership was transferred but never processed - release it */
3428+
if (ctx->reactor_buffer_ptr) {
3429+
enif_release_resource(ctx->reactor_buffer_ptr);
3430+
ctx->reactor_buffer_ptr = NULL;
3431+
}
3432+
pthread_mutex_unlock(&ctx->request_mutex);
3433+
fprintf(stderr, "OWN_GIL reactor dispatch timeout: worker thread unresponsive\n");
3434+
return make_error(env, "worker_timeout");
3435+
}
33973436
}
33983437

33993438
/* Copy response back to caller's env */
@@ -3406,6 +3445,8 @@ ERL_NIF_TERM dispatch_reactor_read_to_owngil(ErlNifEnv *env, py_context_t *ctx,
34063445

34073446
/**
34083447
* @brief Dispatch reactor on_write_ready to OWN_GIL thread
3448+
*
3449+
* Uses timeout to prevent deadlock if worker thread dies.
34093450
*/
34103451
ERL_NIF_TERM dispatch_reactor_write_to_owngil(ErlNifEnv *env, py_context_t *ctx,
34113452
int fd) {
@@ -3423,9 +3464,19 @@ ERL_NIF_TERM dispatch_reactor_write_to_owngil(ErlNifEnv *env, py_context_t *ctx,
34233464
/* Signal the worker thread */
34243465
pthread_cond_signal(&ctx->request_ready);
34253466

3426-
/* Wait for response */
3467+
/* Wait for response with timeout to prevent deadlock */
3468+
struct timespec deadline;
3469+
clock_gettime(CLOCK_REALTIME, &deadline);
3470+
deadline.tv_sec += OWNGIL_DISPATCH_TIMEOUT_SECS;
3471+
34273472
while (ctx->request_type != CTX_REQ_NONE) {
3428-
pthread_cond_wait(&ctx->response_ready, &ctx->request_mutex);
3473+
int rc = pthread_cond_timedwait(&ctx->response_ready, &ctx->request_mutex, &deadline);
3474+
if (rc == ETIMEDOUT) {
3475+
atomic_store(&ctx->thread_running, false);
3476+
pthread_mutex_unlock(&ctx->request_mutex);
3477+
fprintf(stderr, "OWN_GIL reactor write dispatch timeout: worker thread unresponsive\n");
3478+
return make_error(env, "worker_timeout");
3479+
}
34293480
}
34303481

34313482
/* Copy response back to caller's env */
@@ -3438,6 +3489,8 @@ ERL_NIF_TERM dispatch_reactor_write_to_owngil(ErlNifEnv *env, py_context_t *ctx,
34383489

34393490
/**
34403491
* @brief Dispatch reactor init_connection to OWN_GIL thread
3492+
*
3493+
* Uses timeout to prevent deadlock if worker thread dies.
34413494
*/
34423495
ERL_NIF_TERM dispatch_reactor_init_to_owngil(ErlNifEnv *env, py_context_t *ctx,
34433496
int fd, ERL_NIF_TERM client_info) {
@@ -3457,9 +3510,19 @@ ERL_NIF_TERM dispatch_reactor_init_to_owngil(ErlNifEnv *env, py_context_t *ctx,
34573510
/* Signal the worker thread */
34583511
pthread_cond_signal(&ctx->request_ready);
34593512

3460-
/* Wait for response */
3513+
/* Wait for response with timeout to prevent deadlock */
3514+
struct timespec deadline;
3515+
clock_gettime(CLOCK_REALTIME, &deadline);
3516+
deadline.tv_sec += OWNGIL_DISPATCH_TIMEOUT_SECS;
3517+
34613518
while (ctx->request_type != CTX_REQ_NONE) {
3462-
pthread_cond_wait(&ctx->response_ready, &ctx->request_mutex);
3519+
int rc = pthread_cond_timedwait(&ctx->response_ready, &ctx->request_mutex, &deadline);
3520+
if (rc == ETIMEDOUT) {
3521+
atomic_store(&ctx->thread_running, false);
3522+
pthread_mutex_unlock(&ctx->request_mutex);
3523+
fprintf(stderr, "OWN_GIL reactor init dispatch timeout: worker thread unresponsive\n");
3524+
return make_error(env, "worker_timeout");
3525+
}
34633526
}
34643527

34653528
/* Copy response back to caller's env */
@@ -3474,6 +3537,7 @@ ERL_NIF_TERM dispatch_reactor_init_to_owngil(ErlNifEnv *env, py_context_t *ctx,
34743537
* @brief Dispatch exec_with_env to OWN_GIL thread
34753538
*
34763539
* Passes the process-local env resource to the worker thread via local_env_ptr.
3540+
* Uses timeout to prevent deadlock if worker thread dies.
34773541
*/
34783542
static ERL_NIF_TERM dispatch_exec_with_env_to_owngil(
34793543
ErlNifEnv *env, py_context_t *ctx,
@@ -3494,9 +3558,19 @@ static ERL_NIF_TERM dispatch_exec_with_env_to_owngil(
34943558
/* Signal the worker thread */
34953559
pthread_cond_signal(&ctx->request_ready);
34963560

3497-
/* Wait for response */
3561+
/* Wait for response with timeout to prevent deadlock */
3562+
struct timespec deadline;
3563+
clock_gettime(CLOCK_REALTIME, &deadline);
3564+
deadline.tv_sec += OWNGIL_DISPATCH_TIMEOUT_SECS;
3565+
34983566
while (ctx->request_type != CTX_REQ_NONE) {
3499-
pthread_cond_wait(&ctx->response_ready, &ctx->request_mutex);
3567+
int rc = pthread_cond_timedwait(&ctx->response_ready, &ctx->request_mutex, &deadline);
3568+
if (rc == ETIMEDOUT) {
3569+
atomic_store(&ctx->thread_running, false);
3570+
pthread_mutex_unlock(&ctx->request_mutex);
3571+
fprintf(stderr, "OWN_GIL exec_with_env dispatch timeout: worker thread unresponsive\n");
3572+
return make_error(env, "worker_timeout");
3573+
}
35003574
}
35013575

35023576
/* Copy response back to caller's env */
@@ -3511,6 +3585,7 @@ static ERL_NIF_TERM dispatch_exec_with_env_to_owngil(
35113585
* @brief Dispatch eval_with_env to OWN_GIL thread
35123586
*
35133587
* Passes the process-local env resource to the worker thread via local_env_ptr.
3588+
* Uses timeout to prevent deadlock if worker thread dies.
35143589
*/
35153590
static ERL_NIF_TERM dispatch_eval_with_env_to_owngil(
35163591
ErlNifEnv *env, py_context_t *ctx,
@@ -3534,9 +3609,19 @@ static ERL_NIF_TERM dispatch_eval_with_env_to_owngil(
35343609
/* Signal the worker thread */
35353610
pthread_cond_signal(&ctx->request_ready);
35363611

3537-
/* Wait for response */
3612+
/* Wait for response with timeout to prevent deadlock */
3613+
struct timespec deadline;
3614+
clock_gettime(CLOCK_REALTIME, &deadline);
3615+
deadline.tv_sec += OWNGIL_DISPATCH_TIMEOUT_SECS;
3616+
35383617
while (ctx->request_type != CTX_REQ_NONE) {
3539-
pthread_cond_wait(&ctx->response_ready, &ctx->request_mutex);
3618+
int rc = pthread_cond_timedwait(&ctx->response_ready, &ctx->request_mutex, &deadline);
3619+
if (rc == ETIMEDOUT) {
3620+
atomic_store(&ctx->thread_running, false);
3621+
pthread_mutex_unlock(&ctx->request_mutex);
3622+
fprintf(stderr, "OWN_GIL eval_with_env dispatch timeout: worker thread unresponsive\n");
3623+
return make_error(env, "worker_timeout");
3624+
}
35403625
}
35413626

35423627
/* Copy response back to caller's env */
@@ -3551,6 +3636,7 @@ static ERL_NIF_TERM dispatch_eval_with_env_to_owngil(
35513636
* @brief Dispatch call_with_env to OWN_GIL thread
35523637
*
35533638
* Passes the process-local env resource to the worker thread via local_env_ptr.
3639+
* Uses timeout to prevent deadlock if worker thread dies.
35543640
*/
35553641
static ERL_NIF_TERM dispatch_call_with_env_to_owngil(
35563642
ErlNifEnv *env, py_context_t *ctx,
@@ -3578,9 +3664,19 @@ static ERL_NIF_TERM dispatch_call_with_env_to_owngil(
35783664
/* Signal the worker thread */
35793665
pthread_cond_signal(&ctx->request_ready);
35803666

3581-
/* Wait for response */
3667+
/* Wait for response with timeout to prevent deadlock */
3668+
struct timespec deadline;
3669+
clock_gettime(CLOCK_REALTIME, &deadline);
3670+
deadline.tv_sec += OWNGIL_DISPATCH_TIMEOUT_SECS;
3671+
35823672
while (ctx->request_type != CTX_REQ_NONE) {
3583-
pthread_cond_wait(&ctx->response_ready, &ctx->request_mutex);
3673+
int rc = pthread_cond_timedwait(&ctx->response_ready, &ctx->request_mutex, &deadline);
3674+
if (rc == ETIMEDOUT) {
3675+
atomic_store(&ctx->thread_running, false);
3676+
pthread_mutex_unlock(&ctx->request_mutex);
3677+
fprintf(stderr, "OWN_GIL call_with_env dispatch timeout: worker thread unresponsive\n");
3678+
return make_error(env, "worker_timeout");
3679+
}
35843680
}
35853681

35863682
/* Copy response back to caller's env */
@@ -3596,6 +3692,7 @@ static ERL_NIF_TERM dispatch_call_with_env_to_owngil(
35963692
*
35973693
* Creates the globals/locals dicts in the correct interpreter context.
35983694
* Returns ok or error.
3695+
* Uses timeout to prevent deadlock if worker thread dies.
35993696
*/
36003697
static ERL_NIF_TERM dispatch_create_local_env_to_owngil(
36013698
ErlNifEnv *env, py_context_t *ctx,
@@ -3615,9 +3712,19 @@ static ERL_NIF_TERM dispatch_create_local_env_to_owngil(
36153712
/* Signal the worker thread */
36163713
pthread_cond_signal(&ctx->request_ready);
36173714

3618-
/* Wait for response */
3715+
/* Wait for response with timeout to prevent deadlock */
3716+
struct timespec deadline;
3717+
clock_gettime(CLOCK_REALTIME, &deadline);
3718+
deadline.tv_sec += OWNGIL_DISPATCH_TIMEOUT_SECS;
3719+
36193720
while (ctx->request_type != CTX_REQ_NONE) {
3620-
pthread_cond_wait(&ctx->response_ready, &ctx->request_mutex);
3721+
int rc = pthread_cond_timedwait(&ctx->response_ready, &ctx->request_mutex, &deadline);
3722+
if (rc == ETIMEDOUT) {
3723+
atomic_store(&ctx->thread_running, false);
3724+
pthread_mutex_unlock(&ctx->request_mutex);
3725+
fprintf(stderr, "OWN_GIL create_local_env dispatch timeout: worker thread unresponsive\n");
3726+
return make_error(env, "worker_timeout");
3727+
}
36213728
}
36223729

36233730
/* Copy response back to caller's env */

0 commit comments

Comments
 (0)