Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions sip/transaction_layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type TransactionLayer struct {
clientTransactions *transactionStore[*ClientTx]
serverTransactions *transactionStore[*ServerTx]

terminateOnConnClose bool

log *slog.Logger
}

Expand All @@ -46,6 +48,17 @@ func WithTransactionLayerUnhandledResponseHandler(f func(r *Response)) Transacti
}
}

// WithTransactionLayerTerminateOnConnClose enables termination of pending
// client and server transactions when the underlying connection closes,
// instead of waiting for timers to expire.
//
// Experimental
func WithTransactionLayerTerminateOnConnClose() TransactionLayerOption {
return func(txl *TransactionLayer) {
txl.terminateOnConnClose = true
}
}

func NewTransactionLayer(tpl *TransportLayer, options ...TransactionLayerOption) *TransactionLayer {
txl := &TransactionLayer{
tpl: tpl,
Expand All @@ -63,13 +76,59 @@ func NewTransactionLayer(tpl *TransportLayer, options ...TransactionLayerOption)

//Send all transport messages to our transaction layer
tpl.OnMessage(txl.handleMessage)

notify := func(conn Connection) { txl.OnConnectionClose(conn) }
if tpl.tcp != nil {
tpl.tcp.onConnClose = notify
}
if tpl.tls != nil {
tpl.tls.onConnClose = notify
}
if tpl.ws != nil {
tpl.ws.onConnClose = notify
}
if tpl.wss != nil {
tpl.wss.onConnClose = notify
}

return txl
}

func (txl *TransactionLayer) OnRequest(h TransactionRequestHandler) {
txl.reqHandler = h
}

// OnConnectionClose is called when a reliable transport connection (TCP, TLS,
// WS, WSS) is closed by the remote side or due to a read error.
func (txl *TransactionLayer) OnConnectionClose(conn Connection) {
if txl.terminateOnConnClose {
txl.terminateClientTransactions(conn)
txl.terminateServerTransactions(conn)
}
}

func (txl *TransactionLayer) terminateClientTransactions(conn Connection) {
txl.clientTransactions.mu.RLock()
for _, tx := range txl.clientTransactions.items {
if tx.conn == conn {
go tx.spinFsmWithError(client_input_transport_err,
fmt.Errorf("connection closed: %w", ErrTransactionTransport))
}
}
txl.clientTransactions.mu.RUnlock()
}

func (txl *TransactionLayer) terminateServerTransactions(conn Connection) {
txl.serverTransactions.mu.RLock()
for _, tx := range txl.serverTransactions.items {
if tx.conn == conn {
go tx.spinFsmWithError(server_input_transport_err,
fmt.Errorf("connection closed: %w", ErrTransactionTransport))
}
}
txl.serverTransactions.mu.RUnlock()
}

// handleMessage is entry for handling requests and responses from transport
func (txl *TransactionLayer) handleMessage(msg Message) {
// Having concurency here we increased throghput but also solving deadlock
Expand Down
7 changes: 7 additions & 0 deletions sip/transport_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type TransportTCP struct {
pool *connectionPool

DialerCreate func(laddr net.Addr) net.Dialer

onConnClose func(conn Connection)
}

func (t *TransportTCP) init(par *Parser) {
Expand Down Expand Up @@ -153,6 +155,11 @@ func (t *TransportTCP) readConnection(conn *TCPConnection, laddr string, raddr s
t.log.Warn("connection pool not clean cleanup", "error", err)
}
}()
defer func() {
if t.onConnClose != nil {
t.onConnClose(conn)
}
}()

// Create stream parser context
par := t.parser.NewSIPStream()
Expand Down
7 changes: 7 additions & 0 deletions sip/transport_ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type TransportWS struct {
dialer ws.Dialer

DialerCreate func(laddr net.Addr) ws.Dialer

onConnClose func(conn Connection)
}

func newWSTransport(par *Parser) *TransportWS {
Expand Down Expand Up @@ -169,6 +171,11 @@ func (t *TransportWS) readConnection(conn *WSConnection, laddr string, raddr str
t.log.Warn("connection pool not clean cleanup", "error", err)
}
}()
defer func() {
if t.onConnClose != nil {
t.onConnClose(conn)
}
}()
defer log.Debug("Websocket read connection stopped", "raddr", raddr)

// Create stream parser context
Expand Down
Loading