Add InputStream-based message support for large request bodies #4171

Open
xerial wants to merge 4 commits into main from feature/20260407-inputstream-message

xerial (Member) commented Apr 7, 2026

Summary

  • Replace Netty's HttpObjectAggregator with a custom NettyBodyHandler that streams large request bodies to temp files instead of buffering entirely in memory
  • Add InputStreamMessage — a JVM-only Message implementation backed by InputStream with lazy caching via TeeInputStream
  • Support java.io.InputStream as a handler parameter type, enabling streaming reads of large request bodies
  • Add bodyBufferThresholdBytes config (default 8MB) to NettyServerConfig for controlling the in-memory vs file-backed threshold

Design

Memory-efficient body handling

  • Small bodies (≤ threshold): Accumulated in ByteArrayOutputStream as before — no disk I/O overhead
  • Large bodies (> threshold): Written to temp file as chunks arrive. Body is never fully in heap. Temp file is cleaned up after request processing.
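
The two cases above can be sketched with the JDK alone. This is a minimal illustration, not the PR's NettyBodyHandler: the class name `SpillingBodyBuffer`, its methods, and the `body-sketch-` temp file prefix are hypothetical, and chunks are plain byte arrays rather than Netty buffers.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
import java.nio.file.{Files, Path}

// Hypothetical helper (not part of the PR): keeps the body in memory while
// its total size is <= thresholdBytes, and moves it to a temp file otherwise.
final class SpillingBodyBuffer(thresholdBytes: Long) {
  private var memBuf: ByteArrayOutputStream = new ByteArrayOutputStream()
  private var fileOut: OutputStream         = null
  private var tmpFile: Option[Path]         = None

  def isFileBacked: Boolean = tmpFile.isDefined

  def write(chunk: Array[Byte]): Unit = {
    // Spill before the in-memory buffer would grow past the threshold
    if (fileOut == null && memBuf.size().toLong + chunk.length > thresholdBytes) {
      spillToFile()
    }
    if (fileOut != null) fileOut.write(chunk)
    else memBuf.write(chunk, 0, chunk.length)
  }

  private def spillToFile(): Unit = {
    val path = Files.createTempFile("body-sketch-", ".tmp")
    try {
      val out = Files.newOutputStream(path)
      try {
        memBuf.writeTo(out)
      } catch {
        case e: Throwable =>
          out.close() // close before deleting so the delete can succeed
          throw e
      }
      fileOut = out
      tmpFile = Some(path)
      memBuf = null // release the in-memory buffer
    } catch {
      case e: Throwable =>
        Files.deleteIfExists(path)
        throw e
    }
  }

  // Finish writing and open the accumulated body for reading
  def openForRead(): InputStream = {
    if (fileOut != null) {
      fileOut.close()
      Files.newInputStream(tmpFile.get)
    } else {
      new ByteArrayInputStream(memBuf.toByteArray)
    }
  }

  // Close the writer and delete the temp file, if one was created
  def cleanup(): Unit = {
    if (fileOut != null) fileOut.close()
    tmpFile.foreach(p => Files.deleteIfExists(p))
  }
}
```

The sketch decides to spill from the accumulated size only; the PR description says the real handler can also decide up front from the Content-Length header.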

Key components

  • InputStreamMessage: Wraps an InputStream as a Message. Uses TeeInputStream to cache bytes as they are read, so toContentBytes works even after streaming.
  • NettyBodyHandler: Replaces HttpObjectAggregator. Processes HttpRequest, HttpContent, and LastHttpContent individually. Decides buffering strategy based on Content-Length header or accumulated size.
  • NettyRequestHandler: Simplified to receive pre-assembled HttpMessage.Request from NettyBodyHandler. No longer reads headers/body from Netty objects.

Handler usage

@Endpoint(method = HttpMethod.POST, path = "/upload")
def upload(body: java.io.InputStream): String = {
  val bytes = body.readAllBytes()
  s"received ${bytes.length} bytes"
}
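
The handler above calls readAllBytes, which places the whole body on the heap. Where a handler needs only one pass over the data, reading in fixed-size chunks keeps the handler's own allocation bounded. The following is a sketch using only JDK classes; the object `StreamingDigest` and the function `sha256Hex` are hypothetical names, and whether the framework caches bytes behind the stream is a separate question discussed in the review below.

```scala
import java.io.InputStream
import java.security.MessageDigest

object StreamingDigest {
  // Hypothetical helper: hashes a body chunk by chunk, so this function
  // allocates only one buffer of bufferSize bytes regardless of body size.
  def sha256Hex(body: InputStream, bufferSize: Int = 8192): String = {
    val digest = MessageDigest.getInstance("SHA-256")
    val buf    = new Array[Byte](bufferSize)
    var n      = body.read(buf)
    while (n >= 0) {
      digest.update(buf, 0, n)
      n = body.read(buf)
    }
    // Render the 32-byte digest as lowercase hex
    digest.digest().map(b => f"${b & 0xff}%02x").mkString
  }
}
```

A handler with a `body: java.io.InputStream` parameter could pass that stream to `StreamingDigest.sha256Hex(body)` instead of materializing the bytes first.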

Test plan

  • InputStreamMessageTest: Unit tests for toContentBytes, toContentString, getInputStream, read-then-cache and cache-then-read semantics
  • InputStreamEndpointTest: Integration tests with Netty server — small body, large body (>8MB), echo, empty body
  • All existing httpJVM tests pass (199 tests)
  • All existing netty tests pass (121 tests)

🤖 Generated with Claude Code

Replace Netty's HttpObjectAggregator with a custom NettyBodyHandler that
streams large request bodies to temp files instead of buffering in memory.
Small bodies (below configurable threshold, default 8MB) remain in-memory
for performance.

- Add InputStreamMessage: a JVM-only Message implementation backed by
  InputStream with lazy caching via TeeInputStream
- Add NettyBodyHandler: processes HTTP chunks individually, spills to
  disk when body exceeds threshold
- Simplify NettyRequestHandler to receive pre-assembled HttpMessage.Request
- Support InputStream as a handler parameter type in HttpRequestMapper
- Add bodyBufferThresholdBytes config to NettyServerConfig

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

gemini-code-assist bot (Contributor) left a comment

Code Review

This pull request introduces a NettyBodyHandler to manage HTTP request bodies by buffering small payloads in memory and spilling larger ones to temporary files, which helps reduce heap usage. It also adds support for binding java.io.InputStream to endpoint parameters. The review feedback highlights a critical bug where temporary files are deleted prematurely due to the asynchronous nature of the request processing. Furthermore, improvements are suggested to prevent resource leaks during file creation and to optimize memory usage when writing request chunks to disk.

Comment on lines 148 to 153
    } finally {
      // Clean up temp file if body was file-backed
      NettyBodyHandler.cleanupTempFile(ctx)
      // Need to clean up the TLS in case the same thread is reused for the next request
      NettyBackend.clearThreadLocal()
    }

critical

The finally block cleans up the temporary file used for large request bodies. However, RxRunner.run is non-blocking and dispatches the request for asynchronous processing. This means the finally block will execute immediately, deleting the temporary file before the request handler has a chance to read it. This is a critical bug that will cause I/O exceptions for any request with a body large enough to be spooled to disk.

The cleanup logic must be deferred until after the response has been fully sent to the client. A robust way to achieve this is by attaching a listener to the ChannelFuture of the final write operation for the response.
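
The deferral described here can be illustrated without Netty. In the sketch below a `java.util.concurrent.CompletableFuture` stands in for Netty's ChannelFuture, and `whenComplete` plays the role of the write listener; `DeferredCleanup` and `processThenCleanup` are hypothetical names, not code from the PR.

```scala
import java.nio.file.{Files, Path}
import java.util.concurrent.CompletableFuture

object DeferredCleanup {
  // Hypothetical helper: starts asynchronous processing of tmpFile and
  // deletes the file only once that processing has completed, whether it
  // succeeded or failed. A `finally` around `handle(tmpFile)` would instead
  // run as soon as the future is created, before the file has been read.
  def processThenCleanup(tmpFile: Path)(
      handle: Path => CompletableFuture[String]
  ): CompletableFuture[String] = {
    val started: CompletableFuture[String] =
      try handle(tmpFile)
      catch {
        case e: Throwable =>
          // A synchronous failure still goes through the same cleanup path
          val failed = new CompletableFuture[String]()
          failed.completeExceptionally(e)
          failed
      }
    started.whenComplete((result: String, error: Throwable) => {
      Files.deleteIfExists(tmpFile)
      ()
    })
  }
}
```

In the Netty pipeline the analogous hook would be a listener on the future returned by the final response write, as the comment suggests.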

Comment on lines +88 to +93
    val state = if (useFile) {
      val tmpFile = Files.createTempFile("airframe-body-", ".tmp").toFile
      RequestState(req, bodyBuf = null, fileBuf = new FileOutputStream(tmpFile), tmpFile = Some(tmpFile))
    } else {
      RequestState(req, bodyBuf = null, fileBuf = null, tmpFile = None)
    }

high

There is a potential resource leak here. Files.createTempFile creates a file on disk. If the subsequent new FileOutputStream(tmpFile) constructor throws an exception (e.g., due to permissions or other I/O errors), the created tmpFile will not be deleted. The cleanup logic in exceptionCaught will not handle this case because the RequestState has not yet been associated with the channel. This can lead to an accumulation of temporary files on the server. To prevent this, you should wrap the resource allocation in a try-catch block and ensure the temporary file is deleted on failure.

    val state = if (useFile) {
      val tmpFile = Files.createTempFile("airframe-body-", ".tmp").toFile
      try {
        RequestState(req, bodyBuf = null, fileBuf = new FileOutputStream(tmpFile), tmpFile = Some(tmpFile))
      } catch {
        case e: Throwable =>
          tmpFile.delete()
          throw e
      }
    } else {
      RequestState(req, bodyBuf = null, fileBuf = null, tmpFile = None)
    }

Comment on lines +128 to +135
    if (state.bodyBuf.size() > bodyBufferThresholdBytes) {
      val tmpFile = Files.createTempFile("airframe-body-", ".tmp").toFile
      val fos = new FileOutputStream(tmpFile)
      state.bodyBuf.writeTo(fos)
      state.fileBuf = fos
      state.tmpFile = Some(tmpFile)
      state.bodyBuf = null // Release in-memory buffer
    }

high

There is a potential resource leak when spilling the in-memory buffer to a file. If new FileOutputStream(tmpFile) throws an exception, the tmpFile created by Files.createTempFile will be leaked because the exception is not caught here and the cleanup logic will not be aware of this partially-initialized state. You should use a try-catch block to ensure the file is deleted if an error occurs during the spill operation.

          if (state.bodyBuf.size() > bodyBufferThresholdBytes) {
            val tmpFile = Files.createTempFile("airframe-body-", ".tmp").toFile
            try {
              val fos     = new FileOutputStream(tmpFile)
              state.bodyBuf.writeTo(fos)
              state.fileBuf = fos
              state.tmpFile = Some(tmpFile)
              state.bodyBuf = null // Release in-memory buffer
            } catch {
              case e: Throwable =>
                tmpFile.delete()
                throw e
            }
          }

Comment on lines +117 to +119
val bytes = new Array[Byte](size)
buf.readBytes(bytes)
state.fileBuf.write(bytes)

medium

For efficiency, you can avoid allocating an intermediate Array[Byte] for each chunk of data. Netty's ByteBuf can write its content directly to an OutputStream. Using buf.readBytes(state.fileBuf, size) will reduce memory allocations and garbage collection pressure, which is particularly beneficial when handling large file uploads.

Suggested change
- val bytes = new Array[Byte](size)
- buf.readBytes(bytes)
- state.fileBuf.write(bytes)
+ buf.readBytes(state.fileBuf, size)

- Fix keep-alive regression: respect client's Connection header
  (HTTP/1.1 defaults to keep-alive, close only on explicit "Connection: close")
- Remove redundant @volatile on synchronized field in InputStreamMessage
- Use buf.readBytes(outputStream, size) to avoid per-chunk byte array allocation
- Use HttpUtil.getContentLength for Content-Length header parsing
- Simplify TeeInputStream.close to not drain remaining bytes (avoids memory issues)
- Change RequestState from case class to plain class (mutable state semantics)
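
The keep-alive rule in the first item can be written as a small pure function. This is a sketch of the HTTP persistence defaults, not the PR's code: `KeepAliveSketch` and `shouldKeepAlive` are hypothetical names, and the HTTP/1.0 branch (persistent only on an explicit `keep-alive` token) is added here for completeness.

```scala
import java.util.Locale

object KeepAliveSketch {
  // Hypothetical helper. httpVersion is the request-line version such as
  // "HTTP/1.1"; connectionHeader is the raw Connection header value, if any.
  def shouldKeepAlive(httpVersion: String, connectionHeader: Option[String]): Boolean = {
    // The Connection header is a comma-separated, case-insensitive token list
    val tokens = connectionHeader.toSeq
      .flatMap(_.split(",").toSeq)
      .map(_.trim.toLowerCase(Locale.ROOT))
    if (tokens.contains("close")) false          // explicit close always wins
    else if (httpVersion == "HTTP/1.1") true     // HTTP/1.1 defaults to keep-alive
    else tokens.contains("keep-alive")           // older versions must opt in
  }
}
```

In a Netty handler this decision is normally delegated to the library rather than reimplemented; the function above only makes the rule explicit.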

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

xerial (Member, Author) commented Apr 7, 2026

/gemini review

gemini-code-assist bot (Contributor) left a comment

Code Review

This pull request introduces NettyBodyHandler to replace HttpObjectAggregator, enabling the buffering of large HTTP request bodies to temporary files instead of memory. It also adds support for java.io.InputStream as an endpoint parameter, allowing for streaming of large request bodies. Feedback focuses on improving resource management by ensuring temporary files and streams are properly closed or deleted in the event of exceptions during request processing and cleanup.

Comment on lines +88 to +93
    val state = if (useFile) {
      val tmpFile = Files.createTempFile("airframe-body-", ".tmp").toFile
      new RequestState(req, bodyBuf = null, fileBuf = new FileOutputStream(tmpFile), tmpFile = Some(tmpFile))
    } else {
      new RequestState(req, bodyBuf = null, fileBuf = null, tmpFile = None)
    }

high

There is a potential resource leak here. If new FileOutputStream(tmpFile) throws an exception, the tmpFile that was just created will not be deleted. To prevent this file leak, you should wrap the resource allocation in a try-catch block to ensure the temporary file is deleted on failure.

Suggested change
- val state = if (useFile) {
-   val tmpFile = Files.createTempFile("airframe-body-", ".tmp").toFile
-   new RequestState(req, bodyBuf = null, fileBuf = new FileOutputStream(tmpFile), tmpFile = Some(tmpFile))
- } else {
-   new RequestState(req, bodyBuf = null, fileBuf = null, tmpFile = None)
- }
+ val state = if (useFile) {
+   val tmpFile = Files.createTempFile("airframe-body-", ".tmp").toFile
+   try {
+     new RequestState(req, bodyBuf = null, fileBuf = new FileOutputStream(tmpFile), tmpFile = Some(tmpFile))
+   } catch {
+     case e: Throwable =>
+       // Ensure the temp file is deleted if FileOutputStream creation fails
+       tmpFile.delete()
+       throw e
+   }
+ } else {
+   new RequestState(req, bodyBuf = null, fileBuf = null, tmpFile = None)
+ }

Comment on lines +127 to +132
val tmpFile = Files.createTempFile("airframe-body-", ".tmp").toFile
val fos = new FileOutputStream(tmpFile)
state.bodyBuf.writeTo(fos)
state.fileBuf = fos
state.tmpFile = Some(tmpFile)
state.bodyBuf = null // Release in-memory buffer

high

This block has a potential resource leak. If new FileOutputStream(tmpFile) or state.bodyBuf.writeTo(fos) throws an exception, the created tmpFile and fos stream will not be cleaned up. This should be wrapped in a try-catch block to ensure resources are released on failure.

            val tmpFile = Files.createTempFile("airframe-body-", ".tmp").toFile
            var fos: FileOutputStream = null
            try {
              fos = new FileOutputStream(tmpFile)
              state.bodyBuf.writeTo(fos)
              state.fileBuf = fos
              state.tmpFile = Some(tmpFile)
              state.bodyBuf = null // Release in-memory buffer
            } catch {
              case e: Throwable =>
                if (fos != null) {
                  try { fos.close() } catch { case _: Throwable => /* ignore */ }
                }
                tmpFile.delete()
                throw e
            }

Comment on lines +92 to +94
// Cache whatever has been read so far (do not drain remaining bytes to avoid memory issues)
parent.setCachedBytes(buffer.toByteArray)
underlying.close()

high

The underlying input stream may not be closed if buffer.toByteArray throws an exception (e.g., OutOfMemoryError). To ensure the underlying stream is always closed, underlying.close() should be in a finally block.

    try {
      // Cache whatever has been read so far (do not drain remaining bytes to avoid memory issues)
      parent.setCachedBytes(buffer.toByteArray)
    } finally {
      underlying.close()
    }

    if (state != null) {
      if (state.fileBuf != null) {
        try { state.fileBuf.close() }
        catch { case _: Exception => }

medium

It's good practice to log exceptions that are caught and ignored, even in cleanup code, as this can help debug rare issues. Consider logging the exception at a warn or debug level.

Suggested change
- catch { case _: Exception => }
+ catch { case e: Exception => warn("Failed to close file buffer in exception handler", e) }

xerial and others added 2 commits April 7, 2026 14:06
…tion errors

InputStream parameters cannot be serialized by the codec system (Scala 2.12
runtime reflection fails with NoSuchFieldException on ByteArrayInputStream
internal fields). Skip them in both HttpLogs and HttpAccessLogWriter.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… close

- Wrap FileOutputStream creation in try-catch, delete temp file on failure
- Protect spill-to-file path against FileOutputStream/writeTo exceptions
- Ensure TeeInputStream.close always closes underlying stream via try-finally

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

xerial (Member, Author) commented Apr 7, 2026

/gemini review

gemini-code-assist bot (Contributor) left a comment

Code Review

This pull request introduces streaming support for large HTTP request bodies in the Netty backend by spilling bodies exceeding a configurable threshold (default 8MB) to temporary files. It adds a new NettyBodyHandler to manage chunked aggregation and an InputStreamMessage type that allows controller methods to receive InputStream parameters. Several critical issues were identified during the review: the TeeInputStream implementation currently buffers the entire stream in memory, which negates the memory efficiency benefits of file-spilling for large bodies. Additionally, temporary files are deleted prematurely in the asynchronous request handling pipeline, and there are potential resource leaks in NettyBodyHandler and InputStreamMessage where file streams are not properly closed during exceptions or materialization.

Comment on lines +66 to +99
    private[http] class TeeInputStream(underlying: InputStream, parent: InputStreamMessage) extends InputStream {
      private val buffer = new ByteArrayOutputStream()

      override def read(): Int = {
        val b = underlying.read()
        if (b >= 0) {
          buffer.write(b)
        } else {
          parent.setCachedBytes(buffer.toByteArray)
        }
        b
      }

      override def read(b: Array[Byte], off: Int, len: Int): Int = {
        val n = underlying.read(b, off, len)
        if (n > 0) {
          buffer.write(b, off, n)
        } else if (n < 0) {
          parent.setCachedBytes(buffer.toByteArray)
        }
        n
      }

      override def available(): Int = underlying.available()

      override def close(): Unit = {
        try {
          // Cache whatever has been read so far (do not drain remaining bytes to avoid memory issues)
          parent.setCachedBytes(buffer.toByteArray)
        } finally {
          underlying.close()
        }
      }
    }

critical

The TeeInputStream implementation uses a ByteArrayOutputStream to buffer all read bytes in memory. This completely negates the memory efficiency benefits for large request bodies, as the entire body will be held in the heap once the stream is consumed. For large bodies spilled to disk, the framework should avoid mandatory in-memory caching or provide a mechanism to re-read the file from disk if multiple passes are required, rather than buffering the stream in memory.
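
One way to read the second option, re-reading from disk instead of caching in memory, is to hand out a fresh stream over the temp file for each pass. The sketch below is an illustration under that assumption; `FileBackedBody` and its methods are hypothetical and do not appear in the PR.

```scala
import java.io.InputStream
import java.nio.file.{Files, Path}

// Hypothetical type: a request body stored in a file. Each call to
// newInputStream() opens an independent stream from the start of the file,
// so multiple passes never require holding the body on the heap.
final class FileBackedBody(path: Path) {
  def size: Long = Files.size(path)

  def newInputStream(): InputStream = Files.newInputStream(path)

  // Delete the backing file once no more passes are needed
  def delete(): Unit = {
    Files.deleteIfExists(path)
    ()
  }
}
```

Callers remain responsible for closing each stream they open, and the file must outlive every reader, which ties this design to the cleanup timing raised elsewhere in the review.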

Comment on lines 153 to 158
    } finally {
      // Clean up temp file if body was file-backed
      NettyBodyHandler.cleanupTempFile(ctx)
      // Need to clean up the TLS in case the same thread is reused for the next request
      NettyBackend.clearThreadLocal()
    }

high

The temp file is deleted in the finally block of handleRequest. Since RxRunner.run is used to dispatch the request, the processing may be asynchronous. If the controller or any downstream filter processes the InputStream asynchronously (e.g., offloaded to another thread pool), the temp file will be deleted before the stream is fully read, resulting in an IOException. Cleanup should be deferred until the response is fully sent or the InputStream is closed.

Comment on lines +134 to +144
    try {
      val fos = new FileOutputStream(tmpFile)
      state.bodyBuf.writeTo(fos)
      state.fileBuf = fos
      state.tmpFile = Some(tmpFile)
      state.bodyBuf = null // Release in-memory buffer
    } catch {
      case e: Exception =>
        tmpFile.delete()
        throw e
    }

medium

The FileOutputStream (fos) is not closed if an exception occurs during state.bodyBuf.writeTo(fos). This leads to a file descriptor leak and may prevent the subsequent tmpFile.delete() from succeeding on some file systems (e.g., Windows) where open files cannot be deleted.

            try {
              val fos = new FileOutputStream(tmpFile)
              try {
                state.bodyBuf.writeTo(fos)
                state.fileBuf = fos
                state.tmpFile = Some(tmpFile)
                state.bodyBuf = null // Release in-memory buffer
              } catch {
                case e: Exception =>
                  fos.close()
                  throw e
              }
            } catch {
              case e: Exception =>
                tmpFile.delete()
                throw e
            }

Comment on lines +30 to +35
    private def ensureCached(): Array[Byte] = synchronized {
      if (cachedBytes == null) {
        cachedBytes = inputStream.readAllBytes()
      }
      cachedBytes
    }

medium

The ensureCached method reads the entire inputStream into memory but does not close it. This will leak the underlying resource (e.g., a FileInputStream) if toContentBytes or toContentString is called directly on an InputStreamMessage that hasn't been consumed via getInputStream.

  private def ensureCached(): Array[Byte] = synchronized {
    if (cachedBytes == null) {
      try {
        cachedBytes = inputStream.readAllBytes()
      } finally {
        inputStream.close()
      }
    }
    cachedBytes
  }
