Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions pjlib/include/pj/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,21 @@
# define PJ_IOQUEUE_CALLBACK_NO_LOCK 1
#endif

/**
* Enable the ioqueue "fast track" optimization. When enabled (default),
* pj_ioqueue_send(), pj_ioqueue_sendto(), and pj_ioqueue_accept() attempt
* the operation immediately before queuing it for async processing.
*
* This should not be disabled in production. Setting to 0 forces all
* operations through the async path, intended only for debugging and
* testing the ioqueue completion callback logic. See #4864, #4878.
*
* Default: 1 (enabled).
*/
#ifndef PJ_IOQUEUE_FAST_TRACK
# define PJ_IOQUEUE_FAST_TRACK 1
#endif


/**
* Determine if FD_SETSIZE is changeable/set-able. If so, then we will
Expand Down
14 changes: 10 additions & 4 deletions pjlib/src/pj/activesock.c
Original file line number Diff line number Diff line change
Expand Up @@ -773,12 +773,18 @@ static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,

asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);

/* Ignore if we've been shutdown. This may cause data to be partially
* sent even when 'wholedata' was requested if the OS only sent partial
* buffer.
/* If we've been shutdown, still invoke the callback so upper layers
* can release resources (e.g., pjsip_tx_data_dec_ref). See #4878.
*
* Note: asock->key may be NULL at this point (cleared by
* pj_activesock_close before pj_ioqueue_unregister). The callback
* must not attempt further I/O on this active socket.
*/
if (asock->shutdown & SHUT_TX)
if (asock->shutdown & SHUT_TX) {
if (asock->cb.on_data_sent)
(*asock->cb.on_data_sent)(asock, op_key, bytes_sent);
return;
}

if (bytes_sent > 0 && op_key->activesock_data) {
/* whole_data is requested. Make sure we send all the data */
Expand Down
118 changes: 114 additions & 4 deletions pjlib/src/pj/ioqueue_common_abs.c
Original file line number Diff line number Diff line change
Expand Up @@ -443,9 +443,12 @@ static pj_bool_t ioqueue_dispatch_write_event( pj_ioqueue_t *ioqueue,
#endif
}

/* Call callback. */
if (h->cb.on_write_complete && !IS_CLOSING(h)) {
(*h->cb.on_write_complete)(h,
/* Call callback. Do not skip when IS_CLOSING, as upper
* layers need the callback to release resources (e.g.,
* pjsip_tx_data_dec_ref). See #4878.
*/
if (h->cb.on_write_complete) {
(*h->cb.on_write_complete)(h,
(pj_ioqueue_op_key_t*)write_op,
write_op->written);
}
Expand Down Expand Up @@ -538,7 +541,7 @@ static unsigned ioqueue_dispatch_write_event_no_lock(pj_ioqueue_key_t* h,
/* Check if there is any pending write callback for this key. */
pj_ioqueue_lock_key(h);

if (!IS_CLOSING(h) && !pj_list_empty(&h->write_cb_list) &&
if (!pj_list_empty(&h->write_cb_list) &&
(max_event == 0 || event_cnt < max_event))
{
write_op = h->write_cb_list.next;
Expand Down Expand Up @@ -569,6 +572,107 @@ static unsigned ioqueue_dispatch_write_event_no_lock(pj_ioqueue_key_t* h,
}
#endif

/*
* Drain pending write callbacks during key unregistration.
*
* This ensures upper layers (e.g., SIP transport) receive on_write_complete
* callbacks for all pending writes, allowing them to release resources such
* as tdata references. Without this, pending writes are silently discarded
* when a key is unregistered, causing reference leaks. See #4864, #4878.
*
* Two lists must be drained in order:
* 1. write_cb_list (completed ops, deferred callback) - with op->written
* 2. write_list (pending ops, never sent) - with -PJ_ECANCELLED
*
* Must be called after key->closing is set and key is unlocked.
* Not used by IOCP backend (which has its own mechanism).
*/
static void ioqueue_drain_pending_writes(pj_ioqueue_key_t *key)
{
#if PJ_IOQUEUE_CALLBACK_NO_LOCK
pj_ioqueue_lock_key(key);

if (key->write_callback_thread == pj_thread_this()) {
/* We are the callback thread (unregister called from within a
* write callback). Drain write_cb_list ourselves with success
* status, matching the pattern in
* ioqueue_dispatch_write_event_no_lock().
*/
while (!pj_list_empty(&key->write_cb_list)) {
struct write_operation *op = key->write_cb_list.next;
void (*on_wr_complete)(pj_ioqueue_key_t*,
pj_ioqueue_op_key_t*,
pj_ssize_t);

pj_list_erase(op);
on_wr_complete = key->cb.on_write_complete;
if (key->grp_lock)
pj_grp_lock_add_ref(key->grp_lock);
pj_ioqueue_unlock_key(key);

if (on_wr_complete) {
(*on_wr_complete)(key, (pj_ioqueue_op_key_t*)op,
op->written);
}
if (key->grp_lock)
pj_grp_lock_dec_ref(key->grp_lock);
pj_ioqueue_lock_key(key);
}
pj_ioqueue_unlock_key(key);
} else if (key->write_callback_thread) {
/* Another thread is the callback thread. Wait for it to finish
* draining write_cb_list. The IS_CLOSING check has been removed
* from ioqueue_dispatch_write_event_no_lock() so the callback
* thread will continue to drain even after closing is set.
*/
unsigned counter = 0;

while (key->write_callback_thread) {
pj_ioqueue_unlock_key(key);
pj_thread_sleep(10);
pj_ioqueue_lock_key(key);
if (++counter > 100) {
PJ_LOG(1, (THIS_FILE, "Timeout waiting for write "
"callback to drain on socket=%ld",
(long)key->fd));
break;
}
}
pj_ioqueue_unlock_key(key);
} else {
pj_ioqueue_unlock_key(key);
}
#endif

/* Walk write_list with -PJ_ECANCELLED (pending ops, never sent).
*
* Safe to walk without lock because:
* - closing=1 was set while key was locked, so any dispatch thread
* that subsequently acquires the lock sees IS_CLOSING and returns
* without touching write_list.
* - Any in-flight dispatch already dequeued its op from write_list
* while holding the lock, before unlocking for the callback.
* - pj_ioqueue_send() checks IS_CLOSING under lock, preventing
* new entries.
* - The socket fd was removed from the polling set before closing
* was set, so no new writable events will be dispatched.
*/
while (!pj_list_empty(&key->write_list)) {
struct write_operation *op = key->write_list.next;

pj_list_erase(op);
if (key->grp_lock)
pj_grp_lock_add_ref(key->grp_lock);

if (key->cb.on_write_complete) {
(*key->cb.on_write_complete)(key, (pj_ioqueue_op_key_t*)op,
-(pj_ssize_t)PJ_ECANCELLED);
}
if (key->grp_lock)
pj_grp_lock_dec_ref(key->grp_lock);
}
}

static pj_bool_t ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue,
pj_ioqueue_key_t *h )
{
Expand Down Expand Up @@ -1055,6 +1159,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
/* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */
flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

#if PJ_IOQUEUE_FAST_TRACK
/* Fast track:
* Try to send data immediately, only if there's no pending write!
* Note:
Expand Down Expand Up @@ -1088,6 +1193,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
}
}
}
#endif

/*
* Schedule asynchronous send.
Expand Down Expand Up @@ -1186,6 +1292,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
/* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write */
flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

#if PJ_IOQUEUE_FAST_TRACK
/* Fast track:
* Try to send data immediately, only if there's no pending write!
* Note:
Expand Down Expand Up @@ -1238,6 +1345,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
}
}
}
#endif

/*
* Check that address storage can hold the address parameter.
Expand Down Expand Up @@ -1331,6 +1439,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
accept_op = (struct accept_operation*)op_key;
PJ_ASSERT_RETURN(accept_op->op == PJ_IOQUEUE_OP_NONE, PJ_EPENDING);

#if PJ_IOQUEUE_FAST_TRACK
/* Fast track:
* See if there's new connection available immediately.
*/
Expand All @@ -1356,6 +1465,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
}
}
}
#endif

pj_ioqueue_lock_key(key);
/* Check again. Handle may have been closed after the previous check
Expand Down
3 changes: 3 additions & 0 deletions pjlib/src/pj/ioqueue_epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
pj_ioqueue_unlock_key(key);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Drain pending write callbacks. See #4864, #4878. */
ioqueue_drain_pending_writes(key);

/* Decrement counter. */
decrement_counter(key);
#else
Expand Down
11 changes: 6 additions & 5 deletions pjlib/src/pj/ioqueue_kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -452,22 +452,23 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister(pj_ioqueue_key_t *key)
/* Mark key is closing. */
key->closing = 1;

pj_ioqueue_unlock_key(key);

/* Drain pending write callbacks. See #4864, #4878. */
ioqueue_drain_pending_writes(key);

/* Decrement counter. */
decrement_counter(key);

/* Done. */
if (key->grp_lock) {
/* just dec_ref and unlock. we will set grp_lock to NULL
* elsewhere */
/* just dec_ref. we will set grp_lock to NULL elsewhere */
pj_grp_lock_t *grp_lock = key->grp_lock;
// Don't set grp_lock to NULL otherwise the other thread
// will crash. Just leave it as dangling pointer, but this
// should be safe
// key->grp_lock = NULL;
pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0);
pj_grp_lock_release(grp_lock);
} else {
pj_ioqueue_unlock_key(key);
}
#else
if (key->grp_lock) {
Expand Down
19 changes: 13 additions & 6 deletions pjlib/src/pj/ioqueue_select.c
Original file line number Diff line number Diff line change
Expand Up @@ -543,12 +543,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
key->fd = PJ_INVALID_SOCKET;
}

/* Clear callback */
key->cb.on_accept_complete = NULL;
key->cb.on_connect_complete = NULL;
key->cb.on_read_complete = NULL;
key->cb.on_write_complete = NULL;

/* Must release ioqueue lock first before decrementing counter, to
* prevent deadlock.
*/
Expand All @@ -559,6 +553,19 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)

pj_ioqueue_unlock_key(key);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Drain pending write callbacks. See #4864, #4878.
* Must be done before clearing callbacks below.
*/
ioqueue_drain_pending_writes(key);
#endif

/* Clear callback */
key->cb.on_accept_complete = NULL;
key->cb.on_connect_complete = NULL;
key->cb.on_read_complete = NULL;
key->cb.on_write_complete = NULL;

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Decrement counter. */
decrement_counter(key);
Expand Down
Loading
Loading