
Commit fb37156

Add compression support. (#63)

* feat: Add decompression support for decode.
* feat: Add compressed encoding.
* docs: Update tableFromIPC docs.
* Update src/encode/table-to-ipc.js

1 parent def2a0b commit fb37156

20 files changed: +443 −52 lines changed

docs/api/index.md

Lines changed: 80 additions & 6 deletions
@@ -13,19 +13,22 @@ title: API Reference
 * [columnFromArray](#columnFromArray)
 * [columnFromValues](#columnFromValues)
 * [tableFromColumns](#tableFromColumns)
+* [setCompressionCodec](#setCompressionCodec)
 
 <hr/><a id="tableFromIPC" href="#tableFromIPC">#</a>
 <b>tableFromIPC</b>(<i>data</i>[, <i>options</i>])
 
 Decode [Apache Arrow IPC data](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc) and return a new [`Table`](table). The input binary data may be either an `ArrayBuffer` or `Uint8Array`. For Arrow data in the [IPC 'stream' format](https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format), an array of `Uint8Array` values is also supported.
 
+By default Flechette assumes input data is uncompressed. If input IPC data contains compressed buffers, an appropriate compression codec (for `CompressionType.LZ4_FRAME` or `CompressionType.ZSTD`) must be registered ahead of time using the [`setCompressionCodec`](#setCompressionCodec) method. Otherwise, an error is thrown.
+
 * *data* (`ArrayBuffer` \| `Uint8Array` \| `Uint8Array[]`): The source byte buffer, or an array of buffers. If an array, each byte array may contain one or more self-contained messages. Messages may NOT span multiple byte arrays.
 * *options* (`ExtractionOptions`): Options for controlling how values are transformed when extracted from an Arrow binary representation.
-  * *useBigInt* (`boolean`): If true, extract 64-bit integers as JavaScript `BigInt` values. Otherwise, coerce long integers to JavaScript number values (default `false`).
+  * *useBigInt* (`boolean`): If true, extract 64-bit integers as JavaScript `BigInt` values. Otherwise, coerce long integers to JavaScript number values (default `false`), raising an error if the integer cannot be represented as a double-precision floating point number.
   * *useDate* (`boolean`): If true, extract dates and timestamps as JavaScript `Date` objects. Otherwise, return numerical timestamp values (default `false`).
-  * *useDecimalInt* (`boolean`): If true, extract decimal-type data as scaled integer values, where fractional digits are scaled to integer positions. Returned integers are `BigInt` values for decimal bit widths of 64 bits or higher and 32-bit integers (as JavaScript `number`) otherwise. If false, decimals are converted to floating-point numbers (default).
+  * *useDecimalInt* (`boolean`): If true, extract decimal-type data as scaled integer values, where fractional digits are scaled to integer positions. Returned integers are `BigInt` values for decimal bit widths of 64 bits or higher and 32-bit integers (as JavaScript `number`) otherwise. If false, decimals are lossily converted to floating-point numbers (default).
   * *useMap* (`boolean`): If true, extract Arrow 'Map' values as JavaScript `Map` instances. Otherwise, return an array of [key, value] pairs compatible with both `Map` and `Object.fromEntries` (default `false`).
-  * *useProxy* (`boolean`): If true, extract Arrow 'Struct' values and table row objects using zero-copy proxy objects that extract data from underlying Arrow batches. The proxy objects can improve performance and reduce memory usage, but do not support property enumeration (`Object.keys`, `Object.values`, `Object.entries`) or spreading (`{ ...object }`). Otherwise, use standard JS objects for structs and table rows (default `false`).
+  * *useProxy* (`boolean`): If true, extract Arrow 'Struct' values and table row objects using zero-copy proxy objects that extract data from underlying Arrow batches. The proxy objects can improve performance and reduce memory usage, but do not support property enumeration (`Object.keys`, `Object.values`, `Object.entries`) or spreading (`{ ...object }`). Otherwise (default `false`), use standard JS objects for structs and table rows.
 
 ```js
 import { tableFromIPC } from '@uwdata/flechette';
@@ -37,15 +40,50 @@ const table = tableFromIPC(ipc);
 <hr/><a id="tableToIPC" href="#tableToIPC">#</a>
 <b>tableToIPC</b>(<i>table</i>[, <i>options</i>])
 
-Encode an Arrow table into Arrow IPC binary format and return the result as a `Uint8Array`. Both the IPC ['stream'](https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format) and ['file'](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format) formats are supported.
+Encode an Arrow table into Arrow IPC binary format and return the result as a `Uint8Array`. Both the IPC ['stream'](https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format) and ['file'](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format) formats are supported. By default Flechette encodes uncompressed data. To perform compression, register a compression codec plugin using [`setCompressionCodec`](#setCompressionCodec) and then set the *codec* option.
 
 * *table* (`Table`): The Arrow table to encode.
 * *options* (`object`): Encoding options object.
-  * *format* (`string`): Arrow `'stream'` (the default) or `'file'` format.
+  * *format* (`string`): Arrow `'stream'` (the default) or `'file'` (a.k.a. [Feather V2](https://arrow.apache.org/docs/python/feather.html)) format.
+  * *codec* (`number`): The compression codec type to apply. By default no compression is applied. If specified, this option must be one of the `CompressionType` values, and a corresponding codec plugin must already be registered using the [`setCompressionCodec`](#setCompressionCodec) method.
 
 ```js
 import { tableToIPC } from '@uwdata/flechette';
-const bytes = tableFromIPC(table, { format: 'stream' });
+const bytes = tableToIPC(table, { format: 'stream' });
+```
+
+```js
+import { CompressionType, setCompressionCodec, tableToIPC } from '@uwdata/flechette';
+import { ZstdCodec } from 'zstd-codec';
+
+// register zstd compression codec
+await new Promise((resolve) => {
+  ZstdCodec.run((zstd) => {
+    const codec = new zstd.Simple();
+    setCompressionCodec(CompressionType.ZSTD, {
+      encode: (data) => codec.compress(data),
+      decode: (data) => codec.decompress(data)
+    });
+    resolve();
+  });
+});
+
+// generate bytes in IPC stream format, with zstd compression of buffers
+const bytes = tableToIPC(table, { codec: CompressionType.ZSTD });
+```
+
+```js
+import { CompressionType, setCompressionCodec, tableToIPC } from '@uwdata/flechette';
+import * as lz4 from 'lz4js';
+
+// register lz4_frame compression codec
+setCompressionCodec(CompressionType.LZ4_FRAME, {
+  encode: (data) => lz4.compress(data),
+  decode: (data) => lz4.decompress(data)
+});
+
+// generate bytes in IPC stream format, with lz4_frame compression of buffers
+const bytes = tableToIPC(table, { codec: CompressionType.LZ4_FRAME });
 ```
 
 <hr/><a id="tableFromArrays" href="#tableFromArrays">#</a>
@@ -166,3 +204,39 @@ const table = tableFromColumns({
   floats: columnFromArray([1.1, 2.2, 3.3, 4.4, 5.5])
 });
 ```
+
+<hr/><a id="setCompressionCodec" href="#setCompressionCodec">#</a>
+<b>setCompressionCodec</b>(<i>type</i>, <i>codec</i>)
+
+Register a compression codec for compressing or decompressing Arrow buffer data. Flechette does not include compression codecs by default, but can be extended via plugins to handle compressed data. If an appropriate codec implementation is not registered, an error is thrown when attempting to compress or decompress data.
+
+* *type* (`number`): The codec type id, one of the values of the `CompressionType` object.
+* *codec* (`object`): The codec implementation as an object with `encode` (to compress) and `decode` (to decompress) functions, each of which takes a `Uint8Array` as input and returns a `Uint8Array` as output.
+
+```js
+import { CompressionType, setCompressionCodec } from '@uwdata/flechette';
+import { ZstdCodec } from 'zstd-codec';
+
+// register zstd compression codec
+await new Promise((resolve) => {
+  ZstdCodec.run((zstd) => {
+    const codec = new zstd.Simple();
+    setCompressionCodec(CompressionType.ZSTD, {
+      encode: (data) => codec.compress(data),
+      decode: (data) => codec.decompress(data)
+    });
+    resolve();
+  });
+});
+```
+
+```js
+import { CompressionType, setCompressionCodec } from '@uwdata/flechette';
+import * as lz4 from 'lz4js';
+
+// register lz4_frame compression codec
+setCompressionCodec(CompressionType.LZ4_FRAME, {
+  encode: (data) => lz4.compress(data),
+  decode: (data) => lz4.decompress(data)
+});
+```

package-lock.json

Lines changed: 22 additions & 1 deletion
Some generated files are not rendered by default.

package.json

Lines changed: 3 additions & 1 deletion
@@ -37,10 +37,12 @@
     "@rollup/plugin-terser": "^0.4.4",
     "apache-arrow": "^21.1.0",
     "eslint": "^9.39.2",
+    "lz4js": "0.2.0",
     "rimraf": "^6.1.2",
     "rollup": "^4.56.0",
     "rollup-plugin-bundle-size": "^1.0.3",
     "typescript": "^5.9.3",
-    "vitest": "^4.0.18"
+    "vitest": "^4.0.18",
+    "zstd-codec": "0.1.5"
   }
 }

src/compression.js

Lines changed: 71 additions & 0 deletions
```js
/** @import { Codec, CompressionType_ } from './types.js' */
import { CompressionType } from './constants.js';
import { writeInt64 } from './encode/builder.js';
import { keyFor } from './util/objects.js';
import { readInt64 } from './util/read.js';

const LENGTH_NO_COMPRESSED_DATA = -1;
const COMPRESS_LENGTH_PREFIX = 8;

/**
 * Return an error message for a missing codec.
 * @param {CompressionType_} type The codec type.
 */
export function missingCodec(type) {
  return `Missing compression codec "${keyFor(CompressionType, type)}" (id ${type})`;
}

/** @type {Map<CompressionType_, Codec>} */
const codecs = new Map;

/**
 * Register a codec to use for compressing or decompressing Arrow buffers.
 * @param {CompressionType_} type The compression type.
 * @param {Codec} codec The codec implementation.
 */
export function setCompressionCodec(type, codec) {
  codecs.set(type, codec);
}

/**
 * Returns a compression codec for the provided type, or null if not found.
 * Compression codecs must first be registered using *setCompressionCodec*.
 * @param {CompressionType_ | null} [type] The compression type.
 * @returns {Codec | null} The compression codec, or null if not registered.
 */
export function getCompressionCodec(type) {
  return (type != null && codecs.get(type)) || null;
}

/**
 * Decompress an Arrow buffer, return decompressed bytes and region metadata.
 * @param {Uint8Array} body The message body.
 * @param {{ offset: number, length: number }} region Buffer region metadata.
 * @param {Codec} codec A compression codec.
 * @returns {{ bytes: Uint8Array, offset: number, length: number }}
 */
export function decompressBuffer(body, { offset, length }, codec) {
  if (length === 0) {
    return { bytes: new Uint8Array(0), offset: 0, length: 0 };
  }
  const ulen = readInt64(body, offset); // uncompressed length
  const buf = body.subarray(offset + COMPRESS_LENGTH_PREFIX, offset + length);
  const bytes = (ulen === LENGTH_NO_COMPRESSED_DATA) ? buf : codec.decode(buf);
  return { bytes, offset: 0, length: bytes.length };
}

/**
 * Compress an Arrow buffer, return encoded bytes. If the compression does
 * not decrease the overall length, retains uncompressed bytes.
 * @param {Uint8Array} bytes The byte buffer to compress.
 * @param {Codec} codec A compression codec.
 */
export function compressBuffer(bytes, codec) {
  const compressed = codec.encode(bytes);
  const keep = compressed.length < bytes.length;
  const data = keep ? compressed : bytes;
  const buf = new Uint8Array(COMPRESS_LENGTH_PREFIX + data.length);
  writeInt64(buf, 0, keep ? bytes.length : LENGTH_NO_COMPRESSED_DATA);
  buf.set(data, COMPRESS_LENGTH_PREFIX);
  return buf;
}
```
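The framing that `compressBuffer` writes and `decompressBuffer` reads (an 8-byte little-endian uncompressed-length prefix, with -1 signaling that the payload is stored uncompressed) can be sketched in isolation. The sketch below uses plain `DataView` access and a pass-through codec in place of Flechette's `writeInt64`/`readInt64` helpers; all names here are illustrative, not library API:

```javascript
const PREFIX = 8; // 8-byte little-endian uncompressed-length prefix
const NO_COMPRESSION = -1n; // sentinel: payload stored uncompressed

// write a length-prefixed buffer, keeping raw bytes if compression won't help
function frame(bytes, codec) {
  const compressed = codec.encode(bytes);
  const keep = compressed.length < bytes.length;
  const data = keep ? compressed : bytes;
  const buf = new Uint8Array(PREFIX + data.length);
  new DataView(buf.buffer).setBigInt64(
    0, keep ? BigInt(bytes.length) : NO_COMPRESSION, true);
  buf.set(data, PREFIX);
  return buf;
}

// read the prefix, then decode only if the payload was actually compressed
function unframe(buf, codec) {
  const ulen = new DataView(buf.buffer, buf.byteOffset).getBigInt64(0, true);
  const payload = buf.subarray(PREFIX);
  return ulen === NO_COMPRESSION ? payload : codec.decode(payload);
}

// a pass-through codec never shrinks data, exercising the -1 sentinel path
const identity = { encode: (d) => d, decode: (d) => d };
const input = Uint8Array.of(1, 2, 3, 4);
const framed = frame(input, identity);
const out = unframe(framed, identity);
```

Because the identity codec never shrinks its input, `frame` stores the raw bytes behind a -1 sentinel, and `unframe` returns the payload without invoking `decode`.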

src/constants.js

Lines changed: 32 additions & 0 deletions
@@ -374,3 +374,35 @@ export const UnionMode = /** @type {const} */ ({
   /** Dense union layout with offsets into value arrays. */
   Dense: 1
 });
+
+/**
+ * Compression types.
+ */
+export const CompressionType = /** @type {const} */ ({
+  /**
+   * LZ4 frame compression.
+   * Not to be confused with "raw" (also called "block") format.
+   */
+  LZ4_FRAME: 0,
+  /** Zstandard compression. */
+  ZSTD: 1
+});
+
+/**
+ * Body compression methods.
+ * Provided for forward compatibility in case Arrow needs to support
+ * different strategies for compressing the IPC message body (like
+ * whole-body compression rather than buffer-level) in the future.
+ */
+export const BodyCompressionMethod = /** @type {const} */ ({
+  /**
+   * Each constituent buffer is first compressed with the indicated
+   * compressor, and then written with the uncompressed length in the first 8
+   * bytes as a 64-bit little-endian signed integer followed by the compressed
+   * buffer bytes (and then padding as required by the protocol). The
+   * uncompressed length may be set to -1 to indicate that the data that
+   * follows is not compressed, which can be useful for cases where
+   * compression does not yield appreciable savings.
+   */
+  BUFFER: 0
+});

src/decode/body-compression.js

Lines changed: 23 additions & 0 deletions
```js
/**
 * @import { BodyCompression, BodyCompressionMethod_, CompressionType_ } from '../types.js'
 */
import { BodyCompressionMethod, CompressionType } from '../constants.js';
import { readInt8, readObject } from '../util/read.js';

/**
 * Decode record batch body compression metadata.
 * @param {Uint8Array} buf A byte buffer of binary Arrow IPC data
 * @param {number} index The starting index in the byte buffer
 * @returns {BodyCompression | undefined} The body compression metadata
 */
export function decodeBodyCompression(buf, index) {
  // 4: codec
  // 6: method
  const get = readObject(buf, index);
  return {
    codec: /** @type {CompressionType_} */(
      get(4, readInt8, CompressionType.LZ4_FRAME)),
    method: /** @type {BodyCompressionMethod_} */(
      get(6, readInt8, BodyCompressionMethod.BUFFER))
  };
}
```
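The `get(field, read, fallback)` accessor returned by `readObject` supplies a default when a FlatBuffers field is absent, which is how `decodeBodyCompression` arrives at the `LZ4_FRAME`/`BUFFER` defaults. A minimal sketch of that fallback behavior, where `mockGet` and the identity `readInt8` are hypothetical stand-ins for the real FlatBuffers decoding, not Flechette internals:

```javascript
// hypothetical accessor: return the decoded field value, or the fallback
// when the field id is absent from the (mocked) FlatBuffers table
function mockGet(fields) {
  return (id, read, fallback) => (id in fields ? read(fields[id]) : fallback);
}
const readInt8 = (v) => v; // identity stand-in for the real byte reader
const CompressionType = { LZ4_FRAME: 0, ZSTD: 1 };
const BodyCompressionMethod = { BUFFER: 0 };

// mirrors the shape of decodeBodyCompression above
function decodeBodyCompression(get) {
  return {
    codec: get(4, readInt8, CompressionType.LZ4_FRAME),
    method: get(6, readInt8, BodyCompressionMethod.BUFFER)
  };
}

// absent fields fall back to the declared defaults
const defaults = decodeBodyCompression(mockGet({}));
// an explicit field value overrides the fallback
const zstd = decodeBodyCompression(mockGet({ 4: CompressionType.ZSTD }));
```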

src/decode/record-batch.js

Lines changed: 3 additions & 4 deletions
@@ -3,6 +3,7 @@
  */
 import { Version } from '../constants.js';
 import { readInt64, readObject, readOffset, readVector } from '../util/read.js';
+import { decodeBodyCompression } from './body-compression.js';
 
 /**
  * Decode a record batch.
@@ -15,12 +16,9 @@ export function decodeRecordBatch(buf, index, version) {
   // 4: length
   // 6: nodes
   // 8: buffers
-  // 10: compression (not supported)
+  // 10: compression (requires codec plug-in)
   // 12: variadicBuffers (buffer counts for view-typed fields)
   const get = readObject(buf, index);
-  if (get(10, readOffset, 0)) {
-    throw new Error('Record batch compression not implemented');
-  }
 
   // If an Arrow buffer was written before version 4,
   // advance 8 bytes to skip the now-removed page_id field
@@ -36,6 +34,7 @@ export function decodeRecordBatch(buf, index, version) {
       offset: readInt64(buf, pos + offset),
       length: readInt64(buf, pos + offset + 8)
     })),
+    compression: get(10, decodeBodyCompression),
     variadic: readVector(buf, get(12, readOffset), 8, readInt64)
   };
 }
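With `compression` now surfaced on decoded record batches, a consumer can look up the registered codec and fail fast when it is missing. A hedged sketch of that lookup pattern follows; the registry is re-implemented locally (names mirror `src/compression.js`, but this is not the library's actual call path):

```javascript
// local stand-in for the codec registry in src/compression.js
const codecs = new Map();
const setCompressionCodec = (type, codec) => codecs.set(type, codec);
const getCompressionCodec = (type) =>
  (type != null && codecs.get(type)) || null;

// resolve the codec for a batch's compression metadata, or fail fast
function requireCodec(compression) {
  if (!compression) return null; // uncompressed record batch
  const codec = getCompressionCodec(compression.codec);
  if (!codec) {
    throw new Error(`Missing compression codec (id ${compression.codec})`);
  }
  return codec;
}

// register a pass-through codec under the ZSTD id (1) for illustration
setCompressionCodec(1, { encode: (d) => d, decode: (d) => d });
const codec = requireCodec({ codec: 1, method: 0 });
```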
