Skip to content

Commit a40e800

Browse files
Copilotchrjohn
andcommitted
Add threading model documentation (HTML + Markdown)
Co-authored-by: chrjohn <6644028+chrjohn@users.noreply.github.com> Agent-Logs-Url: https://github.com/quickfix-j/quickfixj/sessions/e6f7a3f4-396a-403d-b91a-6501d8d500db
1 parent 293b2a1 commit a40e800

File tree

3 files changed

+484
-0
lines changed

3 files changed

+484
-0
lines changed

docs/threading-model.md

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
# QuickFIX/J Threading Model
2+
3+
## 1. Overview
4+
5+
QuickFIX/J uses [Apache MINA](http://mina.apache.org/) for non-blocking I/O. The threading model for message processing is controlled by the `EventHandlingStrategy` interface (`quickfix.mina.EventHandlingStrategy`), with two concrete implementations:
6+
7+
- **`SingleThreadedEventHandlingStrategy`** — one thread processes messages for all sessions (`SocketAcceptor`, `SocketInitiator`)
8+
- **`ThreadPerSessionEventHandlingStrategy`** — one thread per session processes messages (`ThreadedSocketAcceptor`, `ThreadedSocketInitiator`)
9+
10+
Both strategies co-exist with the **timer thread**, which is always present and always calls `Session.next()` (no-arg) on a 1-second schedule, regardless of which event-handling strategy is in use.
11+
12+
---
13+
14+
## 2. Connector Classes and Their Strategy
15+
16+
| Connector class | Event handling strategy | Thread name(s) |
17+
|---|---|---|
18+
| `SocketAcceptor` | `SingleThreadedEventHandlingStrategy` | `QFJ Message Processor` |
19+
| `SocketInitiator` | `SingleThreadedEventHandlingStrategy` | `QFJ Message Processor` |
20+
| `ThreadedSocketAcceptor` | `ThreadPerSessionEventHandlingStrategy` | `QF/J Session dispatcher: <sessionID>` |
21+
| `ThreadedSocketInitiator` | `ThreadPerSessionEventHandlingStrategy` | `QF/J Session dispatcher: <sessionID>` |
22+
23+
---
24+
25+
## 3. Single-Threaded Model (`SingleThreadedEventHandlingStrategy`)
26+
27+
**Class:** `quickfix.mina.SingleThreadedEventHandlingStrategy`
28+
**Source:** `quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java`
29+
30+
- A single `BlockingQueue<SessionMessageEvent>` holds events from **all** sessions.
31+
- One background thread named **`QFJ Message Processor`** (a daemon thread) drains the queue and calls `session.next(message)` for each event via `SessionMessageEvent.processMessage()`.
32+
- The thread is started via `blockInThread()`, which creates a `ThreadAdapter` wrapping the `block()` loop.
33+
- `onMessage()` wraps incoming messages into a `SessionMessageEvent` and puts them on the shared queue.
34+
- The `block()` loop polls the queue with a timeout (`THREAD_WAIT_FOR_MESSAGE_MS`) so it can periodically check the `isStopped` flag.
35+
- On stop, remaining queued messages are drained and processed before the thread exits.
36+
- The `getQueueSize(SessionID)` method returns the total queue size (single queue for all sessions — there is no per-session view).
37+
38+
**Key point for application developers:** Because all sessions share a single processing thread, a slow `fromApp()` callback will delay processing for **all** other sessions.
39+
40+
---
41+
42+
## 4. Thread-per-Session Model (`ThreadPerSessionEventHandlingStrategy`)
43+
44+
**Class:** `quickfix.mina.ThreadPerSessionEventHandlingStrategy`
45+
**Source:** `quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java`
46+
47+
- A `ConcurrentHashMap<SessionID, MessageDispatchingThread>` maps each session to its own dispatcher thread.
48+
- On the first `onMessage()` call for a given session, a new `MessageDispatchingThread` is created and started via `startDispatcherThread()`.
49+
- Each `MessageDispatchingThread` has its own `BlockingQueue<Message>` (or watermark-tracked queue) and loops calling `session.next(message)`.
50+
- Thread name: **`QF/J Session dispatcher: <BeginString>:<SenderCompID>/<TargetCompID>`**
51+
- The `Executor` can be customised via `setExecutor()`. The default is `DedicatedThreadExecutor`, which creates a plain `new Thread(command, name).start()`.
52+
- On stop, `stopDispatcherThreads()` enqueues `END_OF_STREAM` to every dispatcher, sets `stopping=true`, and waits (polling every 100 ms) until all dispatchers report `isStopped`.
53+
- After a dispatcher drains its remaining queue on shutdown, it removes itself from the `dispatchers` map.
54+
55+
**Key point for application developers:** Since each session has its own thread, a slow `fromApp()` for one session does **not** block others. However, your `Application` implementation **must be thread-safe** if it shares state across sessions.
56+
57+
---
58+
59+
## 5. The Timer Thread and `Session.next()`
60+
61+
This is a critical part of the threading model that is **orthogonal** to the message-processing strategies above.
62+
63+
**Class:** `quickfix.mina.SessionConnector`
64+
**Source:** `quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java`
65+
66+
### 5.1 The `QFJ Timer` Thread
67+
68+
A single `ScheduledExecutorService` (a shared static instance using a `QFTimerThreadFactory`) runs a `SessionTimerTask` at a fixed rate of **every 1000 ms**.
69+
70+
```java
71+
// SessionConnector.java
72+
private static class QFTimerThreadFactory implements ThreadFactory {
73+
@Override
74+
public Thread newThread(Runnable runnable) {
75+
Thread thread = new Thread(runnable, "QFJ Timer");
76+
thread.setDaemon(true);
77+
return thread;
78+
}
79+
}
80+
```
81+
82+
The timer is started by `startSessionTimer()`:
83+
84+
```java
85+
protected void startSessionTimer() {
86+
if (checkSessionTimerRunning()) {
87+
return;
88+
}
89+
Runnable timerTask = new SessionTimerTask();
90+
if (shortLivedExecutor != null) {
91+
timerTask = new DelegatingTask(timerTask, shortLivedExecutor);
92+
}
93+
sessionTimerFuture = SCHEDULED_EXECUTOR.scheduleAtFixedRate(timerTask, 0, 1000L,
94+
TimeUnit.MILLISECONDS);
95+
}
96+
```
97+
98+
Only one timer is ever started per connector. If `startSessionTimer()` is called again while the timer is still running (e.g. during `createDynamicSession()`), the existing timer is reused.
99+
100+
### 5.2 `SessionTimerTask` Iterates All Sessions and Calls `Session.next()`
101+
102+
```java
103+
private class SessionTimerTask implements Runnable {
104+
@Override
105+
public void run() {
106+
try {
107+
for (Session session : sessions.values()) {
108+
try {
109+
session.next();
110+
} catch (IOException e) {
111+
LogUtil.logThrowable(session.getLog(), "Error in session timer processing", e);
112+
}
113+
}
114+
} catch (Throwable e) {
115+
log.error("Error during timer processing", e);
116+
}
117+
}
118+
}
119+
```
120+
121+
Even though each session may have its own dispatcher thread (in the thread-per-session model), the timer thread also calls `session.next()` directly on every session. This is independent of which `EventHandlingStrategy` is in use.
122+
123+
### 5.3 What Does `Session.next()` (No-arg) Do?
124+
125+
`Session.next()` is called from the timer, **not** from user code. Its Javadoc states:
126+
127+
> Called from the timer-related code in the acceptor/initiator implementations. This is not typically called from application code.
128+
129+
Its responsibilities (from `Session.java`):
130+
131+
1. **Checks if the session is enabled.** If disabled and still logged on, it initiates a Logout.
132+
2. **Checks session schedule.** If outside the configured session time window, it may reset sequence numbers or disconnect. This check is throttled to once per second.
133+
3. **Returns early if not connected** (`hasResponder()` is false).
134+
4. **Handles logon state:** If logon has not been received, it may send a Logon (for initiators) or detect a logon timeout.
135+
5. **Checks logout timeout** if a logout has been sent.
136+
6. **Heartbeat management:**
137+
- If `HeartBtInt == 0`: returns (no heartbeat management).
138+
- If timed out waiting for a heartbeat: disconnects (unless `DisableHeartBeatCheck=Y`).
139+
- If a TestRequest is needed: sends a TestRequest (`generateTestRequest("TEST")`).
140+
- If a Heartbeat is needed: sends a Heartbeat (`generateHeartbeat()`).
141+
142+
The full flow:
143+
144+
```
145+
QFJ Timer thread (every 1 second)
146+
└─► SessionTimerTask.run()
147+
└─► for each Session in sessions.values():
148+
└─► Session.next()
149+
├─ check enabled
150+
├─ check session schedule / reset
151+
├─ check hasResponder()
152+
├─ check logon state (send Logon if initiator)
153+
├─ check logout timeout
154+
└─ heartbeat management
155+
├─ isTimedOut() → disconnect
156+
├─ isTestRequestNeeded() → send TestRequest
157+
└─ isHeartBeatNeeded() → send Heartbeat
158+
```
159+
160+
### 5.4 The Overloaded `Session.next(Message)` — Called by Dispatchers
161+
162+
The `Session.next(Message message)` overload is what `MessageDispatchingThread` and `SessionMessageEvent` call with an actual FIX message. This processes the received message (validates, dispatches to `fromAdmin` / `fromApp`, handles sequence numbers, etc.). This is **distinct** from the no-arg `Session.next()` used by the timer.
163+
164+
---
165+
166+
## 6. Thread Interaction Summary
167+
168+
```
169+
┌─────────────────────────────────────────────────────────────────────────┐
170+
│ MINA I/O Threads │
171+
│ (NIO selector threads, named "NioProcessor-N") │
172+
│ Receive raw bytes → decode FIX message → call EventHandlingStrategy │
173+
└──────────────────────────────┬──────────────────────────────────────────┘
174+
│ onMessage(session, message)
175+
┌───────────────────┴────────────────────┐
176+
│ │
177+
SingleThreaded ThreadPerSession
178+
────────────── ────────────────
179+
One shared queue Per-session queue
180+
One "QFJ Message Processor" One "QF/J Session dispatcher:
181+
thread calls <sessionID>" thread per session
182+
session.next(msg) calls session.next(msg)
183+
184+
Both strategies co-exist with the Timer Thread:
185+
186+
┌─────────────────────────────────────────────────────────────────────────┐
187+
│ QFJ Timer Thread (daemon) │
188+
│ ScheduledExecutorService fires every 1000ms │
189+
│ SessionTimerTask iterates ALL sessions → calls Session.next() │
190+
│ (handles heartbeats, logon, session schedule, timeouts) │
191+
└─────────────────────────────────────────────────────────────────────────┘
192+
```
193+
194+
---
195+
196+
## 7. Queue Capacity and Back-Pressure
197+
198+
Both strategies support configurable queue capacity:
199+
200+
- **Fixed capacity:** `new SingleThreadedEventHandlingStrategy(connector, queueCapacity)` — bounded `LinkedBlockingQueue`. Producers block when full (back-pressure).
201+
- **Watermark-based:** `new SingleThreadedEventHandlingStrategy(connector, lowerWatermark, upperWatermark)` — uses `QueueTrackers.newMultiSessionWatermarkTracker(...)`. Flow control is applied per-session within the shared queue.
202+
- Same two options exist for `ThreadPerSessionEventHandlingStrategy`, with `newSingleSessionWatermarkTracker` per session.
203+
204+
---
205+
206+
## 8. Custom `Executor` Injection
207+
208+
Both strategies accept a custom `java.util.concurrent.Executor` via `setExecutor(executor)`, called during `start()` from the connector. This allows integration with application-managed thread pools (e.g. virtual threads in Java 21+):
209+
210+
```java
211+
// Example: use virtual threads for session dispatchers (Java 21+)
212+
ThreadedSocketAcceptor acceptor = new ThreadedSocketAcceptor(...);
213+
acceptor.start(); // internally calls eventHandlingStrategy.setExecutor(longLivedExecutor)
214+
```
215+
216+
The `longLivedExecutor` is provided by `SessionConnector` and can be customised. If no executor is set, `DedicatedThreadExecutor` creates a plain `new Thread(...)` per session/strategy.
217+
218+
---
219+
220+
## 9. Thread Safety Implications for Application Developers
221+
222+
- **`SocketAcceptor` / `SocketInitiator` (single-threaded):** The `Application` callbacks (`fromApp`, `fromAdmin`, etc.) are called from the single `QFJ Message Processor` thread. No concurrent calls to the same session. However, `Session.next()` (timer) runs concurrently from the `QFJ Timer` thread — it does not call application callbacks but it does send messages on the wire.
223+
- **`ThreadedSocketAcceptor` / `ThreadedSocketInitiator` (thread-per-session):** Each session has its own dispatcher thread. Callbacks for **different sessions** may execute concurrently. Your `Application` implementation **must be thread-safe** if it shares state across sessions.
224+
- In both models, the `QFJ Timer` thread runs concurrently with message-processing threads and calls `Session.next()` (no-arg), which may send heartbeats or disconnect. `Session` internally synchronizes on `this` to protect shared state.
225+
226+
---
227+
228+
*See also: [`quickfixj-core/src/main/doc/usermanual/usage/threading.html`](../quickfixj-core/src/main/doc/usermanual/usage/threading.html) for the HTML version of this document.*

quickfixj-core/src/main/doc/usermanual/index.html

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ <h2>Building and Testing QuickFIX/J</h2>
6161
<h2>Using QuickFIX/J</h2>
6262
<ul>
6363
<li><a href="usage/application.html">Creating Your Application</a></li>
64+
<li><a href="usage/threading.html">Threading Model</a></li>
6465
<li><a href="usage/configuration.html">Configuration</a></li>
6566
<li><a href="usage/acceptor_failover.html">Acceptor Failover Support</a></li>
6667
<li><a href="usage/acceptor_dynamic.html">Dynamic Acceptor Session Definition</a></li>

0 commit comments

Comments
 (0)