Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- SSH tunnels no longer pin a CPU core after the connection drops. A dropped tunnel is now detected and torn down instead of spinning in its relay loop. (#1769)

## [0.53.0] - 2026-06-25

### Added
Expand Down
41 changes: 41 additions & 0 deletions TablePro/Core/SSH/LibSSH2ChannelIO.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
//
// LibSSH2ChannelIO.swift
// TablePro
//

import Foundation

import CLibSSH2

/// Routes channel reads/writes through the serial `sessionQueue` because libssh2
/// is not thread-safe per session. Maps libssh2 return codes to the transport
/// agnostic results consumed by `SSHChannelRelay`.
internal struct LibSSH2ChannelIO: SSHChannelIO {
let channel: OpaquePointer
let session: OpaquePointer
let sessionQueue: DispatchQueue

func read(into buffer: UnsafeMutablePointer<CChar>, count: Int) -> ChannelReadResult {
let result = sessionQueue.sync { Int(tablepro_libssh2_channel_read(channel, buffer, count)) }
if result > 0 { return .bytes(result) }
if result == 0 { return .closed }
if sessionQueue.sync(execute: { libssh2_channel_eof(channel) }) != 0 { return .closed }
if result == Int(LIBSSH2_ERROR_EAGAIN) { return .wouldBlock }
return .closed
}

func write(_ buffer: UnsafePointer<CChar>, count: Int) -> ChannelWriteResult {
let result = sessionQueue.sync { Int(tablepro_libssh2_channel_write(channel, buffer, count)) }
if result > 0 { return .bytes(result) }
if result == Int(LIBSSH2_ERROR_EAGAIN) { return .wouldBlock }
return .closed
}

func blockDirections() -> RelayDirections {
let directions = sessionQueue.sync { libssh2_session_block_directions(session) }
var result: RelayDirections = []
if directions & LIBSSH2_SESSION_BLOCK_INBOUND != 0 { result.insert(.inbound) }
if directions & LIBSSH2_SESSION_BLOCK_OUTBOUND != 0 { result.insert(.outbound) }
return result
}
}
89 changes: 17 additions & 72 deletions TablePro/Core/SSH/LibSSH2Tunnel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -350,82 +350,27 @@ internal final class LibSSH2Tunnel: @unchecked Sendable {

/// Blocking relay loop. Runs on `relayQueue`; libssh2 calls go through `sessionQueue`.
private func runRelay(clientFD: Int32, channel: OpaquePointer) {
let buffer = UnsafeMutablePointer<CChar>.allocate(capacity: Self.relayBufferSize)
defer {
buffer.deallocate()
Darwin.close(clientFD)
if self.isRunning {
sessionQueue.sync {
libssh2_channel_close(channel)
libssh2_channel_free(channel)
}
}
}
let relay = SSHChannelRelay(
localFD: clientFD,
transportFD: socketFD,
channelIO: LibSSH2ChannelIO(channel: channel, session: session, sessionQueue: sessionQueue),
bufferSize: Self.relayBufferSize,
isActive: { [weak self] in self?.isRunning ?? false }
)

while self.isRunning {
var pollFDs = [
pollfd(fd: clientFD, events: Int16(POLLIN), revents: 0),
pollfd(fd: self.socketFD, events: Int16(POLLIN), revents: 0),
]
let termination = relay.run()

let pollResult = poll(&pollFDs, 2, 500) // 500ms timeout
if pollResult < 0 { break }
Darwin.close(clientFD)
guard self.isRunning else { return }

// Read from SSH channel when the SSH socket has data or on timeout
// (libssh2 may have internally buffered data)
if pollFDs[1].revents & Int16(POLLIN) != 0 || pollResult == 0 {
let readResult: Int = sessionQueue.sync {
Int(tablepro_libssh2_channel_read(channel, buffer, Self.relayBufferSize))
}
if readResult > 0 {
var totalSent = 0
while totalSent < readResult {
let sent = send(
clientFD,
buffer.advanced(by: totalSent),
readResult - totalSent,
0
)
if sent <= 0 { return }
totalSent += sent
}
} else if readResult == 0 || sessionQueue.sync(execute: { libssh2_channel_eof(channel) }) != 0 {
return
} else if readResult != Int(LIBSSH2_ERROR_EAGAIN) {
return
}
}
sessionQueue.sync {
libssh2_channel_close(channel)
libssh2_channel_free(channel)
}

// Read from client -> write to SSH channel
if pollFDs[0].revents & Int16(POLLIN) != 0 {
let clientRead = recv(clientFD, buffer, Self.relayBufferSize, 0)
if clientRead <= 0 { return }

var totalWritten = 0
while totalWritten < Int(clientRead) {
let written: Int = sessionQueue.sync {
Int(tablepro_libssh2_channel_write(
channel,
buffer.advanced(by: totalWritten),
Int(clientRead) - totalWritten
))
}
if written > 0 {
totalWritten += written
} else if written == Int(LIBSSH2_ERROR_EAGAIN) {
let directions = sessionQueue.sync {
libssh2_session_block_directions(self.session)
}
_ = self.waitForSocketDirections(
directions: directions,
socketFD: self.socketFD,
timeoutMs: 1_000
)
} else {
return
}
}
}
if termination == .transportHangup {
Self.logger.info("SSH transport hung up, marking tunnel dead for \(self.connectionId)")
markDead()
}
}

Expand Down
82 changes: 15 additions & 67 deletions TablePro/Core/SSH/LibSSH2TunnelFactory.swift
Original file line number Diff line number Diff line change
Expand Up @@ -735,75 +735,23 @@ internal enum LibSSH2TunnelFactory {
return Task.detached {
await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
relayQueue.async {
let bufferSize = 32_768
let buffer = UnsafeMutablePointer<CChar>.allocate(capacity: bufferSize)
defer {
buffer.deallocate()
Darwin.close(socketFD)
continuation.resume()
}

while !Task.isCancelled {
var pollFDs = [
pollfd(fd: socketFD, events: Int16(POLLIN), revents: 0),
pollfd(fd: sshSocketFD, events: Int16(POLLIN), revents: 0),
]
let relay = SSHChannelRelay(
localFD: socketFD,
transportFD: sshSocketFD,
channelIO: LibSSH2ChannelIO(
channel: channel,
session: session,
sessionQueue: sessionQueue
),
bufferSize: 32_768,
isActive: { !Task.isCancelled }
)

let pollResult = poll(&pollFDs, 2, 500)
if pollResult < 0 { break }
_ = relay.run()

// Channel -> socketpair (serialized libssh2 call)
if pollFDs[1].revents & Int16(POLLIN) != 0 || pollResult == 0 {
let channelRead: Int = sessionQueue.sync {
Int(tablepro_libssh2_channel_read(channel, buffer, bufferSize))
}
if channelRead > 0 {
var totalSent = 0
while totalSent < channelRead {
let sent = send(
socketFD,
buffer.advanced(by: totalSent),
channelRead - totalSent,
0
)
if sent <= 0 { return }
totalSent += sent
}
} else if channelRead == 0
|| sessionQueue.sync(execute: { libssh2_channel_eof(channel) }) != 0 {
return
} else if channelRead != Int(LIBSSH2_ERROR_EAGAIN) {
return
}
}

// Socketpair -> channel
if pollFDs[0].revents & Int16(POLLIN) != 0 {
let socketRead = recv(socketFD, buffer, bufferSize, 0)
if socketRead <= 0 { return }

var totalWritten = 0
while totalWritten < Int(socketRead) {
let written: Int = sessionQueue.sync {
Int(tablepro_libssh2_channel_write(
channel,
buffer.advanced(by: totalWritten),
Int(socketRead) - totalWritten
))
}
if written > 0 {
totalWritten += written
} else if written == Int(LIBSSH2_ERROR_EAGAIN) {
var writePollFD = pollfd(
fd: sshSocketFD, events: Int16(POLLOUT), revents: 0
)
_ = poll(&writePollFD, 1, 1_000)
} else {
return
}
}
}
}
shutdown(socketFD, SHUT_RDWR)
Darwin.close(socketFD)
continuation.resume()
}
}
}
Expand Down
20 changes: 20 additions & 0 deletions TablePro/Core/SSH/RelayPollState.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
//
// RelayPollState.swift
// TablePro
//

import Foundation

internal enum RelayFDState: Equatable {
case idle
case readable
case drainThenStop
case stop
}

internal func relayFDState(_ revents: Int16) -> RelayFDState {
if revents & Int16(POLLERR | POLLNVAL) != 0 { return .stop }
if revents & Int16(POLLHUP) != 0 { return .drainThenStop }
if revents & Int16(POLLIN) != 0 { return .readable }
return .idle
}
Loading
Loading