
Acceptor drops pipelined frames, killing connections and losing link credit#330

Open
Indomitable wants to merge 2 commits into minghuaw:main from Indomitable:patch/pipelined-flow-credit

Conversation

@Indomitable

I'm building an Azure Service Bus emulator using fe2o3-amqp v0.14.0 as the AMQP server. The Azure .NET SDK (Azure.Messaging.ServiceBus 7.20.1) pipelines its AMQP frames pretty aggressively — it sends Begin + Attach + Flow back-to-back without waiting for server responses between them. This is allowed by the AMQP 1.0 spec (Section 2.5.1), but it triggers three related problems in the acceptor code.

The first connection usually works fine. The second and subsequent connections fail nondeterministically: the server closes them with amqp:not-found, or CBS authentication hangs forever. On the .NET side this shows up as TaskCanceledException or connection timeouts.

Problem 1: forward_to_session can't find the session relay

When the client opens a remotely-initiated session, ListenerConnection::on_incoming_begin sends an IncomingSession to the session_listener channel for user code to accept. The relay only gets registered in session_by_incoming_channel later, when on_outgoing_begin processes the server's response Begin.

But if the client pipelines an Attach right after its Begin, the connection engine tries to route it via forward_to_session before the relay exists:

```rust
// connection/engine.rs, forward_to_session
match self.connection.session_tx_by_incoming_channel(channel) {
    Some(tx) => tx.send(frame).await?,
    None => return Err(ConnectionInnerError::NotFound(None)), // boom
};
```

This kills the entire connection.
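The race can be reproduced with a small standalone model. The types and names below are illustrative stand-ins, not fe2o3-amqp's real API; the point is only that the routing table stays empty until the server's response Begin is processed, so a pipelined frame hits the NotFound arm:

```rust
use std::collections::HashMap;

// Simplified stand-in for the connection's channel -> session routing table.
#[derive(Debug, PartialEq)]
enum RouteError {
    NotFound,
}

struct Router {
    // channel -> frames delivered to that session's relay
    session_tx_by_incoming_channel: HashMap<u16, Vec<String>>,
}

impl Router {
    fn forward_to_session(&mut self, channel: u16, frame: &str) -> Result<(), RouteError> {
        match self.session_tx_by_incoming_channel.get_mut(&channel) {
            Some(tx) => {
                tx.push(frame.to_owned());
                Ok(())
            }
            // Mirrors ConnectionInnerError::NotFound, which tears down
            // the whole connection in the real engine.
            None => Err(RouteError::NotFound),
        }
    }
}
```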

Problem 2: Pipelined Flow crashes the session engine

After fixing problem 1 (so pipelined frames actually reach the session), the next thing that breaks is Flow handling. The Attach gets correctly buffered in the link_listener channel, but the Flow that follows it references a link handle that hasn't been registered yet, because accept_incoming_attach hasn't been called. on_incoming_flow_inner looks up link_by_input_handle, doesn't find it, and returns UnattachedHandle. This escalates through on_error into end_session, killing the session engine. The connection engine then gets a SendError because the session's channel was dropped.

Same thing happens with pipelined Transfer frames.

Problem 3: Dropping the Flow loses the initial credit grant

Even after catching UnattachedHandle gracefully (instead of crashing), just dropping the Flow causes a subtler problem. That pipelined Flow typically carries the initial credit grant from the remote receiver (link_credit=50). The sender link on the server side starts with zero credit and waits for a Flow to grant some. If the only credit-granting Flow was dropped, the sender blocks forever on flow_state.consume(1).

In my case this manifested as the CBS response path deadlocking — the CBS receiver task would get the put-token request fine, but send_response() would hang because the CBS sender link had no credit.
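The starvation is easy to see in a toy credit model. This is not fe2o3-amqp's actual FlowState API; it is a hypothetical sketch showing why the sender can never make progress if the one Flow that carried the credit grant was dropped:

```rust
// Toy model of sender-side link credit (illustrative only).
struct FlowState {
    link_credit: u32,
}

impl FlowState {
    fn new() -> Self {
        // A sender link starts with zero credit and must wait for a Flow.
        FlowState { link_credit: 0 }
    }

    // Apply a remote Flow granting credit (e.g. link_credit=50).
    fn on_incoming_flow(&mut self, granted: u32) {
        self.link_credit = granted;
    }

    // The real flow_state.consume(1) awaits until credit is available;
    // here we just report whether a transfer could proceed right now.
    fn try_consume(&mut self, n: u32) -> bool {
        if self.link_credit >= n {
            self.link_credit -= n;
            true
        } else {
            false // the real sender would block here forever
        }
    }
}
```

If `on_incoming_flow` is never called because the pipelined Flow was dropped, `try_consume` never succeeds, which is exactly the send_response() hang described above.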

  1. Eagerly register the session relay

In on_incoming_begin, allocate the session relay and register it in session_by_incoming_channel immediately, before handing off to user code. Pass the pre-allocated incoming_rx and outgoing_channel through IncomingSession so accept_incoming_session reuses them.
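The shape of this fix can be sketched with std channels standing in for the library's relay types. `IncomingSession` and the method bodies here are simplified illustrations, not the real structs; the essential point is the ordering: register first, then notify user code:

```rust
use std::collections::HashMap;
use std::sync::mpsc;

// Simplified stand-ins for the listener types (not the real API).
struct IncomingSession {
    channel: u16,
    // Pre-allocated receive half, reused later by accept_incoming_session.
    incoming_rx: mpsc::Receiver<String>,
}

struct ListenerConnection {
    session_by_incoming_channel: HashMap<u16, mpsc::Sender<String>>,
}

impl ListenerConnection {
    // On Begin: allocate and register the relay *before* handing the
    // session to user code, so pipelined Attach frames can be routed.
    fn on_incoming_begin(&mut self, channel: u16) -> IncomingSession {
        let (tx, incoming_rx) = mpsc::channel();
        self.session_by_incoming_channel.insert(channel, tx);
        IncomingSession { channel, incoming_rx }
    }

    fn forward_to_session(&self, channel: u16, frame: &str) -> Result<(), ()> {
        match self.session_by_incoming_channel.get(&channel) {
            Some(tx) => tx.send(frame.to_owned()).map_err(|_| ()),
            None => Err(()),
        }
    }
}
```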

  2. Buffer and replay pipelined link-level Flows

Override on_incoming_flow in ListenerSession to catch UnattachedHandle — instead of propagating the error, buffer the LinkFlow in a HashMap<InputHandle, Vec<LinkFlow>>. Do the same for on_incoming_transfer (drop rather than crash). Then override allocate_incoming_link to replay buffered flows after the relay is inserted into link_by_input_handle, so credit is available immediately. This requires making Producer.state pub(crate) for synchronous access to the flow state during replay.
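The buffer-and-replay scheme can be sketched as follows, again with simplified stand-in types (the real relay registration and flow-state update are more involved than a plain HashMap of credits):

```rust
use std::collections::HashMap;

type InputHandle = u32;

#[derive(Clone, Debug, PartialEq)]
struct LinkFlow {
    link_credit: u32,
}

#[derive(Default)]
struct ListenerSession {
    // handle -> current credit, standing in for the registered link relay
    link_by_input_handle: HashMap<InputHandle, u32>,
    // Flows that arrived before their link was attached
    pending_link_flows: HashMap<InputHandle, Vec<LinkFlow>>,
}

impl ListenerSession {
    // Instead of returning UnattachedHandle, buffer Flows addressed to
    // handles that have not been registered yet.
    fn on_incoming_flow(&mut self, handle: InputHandle, flow: LinkFlow) {
        match self.link_by_input_handle.get_mut(&handle) {
            Some(credit) => *credit = flow.link_credit,
            None => self.pending_link_flows.entry(handle).or_default().push(flow),
        }
    }

    // When the attach completes, register the link and replay any
    // buffered Flows so the initial credit grant is not lost.
    fn allocate_incoming_link(&mut self, handle: InputHandle) {
        self.link_by_input_handle.insert(handle, 0);
        if let Some(flows) = self.pending_link_flows.remove(&handle) {
            for flow in flows {
                self.on_incoming_flow(handle, flow);
            }
        }
    }
}
```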

When a remote peer (e.g. the Azure Service Bus .NET SDK) pipelines
Begin+Attach frames without waiting for the server's Begin response,
the Attach frames arrive before the session is accepted and its relay
is registered in session_by_incoming_channel. This causes
forward_to_session to return NotFound, killing the connection.

Fix: in the listener connection's on_incoming_begin handler, eagerly
allocate the session relay and register it for the incoming channel.
The pre-allocated incoming_rx and outgoing_channel are passed through
IncomingSession so that accept_incoming_session reuses them instead
of allocating a second relay.

When a remote peer pipelines Flow frames immediately after Attach
(before the listener application has called accept_incoming_attach),
the link handle is not yet registered in link_by_input_handle.
Session::on_incoming_flow_inner returns UnattachedHandle, which
escalates to an error that kills the session engine.

This is particularly problematic for the initial credit grant (e.g.
link_credit=50) that the remote receiver sends in its first Flow.
If this Flow is dropped, the corresponding sender link starts with
zero credit and blocks forever.

Fix:
1. Override on_incoming_flow in ListenerSession to catch
   UnattachedHandle and buffer the LinkFlow in pending_link_flows
   (keyed by InputHandle) instead of propagating the error.

2. Override on_incoming_transfer similarly — drop pipelined transfers
   for unregistered handles rather than killing the session.

3. Override allocate_incoming_link to replay any buffered LinkFlow
   frames synchronously after the relay is inserted. For Sender
   relays, update flow_state and notify waiters so credit is
   available immediately. For Receiver relays, call on_incoming_flow.

4. Make Producer.state pub(crate) so allocate_incoming_link (which
   is sync) can access the flow state directly for replay.
@minghuaw
Owner

Thanks for the PR. Sorry I've been busy for the past months. I'll try to take a look this week.
