Add InputStream-based message support for large request bodies#4171
Conversation
Replace Netty's HttpObjectAggregator with a custom NettyBodyHandler that streams large request bodies to temp files instead of buffering in memory. Small bodies (below a configurable threshold, default 8MB) remain in-memory for performance.

- Add InputStreamMessage: a JVM-only Message implementation backed by InputStream, with lazy caching via TeeInputStream
- Add NettyBodyHandler: processes HTTP chunks individually and spills to disk when the body exceeds the threshold
- Simplify NettyRequestHandler to receive a pre-assembled HttpMessage.Request
- Support InputStream as a handler parameter type in HttpRequestMapper
- Add a bodyBufferThresholdBytes config to NettyServerConfig

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Code Review
This pull request introduces a NettyBodyHandler to manage HTTP request bodies by buffering small payloads in memory and spilling larger ones to temporary files, which helps reduce heap usage. It also adds support for binding java.io.InputStream to endpoint parameters. The review feedback highlights a critical bug where temporary files are deleted prematurely due to the asynchronous nature of the request processing. Furthermore, improvements are suggested to prevent resource leaks during file creation and to optimize memory usage when writing request chunks to disk.
```scala
} finally {
  // Clean up temp file if body was file-backed
  NettyBodyHandler.cleanupTempFile(ctx)
  // Need to clean up the TLS in case the same thread is reused for the next request
  NettyBackend.clearThreadLocal()
}
```
The finally block cleans up the temporary file used for large request bodies. However, RxRunner.run is non-blocking and dispatches the request for asynchronous processing. This means the finally block will execute immediately, deleting the temporary file before the request handler has a chance to read it. This is a critical bug that will cause I/O exceptions for any request with a body large enough to be spooled to disk.
The cleanup logic must be deferred until after the response has been fully sent to the client. A robust way to achieve this is by attaching a listener to the ChannelFuture of the final write operation for the response.
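For illustration, here is a stdlib-only sketch of the deferred-cleanup pattern the comment describes. The object and method names are hypothetical; in Netty the same effect comes from registering a listener on the `ChannelFuture` returned by the final `writeAndFlush`.

```scala
import java.nio.file.{Files, Path}
import java.util.concurrent.CompletableFuture

// Hypothetical sketch: delete the temp file only when the response future
// completes, instead of in a finally block that may run before asynchronous
// request processing has finished reading the file.
object DeferredCleanup {
  def respondThenCleanup(tmpFile: Option[Path], sendResponse: () => CompletableFuture[Void]): Unit = {
    sendResponse().whenComplete { (_: Void, _: Throwable) =>
      // Runs after the response has been fully written (or has failed)
      tmpFile.foreach(f => Files.deleteIfExists(f))
    }
  }
}
```

In the Netty pipeline, the `CompletableFuture` callback above corresponds to a `ChannelFutureListener` attached to the final write, so the temp file outlives the handler method's return.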
```scala
val state = if (useFile) {
  val tmpFile = Files.createTempFile("airframe-body-", ".tmp").toFile
  RequestState(req, bodyBuf = null, fileBuf = new FileOutputStream(tmpFile), tmpFile = Some(tmpFile))
} else {
  RequestState(req, bodyBuf = null, fileBuf = null, tmpFile = None)
}
```
There is a potential resource leak here. Files.createTempFile creates a file on disk. If the subsequent new FileOutputStream(tmpFile) constructor throws an exception (e.g., due to permissions or other I/O errors), the created tmpFile will not be deleted. The cleanup logic in exceptionCaught will not handle this case because the RequestState has not yet been associated with the channel. This can lead to an accumulation of temporary files on the server. To prevent this, you should wrap the resource allocation in a try-catch block and ensure the temporary file is deleted on failure.
```scala
val state = if (useFile) {
  val tmpFile = Files.createTempFile("airframe-body-", ".tmp").toFile
  try {
    RequestState(req, bodyBuf = null, fileBuf = new FileOutputStream(tmpFile), tmpFile = Some(tmpFile))
  } catch {
    case e: Throwable =>
      tmpFile.delete()
      throw e
  }
} else {
  RequestState(req, bodyBuf = null, fileBuf = null, tmpFile = None)
}
```

```scala
if (state.bodyBuf.size() > bodyBufferThresholdBytes) {
  val tmpFile = Files.createTempFile("airframe-body-", ".tmp").toFile
  val fos = new FileOutputStream(tmpFile)
  state.bodyBuf.writeTo(fos)
  state.fileBuf = fos
  state.tmpFile = Some(tmpFile)
  state.bodyBuf = null // Release in-memory buffer
}
```
There is a potential resource leak when spilling the in-memory buffer to a file. If new FileOutputStream(tmpFile) throws an exception, the tmpFile created by Files.createTempFile will be leaked because the exception is not caught here and the cleanup logic will not be aware of this partially-initialized state. You should use a try-catch block to ensure the file is deleted if an error occurs during the spill operation.
```scala
if (state.bodyBuf.size() > bodyBufferThresholdBytes) {
  val tmpFile = Files.createTempFile("airframe-body-", ".tmp").toFile
  try {
    val fos = new FileOutputStream(tmpFile)
    state.bodyBuf.writeTo(fos)
    state.fileBuf = fos
    state.tmpFile = Some(tmpFile)
    state.bodyBuf = null // Release in-memory buffer
  } catch {
    case e: Throwable =>
      tmpFile.delete()
      throw e
  }
}
```

```scala
val bytes = new Array[Byte](size)
buf.readBytes(bytes)
state.fileBuf.write(bytes)
```
For efficiency, you can avoid allocating an intermediate Array[Byte] for each chunk of data. Netty's ByteBuf can write its content directly to an OutputStream. Using buf.readBytes(state.fileBuf, size) will reduce memory allocations and garbage collection pressure, which is particularly beneficial when handling large file uploads.
```diff
- val bytes = new Array[Byte](size)
- buf.readBytes(bytes)
- state.fileBuf.write(bytes)
+ buf.readBytes(state.fileBuf, size)
```
- Fix keep-alive regression: respect the client's Connection header (HTTP/1.1 defaults to keep-alive; close only on an explicit "Connection: close")
- Remove redundant @volatile on a synchronized field in InputStreamMessage
- Use buf.readBytes(outputStream, size) to avoid per-chunk byte array allocation
- Use HttpUtil.getContentLength for Content-Length header parsing
- Simplify TeeInputStream.close to not drain remaining bytes (avoids memory issues)
- Change RequestState from a case class to a plain class (mutable state semantics)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
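The keep-alive rule described in the first bullet can be sketched as follows. This is a stdlib-only illustration; `shouldKeepAlive` is a hypothetical helper name, not the actual method in the PR (which relies on Netty's header utilities).

```scala
object KeepAlive {
  // Sketch of HTTP/1.x keep-alive semantics: HTTP/1.1 defaults to keep-alive,
  // HTTP/1.0 defaults to close; an explicit Connection header overrides either.
  def shouldKeepAlive(httpVersion: String, connectionHeader: Option[String]): Boolean =
    connectionHeader.map(_.trim.toLowerCase) match {
      case Some("close")      => false
      case Some("keep-alive") => true
      case _                  => httpVersion == "HTTP/1.1"
    }
}
```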
/gemini review
Code Review
This pull request introduces NettyBodyHandler to replace HttpObjectAggregator, enabling the buffering of large HTTP request bodies to temporary files instead of memory. It also adds support for java.io.InputStream as an endpoint parameter, allowing for streaming of large request bodies. Feedback focuses on improving resource management by ensuring temporary files and streams are properly closed or deleted in the event of exceptions during request processing and cleanup.
```scala
val state = if (useFile) {
  val tmpFile = Files.createTempFile("airframe-body-", ".tmp").toFile
  new RequestState(req, bodyBuf = null, fileBuf = new FileOutputStream(tmpFile), tmpFile = Some(tmpFile))
} else {
  new RequestState(req, bodyBuf = null, fileBuf = null, tmpFile = None)
}
```
There is a potential resource leak here. If new FileOutputStream(tmpFile) throws an exception, the tmpFile that was just created will not be deleted. To prevent this file leak, you should wrap the resource allocation in a try-catch block to ensure the temporary file is deleted on failure.
```diff
  val state = if (useFile) {
    val tmpFile = Files.createTempFile("airframe-body-", ".tmp").toFile
-   new RequestState(req, bodyBuf = null, fileBuf = new FileOutputStream(tmpFile), tmpFile = Some(tmpFile))
+   try {
+     new RequestState(req, bodyBuf = null, fileBuf = new FileOutputStream(tmpFile), tmpFile = Some(tmpFile))
+   } catch {
+     case e: Throwable =>
+       // Ensure the temp file is deleted if FileOutputStream creation fails
+       tmpFile.delete()
+       throw e
+   }
  } else {
    new RequestState(req, bodyBuf = null, fileBuf = null, tmpFile = None)
  }
```
```scala
val tmpFile = Files.createTempFile("airframe-body-", ".tmp").toFile
val fos = new FileOutputStream(tmpFile)
state.bodyBuf.writeTo(fos)
state.fileBuf = fos
state.tmpFile = Some(tmpFile)
state.bodyBuf = null // Release in-memory buffer
```
This block has a potential resource leak. If new FileOutputStream(tmpFile) or state.bodyBuf.writeTo(fos) throws an exception, the created tmpFile and fos stream will not be cleaned up. This should be wrapped in a try-catch block to ensure resources are released on failure.
```scala
val tmpFile = Files.createTempFile("airframe-body-", ".tmp").toFile
var fos: FileOutputStream = null
try {
  fos = new FileOutputStream(tmpFile)
  state.bodyBuf.writeTo(fos)
  state.fileBuf = fos
  state.tmpFile = Some(tmpFile)
  state.bodyBuf = null // Release in-memory buffer
} catch {
  case e: Throwable =>
    if (fos != null) {
      try { fos.close() } catch { case _: Throwable => /* ignore */ }
    }
    tmpFile.delete()
    throw e
}
```

```scala
// Cache whatever has been read so far (do not drain remaining bytes to avoid memory issues)
parent.setCachedBytes(buffer.toByteArray)
underlying.close()
```
The underlying input stream may not be closed if buffer.toByteArray throws an exception (e.g., OutOfMemoryError). To ensure the underlying stream is always closed, underlying.close() should be in a finally block.
```scala
try {
  // Cache whatever has been read so far (do not drain remaining bytes to avoid memory issues)
  parent.setCachedBytes(buffer.toByteArray)
} finally {
  underlying.close()
}
```

```scala
if (state != null) {
  if (state.fileBuf != null) {
    try { state.fileBuf.close() }
    catch { case _: Exception => }
```
It's good practice to log exceptions that are caught and ignored, even in cleanup code, as this can help debug rare issues. Consider logging the exception at a warn or debug level.
```diff
- catch { case _: Exception => }
+ catch { case e: Exception => warn("Failed to close file buffer in exception handler", e) }
```
…tion errors

InputStream parameters cannot be serialized by the codec system (Scala 2.12 runtime reflection fails with NoSuchFieldException on ByteArrayInputStream internal fields). Skip them in both HttpLogs and HttpAccessLogWriter.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… close

- Wrap FileOutputStream creation in try-catch, delete temp file on failure
- Protect spill-to-file path against FileOutputStream/writeTo exceptions
- Ensure TeeInputStream.close always closes the underlying stream via try-finally

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
/gemini review
Code Review
This pull request introduces streaming support for large HTTP request bodies in the Netty backend by spilling bodies exceeding a configurable threshold (default 8MB) to temporary files. It adds a new NettyBodyHandler to manage chunked aggregation and an InputStreamMessage type that allows controller methods to receive InputStream parameters. Several critical issues were identified during the review: the TeeInputStream implementation currently buffers the entire stream in memory, which negates the memory efficiency benefits of file-spilling for large bodies. Additionally, temporary files are deleted prematurely in the asynchronous request handling pipeline, and there are potential resource leaks in NettyBodyHandler and InputStreamMessage where file streams are not properly closed during exceptions or materialization.
```scala
private[http] class TeeInputStream(underlying: InputStream, parent: InputStreamMessage) extends InputStream {
  private val buffer = new ByteArrayOutputStream()

  override def read(): Int = {
    val b = underlying.read()
    if (b >= 0) {
      buffer.write(b)
    } else {
      parent.setCachedBytes(buffer.toByteArray)
    }
    b
  }

  override def read(b: Array[Byte], off: Int, len: Int): Int = {
    val n = underlying.read(b, off, len)
    if (n > 0) {
      buffer.write(b, off, n)
    } else if (n < 0) {
      parent.setCachedBytes(buffer.toByteArray)
    }
    n
  }

  override def available(): Int = underlying.available()

  override def close(): Unit = {
    try {
      // Cache whatever has been read so far (do not drain remaining bytes to avoid memory issues)
      parent.setCachedBytes(buffer.toByteArray)
    } finally {
      underlying.close()
    }
  }
}
```
The TeeInputStream implementation uses a ByteArrayOutputStream to buffer all read bytes in memory. This completely negates the memory efficiency benefits for large request bodies, as the entire body will be held in the heap once the stream is consumed. For large bodies spilled to disk, the framework should avoid mandatory in-memory caching or provide a mechanism to re-read the file from disk if multiple passes are required, rather than buffering the stream in memory.
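One possible direction, sketched below with hypothetical names: keep the spill file itself as the backing store and re-open it for each access, so repeated passes come from disk rather than from an in-memory cache.

```scala
import java.io.{File, FileInputStream, InputStream}
import java.nio.file.Files

// Hypothetical sketch: a file-backed body that serves every access by
// re-reading the spill file instead of tee-caching the bytes in the heap.
class FileBackedBody(spillFile: File) {
  // Each caller gets a fresh stream over the same on-disk bytes
  def newInputStream(): InputStream = new FileInputStream(spillFile)

  // Multi-pass reads come from disk, never from an in-memory copy
  def toContentBytes: Array[Byte] = Files.readAllBytes(spillFile.toPath)
}
```

This trades an extra disk read per pass for a bounded heap footprint, which is the point of spilling large bodies in the first place.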
```scala
} finally {
  // Clean up temp file if body was file-backed
  NettyBodyHandler.cleanupTempFile(ctx)
  // Need to clean up the TLS in case the same thread is reused for the next request
  NettyBackend.clearThreadLocal()
}
```
The temp file is deleted in the finally block of handleRequest. Since RxRunner.run is used to dispatch the request, the processing may be asynchronous. If the controller or any downstream filter processes the InputStream asynchronously (e.g., offloaded to another thread pool), the temp file will be deleted before the stream is fully read, resulting in an IOException. Cleanup should be deferred until the response is fully sent or the InputStream is closed.
```scala
try {
  val fos = new FileOutputStream(tmpFile)
  state.bodyBuf.writeTo(fos)
  state.fileBuf = fos
  state.tmpFile = Some(tmpFile)
  state.bodyBuf = null // Release in-memory buffer
} catch {
  case e: Exception =>
    tmpFile.delete()
    throw e
}
```
The FileOutputStream (fos) is not closed if an exception occurs during state.bodyBuf.writeTo(fos). This leads to a file descriptor leak and may prevent the subsequent tmpFile.delete() from succeeding on some file systems (e.g., Windows) where open files cannot be deleted.
```scala
try {
  val fos = new FileOutputStream(tmpFile)
  try {
    state.bodyBuf.writeTo(fos)
    state.fileBuf = fos
    state.tmpFile = Some(tmpFile)
    state.bodyBuf = null // Release in-memory buffer
  } catch {
    case e: Exception =>
      fos.close()
      throw e
  }
} catch {
  case e: Exception =>
    tmpFile.delete()
    throw e
}
```

```scala
private def ensureCached(): Array[Byte] = synchronized {
  if (cachedBytes == null) {
    cachedBytes = inputStream.readAllBytes()
  }
  cachedBytes
}
```
The ensureCached method reads the entire inputStream into memory but does not close it. This will leak the underlying resource (e.g., a FileInputStream) if toContentBytes or toContentString is called directly on an InputStreamMessage that hasn't been consumed via getInputStream.
```scala
private def ensureCached(): Array[Byte] = synchronized {
  if (cachedBytes == null) {
    try {
      cachedBytes = inputStream.readAllBytes()
    } finally {
      inputStream.close()
    }
  }
  cachedBytes
}
```
Summary
- Replace `HttpObjectAggregator` with a custom `NettyBodyHandler` that streams large request bodies to temp files instead of buffering entirely in memory
- Add `InputStreamMessage` — a JVM-only `Message` implementation backed by `InputStream` with lazy caching via `TeeInputStream`
- Support `java.io.InputStream` as a handler parameter type, enabling streaming reads of large request bodies
- Add `bodyBufferThresholdBytes` config (default 8MB) to `NettyServerConfig` for controlling the in-memory vs file-backed threshold

Design
Memory-efficient body handling
- Small bodies (below the threshold) are accumulated in a `ByteArrayOutputStream` as before — no disk I/O overhead

Key components
- `InputStreamMessage`: Wraps an `InputStream` as a `Message`. Uses `TeeInputStream` to cache bytes as they are read, so `toContentBytes` works even after streaming.
- `NettyBodyHandler`: Replaces `HttpObjectAggregator`. Processes `HttpRequest`, `HttpContent`, and `LastHttpContent` individually. Decides the buffering strategy based on the `Content-Length` header or accumulated size.
- `NettyRequestHandler`: Simplified to receive a pre-assembled `HttpMessage.Request` from `NettyBodyHandler`. No longer reads headers/body from Netty objects.

Handler usage
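For context, a handler using the new parameter binding might look like the following sketch. The class is hypothetical, and the airframe `@Endpoint` annotation is shown as a comment to keep the snippet self-contained.

```scala
import java.io.InputStream

// Hypothetical endpoint sketch: an InputStream parameter lets the handler
// stream a large request body without materializing it in memory.
class UploadApi {
  // @Endpoint(method = HttpMethod.POST, path = "/upload")
  def upload(body: InputStream): String = {
    val buf   = new Array[Byte](8192)
    var total = 0L
    var n     = body.read(buf)
    while (n >= 0) { total += n; n = body.read(buf) } // count bytes chunk by chunk
    s"received $total bytes"
  }
}
```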
Test plan
- `InputStreamMessageTest`: Unit tests for `toContentBytes`, `toContentString`, `getInputStream`, read-then-cache and cache-then-read semantics
- `InputStreamEndpointTest`: Integration tests with a Netty server — small body, large body (>8MB), echo, empty body
- All `httpJVM` tests pass (199 tests)
- All `netty` tests pass (121 tests)

🤖 Generated with Claude Code