Skip to content

Commit 417251e

Browse files
authored
feat: add MPSC (Multi-Producer Single-Consumer) channel support (#1)
feat: add MPSC channel support Multi-Producer Single-Consumer lock-free channels for concurrent access. Key features: - Wait-free algorithm: 10-16M ops/sec (1-2 producers) - SPSC performance unchanged (438M ops/sec) - Cache-line padding prevents false sharing - Comprehensive tests and benchmarks Backward compatible: opt-in via ChannelMode parameter
1 parent cd7ec8a commit 417251e

File tree

3 files changed

+675
-8
lines changed

3 files changed

+675
-8
lines changed

src/private/channel_spsc.nim

Lines changed: 100 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,32 @@ import std/[atomics, times]
88
type
99
ChannelMode* = enum
1010
SPSC
11+
MPSC ## Multi-Producer Single-Consumer
1112

1213
SPSCSlot[T] = object
1314
value: T
1415
sequence: Atomic[int]
1516

17+
# Padding to prevent false sharing between producer and consumer cache lines
18+
CacheLinePad = object
19+
pad: array[64, byte]
20+
1621
Channel*[T] = ref object
1722
mode: ChannelMode
1823
# Lock-free single producer single consumer
1924
buffer: seq[SPSCSlot[T]]
2025
mask: int
26+
27+
# SPSC fields
2128
head: Atomic[int] # Producer writes here
29+
pad1: CacheLinePad # Prevent false sharing
2230
tail: Atomic[int] # Consumer reads here
31+
pad2: CacheLinePad
32+
33+
# MPSC-specific fields
34+
mpscHead: Atomic[int] # Atomic head for multi-producer CAS
35+
mpscCount: Atomic[int] # Track count for wait-free full check
36+
2337
capacity: int
2438

2539
proc newChannel*[T](size: int, mode: ChannelMode): Channel[T] =
@@ -35,9 +49,13 @@ proc newChannel*[T](size: int, mode: ChannelMode): Channel[T] =
3549
result.head.store(0, moRelaxed)
3650
result.tail.store(0, moRelaxed)
3751

38-
proc trySend*[T](c: Channel[T], value: T): bool =
39-
## Try to send a value to the channel. Returns true if successful.
40-
## Non-blocking operation.
52+
# Initialize MPSC-specific fields
53+
if mode == MPSC:
54+
result.mpscHead.store(0, moRelaxed)
55+
result.mpscCount.store(0, moRelaxed)
56+
57+
proc trySendSPSC[T](c: Channel[T], value: T): bool =
58+
## SPSC-optimized send (single producer, relaxed ordering).
4159
let currentHead = c.head.load(moRelaxed)
4260
let currentTail = c.tail.load(moAcquire)
4361

@@ -54,9 +72,40 @@ proc trySend*[T](c: Channel[T], value: T): bool =
5472
c.head.store(currentHead + 1, moRelease)
5573
return true
5674

57-
proc tryReceive*[T](c: Channel[T], value: var T): bool =
58-
## Try to receive a value from the channel. Returns true if successful.
59-
## Non-blocking operation.
75+
proc trySendMPSC[T](c: Channel[T], value: T): bool =
76+
## MPSC wait-free send (multiple producers, CAS on head).
77+
## Based on dbittman's wait-free MPSC algorithm + JCTools patterns.
78+
79+
# Step 1: Atomically increment count to reserve a slot (wait-free)
80+
let count = c.mpscCount.fetchAdd(1, moAcquire)
81+
if count >= c.capacity:
82+
# Queue full - backoff
83+
discard c.mpscCount.fetchSub(1, moRelease)
84+
return false
85+
86+
# Step 2: Atomically claim a slot by incrementing head (wait-free)
87+
let myHead = c.mpscHead.fetchAdd(1, moAcquire)
88+
89+
# Step 3: Write to the slot (no contention, we own it)
90+
let slot = myHead and c.mask
91+
c.buffer[slot].value = value
92+
93+
# Step 4: Publish the write with release semantics for consumer visibility
94+
c.buffer[slot].sequence.store(myHead + 1, moRelease)
95+
96+
return true
97+
98+
proc trySend*[T](c: Channel[T], value: T): bool =
99+
## Try to send a value to the channel. Returns true if successful.
100+
## Non-blocking operation. Dispatches to SPSC or MPSC implementation.
101+
case c.mode
102+
of SPSC:
103+
return trySendSPSC(c, value)
104+
of MPSC:
105+
return trySendMPSC(c, value)
106+
107+
proc tryReceiveSPSC[T](c: Channel[T], value: var T): bool =
108+
## SPSC-optimized receive (single consumer, relaxed ordering).
60109
let currentTail = c.tail.load(moRelaxed)
61110
let currentHead = c.head.load(moAcquire)
62111

@@ -77,14 +126,57 @@ proc tryReceive*[T](c: Channel[T], value: var T): bool =
77126
c.tail.store(currentTail + 1, moRelease)
78127
return true
79128

129+
proc tryReceiveMPSC[T](c: Channel[T], value: var T): bool =
130+
## MPSC receive (single consumer, must handle concurrent producers).
131+
## Uses mpscHead instead of head for accurate empty check.
132+
let currentTail = c.tail.load(moRelaxed)
133+
let currentHead = c.mpscHead.load(moAcquire)
134+
135+
# Check if empty
136+
if currentTail >= currentHead:
137+
return false
138+
139+
# Read from slot
140+
let slot = currentTail and c.mask
141+
let seq = c.buffer[slot].sequence.load(moAcquire)
142+
143+
# Wait-free: if producer hasn't published yet, return false (not an error)
144+
if seq != currentTail + 1:
145+
return false # Producer in-flight, try again later
146+
147+
value = c.buffer[slot].value
148+
149+
# Update tail and decrement count
150+
c.tail.store(currentTail + 1, moRelease)
151+
discard c.mpscCount.fetchSub(1, moRelease)
152+
153+
return true
154+
155+
proc tryReceive*[T](c: Channel[T], value: var T): bool =
156+
## Try to receive a value from the channel. Returns true if successful.
157+
## Non-blocking operation. Dispatches to SPSC or MPSC implementation.
158+
case c.mode
159+
of SPSC:
160+
return tryReceiveSPSC(c, value)
161+
of MPSC:
162+
return tryReceiveMPSC(c, value)
163+
80164
proc capacity*[T](c: Channel[T]): int =
81165
## Get the capacity of the channel.
82166
c.capacity
83167

84168
proc isEmpty*[T](c: Channel[T]): bool =
85169
## Check if the channel is empty.
86-
c.tail.load(moRelaxed) >= c.head.load(moRelaxed)
170+
case c.mode
171+
of SPSC:
172+
c.tail.load(moRelaxed) >= c.head.load(moRelaxed)
173+
of MPSC:
174+
c.tail.load(moRelaxed) >= c.mpscHead.load(moRelaxed)
87175

88176
proc isFull*[T](c: Channel[T]): bool =
89177
## Check if the channel is full.
90-
c.head.load(moRelaxed) - c.tail.load(moRelaxed) >= c.capacity
178+
case c.mode
179+
of SPSC:
180+
c.head.load(moRelaxed) - c.tail.load(moRelaxed) >= c.capacity
181+
of MPSC:
182+
c.mpscCount.load(moRelaxed) >= c.capacity

0 commit comments

Comments
 (0)