Skip to content

feat: release v0.8.0 — API expansion + Node interop#11

Merged
sgmonda merged 16 commits into
mainfrom
feat/v0.8.0-api
May 19, 2026
Merged

feat: release v0.8.0 — API expansion + Node interop#11
sgmonda merged 16 commits into
mainfrom
feat/v0.8.0-api

Conversation

@sgmonda

@sgmonda sgmonda commented May 19, 2026

Copy link
Copy Markdown
Owner

Summary

Companion release to v0.7.2, expanding the API to cover the gaps that surfaced while integrating streamfu into a real NestJS+TypeORM codebase (TwenixPlatform/platform-back#5907). Result: that PR can drop most of its custom wrappers.

New operators

  • tap(stream, fn) — side-effect, forward chunks unchanged
  • count(stream) — chunk count consumer
  • take(stream, n) / drop(stream, n) — array-style aliases
  • batch(stream, size) — group chunks into arrays
  • merge(...streams) — interleave by arrival time
  • lines(stream) / csvLines(stream) — text/CSV line splitters
  • toBuffer(stream) — concatenate to Uint8Array
  • iterate(producer) — generator with null sentinel

Changed

  • filter(stream, fn) now passes the chunk index — (chunk, i) => boolean. Backwards compatible.
  • pipe(stream, ...steps) now accepts TransformStream instances mixed with functions.
  • slice() rewritten with absolute indices; fixes a latent edge case (slice(s, 0, 0) used to return one chunk instead of []).

Packaging

  • Per-module subpath exports for both JSR and npm (@sgmonda/streamfu/map, etc.). Helps moduleResolution: node10 consumers without tree-shaking.
  • Verified locally: subpaths work in both ESM and CJS Node consumers.

Removed

  • Node 16 support. system/stream.ts switched from require("node:stream/web") to globalThis.{ReadableStream,WritableStream,TransformStream}. The previous build embedded a literal require that the ESM emit could not load — Node ESM consumers now work.

Tests

  • Property-based equivalence (fast-check) for map, filter, reduce, concat, take, drop, count vs Array.prototype.
  • Node integration test that streams a 50k-line NDJSON file through Readable.toWeb + lines + map + tap + filter.

Test plan

  • deno task test green (62 tests, 217 steps)
  • examples/node green on npm + yarn + pnpm (7 suites, 14 tests)
  • examples/bun green
  • examples/cloudflare-workers green
  • Property-based tests green (8 properties, default 100 runs each)
  • Subpath import test in ESM + CJS Node consumer (manual)
  • Smoke test in platform-back after publish: drop wrappers per PLAN.md Fase 6

🤖 Generated with Claude Code

sgmonda and others added 16 commits May 19, 2026 08:39
Aligns filter() with map(), forEach() and reduce() which already pass the
chunk index. Enables predicates like `(chunk, i) => i % 2 === 0` without
needing an external counter.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Equivalent to reduce(stream, n => n + 1, 0) but reads better at call
sites. Respects backpressure because it delegates to reduce.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
tap(stream, fn) runs fn(chunk, i) for every chunk and forwards the chunk
unchanged. Useful for logging, metrics, or external collection without
breaking the pipeline. Async side-effects are awaited before forwarding
the next chunk, and thrown errors propagate to the consumer.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- take(stream, n): keep the first n chunks. Alias for slice(s, 0, n).
- drop(stream, n): skip the first n chunks. Alias for slice(s, n).

While wiring take(s, 0), slice() was found to return [chunk[0]] instead
of []: with end <= start it still enqueued the first chunk before
terminating. Reimplemented slice with absolute indices and an early
terminate-at-start guard. Three new edge cases added: end===start at
position 0, end===start mid-stream, end < start.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Groups consecutive chunks into arrays of `size` elements. The final batch
may be smaller. Common use case: amortizing per-chunk overhead (e.g.
inserting rows 1000 at a time into a database).

Validates size at the boundary: throws RangeError for non-positive or
non-integer values.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
merge() reads all sources in parallel and emits chunks as they arrive,
unlike concat() (which drains sequentially) or zip() (which aligns
positionally). Errors from any source propagate; cancel propagates to
every source reader.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Each step can now be either a function (s => ReadableStream) or a
TransformStream instance — interleaved freely. Detection is duck-typed
('readable' + 'writable') so cross-runtime TransformStream subclasses
work in Node/Deno/Bun without an instanceof tie to a specific binding.

Removes the need for downstream wrappers like:
  const safeStream = (s, ...ts) =>
    pipe(s, ...ts.map(t => r => r.pipeThrough(t)))

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Consumes a stream of string|Uint8Array chunks into a single Uint8Array
with UTF-8 encoding for strings. Returns a runtime-neutral Uint8Array;
Node consumers can use the result directly as a Buffer (Buffer extends
Uint8Array) or call Buffer.from(result) explicitly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- lines(stream): splits chunks at \n, strips \r, drops the trailing empty
  line, decodes Uint8Array chunks as UTF-8.
- csvLines(stream): same but preserves \n inside double-quoted fields per
  RFC 4180. Supports escaped quotes ("").

Both maintain a scanFrom cursor so re-scanning previously seen text is
avoided, keeping behaviour correct when quotes span chunk boundaries and
keeping total work O(n).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
iterate((i) => T | null) produces a stream by calling the function with
an increasing index until it returns null. Async producers are supported.
Only `null` terminates — `0`, empty strings, and `undefined` are valid
chunk values.

Common use cases: paginated APIs (stop when no more pages), polling, and
any generator-like sequence where the stop condition is dynamic.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds API entries for iterate, tap, batch, take, drop, merge, lines,
csvLines, count, toBuffer. Adds a "Working with Node.js streams" section
covering Readable.toWeb() bridging, an end-to-end CSV ingest example,
and the safeQueryRunner pattern for TypeORM-backed streams.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Every public module is now reachable as @sgmonda/streamfu/<name> in
both JSR (deno.json exports map) and npm (dnt entryPoints + postBuild
patching that wires up `types` for each subpath). Lets Node 10 / CJS
consumers without tree-shaking import only what they use.

While verifying, found the ESM build crashed on import when the platform
detection routed to the Node branch — that branch used a literal
require("node:stream/web") which the ESM emit kept as-is. Replaced with
globalThis.{ReadableStream,WritableStream,TransformStream}; Node 18+
exposes them globally and that bound is shared with node:stream/web's
classes (so instanceof checks still hold). Node <18 is no longer
supported, in line with their EOL.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Uses fast-check to assert that the stream variants of map, filter,
reduce, concat, take, drop and the count consumer are pointwise
equivalent to their Array.prototype counterparts over randomly
generated inputs. Adds a guard against subtle reimplementation
regressions that line coverage alone won't catch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Generates a 50k-line NDJSON file in tmp, streams it through
Readable.toWeb + pipe + lines + map(JSON.parse) + tap + filter, and
asserts both total and filtered counts. Exercises the documented Node
interop pattern end to end and guards against regressions in lines,
tap, and the platform stream import.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@sgmonda sgmonda merged commit ccf1215 into main May 19, 2026
1 check passed
@sgmonda sgmonda deleted the feat/v0.8.0-api branch May 19, 2026 07:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant