Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/cubejs-duckdb-driver/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"@cubejs-backend/base-driver": "1.6.24",
"@cubejs-backend/schema-compiler": "1.6.24",
"@cubejs-backend/shared": "1.6.24",
"duckdb": "^1.4.1"
"@duckdb/node-api": "^1.5.0-r.1"
},
"license": "Apache-2.0",
"devDependencies": {
Expand Down
89 changes: 46 additions & 43 deletions packages/cubejs-duckdb-driver/src/DuckDBDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@ import {
QueryOptions,
StreamTableData,
GenericDataBaseType,
TableStructure,
TableColumnQueryResult,
} from '@cubejs-backend/base-driver';
import { getEnv } from '@cubejs-backend/shared';
import { promisify } from 'util';
import * as stream from 'stream';
import { Connection, Database } from 'duckdb';
import { DuckDBInstance, DuckDBConnection, DuckDBValue } from '@duckdb/node-api';

import { DuckDBQuery } from './DuckDBQuery';
import { HydrationStream, transformRow } from './HydrationStream';
import { cubeValueConverter } from './HydrationStream';

const { version } = require('../../package.json');

Expand All @@ -28,14 +25,13 @@ export type DuckDBDriverConfiguration = {
};

type InitPromise = {
defaultConnection: Connection,
db: Database;
defaultConnection: DuckDBConnection,
db: DuckDBInstance;
};

const DuckDBToGenericType: Record<string, GenericDataBaseType> = {
// DATE_TRUNC returns DATE, but Cube Store still doesn't support DATE type
// DuckDB's driver transform date/timestamp to Date object, but HydrationStream converts any Date object to ISO timestamp
// That's why It's safe to use timestamp here
// DATE_TRUNC returns DATE, but Cube Store doesn't support DATE type
// The cubeValueConverter transforms date/timestamp to ISO timestamp strings
date: 'timestamp',
};

Expand Down Expand Up @@ -63,11 +59,11 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {
return DuckDBToGenericType[columnType.toLowerCase()] || super.toGenericType(columnType.toLowerCase(), precision, scale);
}

private async installExtensions(extensions: string[], execAsync: (sql: string, ...params: any[]) => Promise<void>, repository: string = ''): Promise<void> {
private async installExtensions(extensions: string[], connection: DuckDBConnection, repository: string = ''): Promise<void> {
repository = repository ? ` FROM ${repository}` : '';
for (const extension of extensions) {
try {
await execAsync(`INSTALL ${extension}${repository}`);
await connection.run(`INSTALL ${extension}${repository}`);
} catch (e) {
if (this.logger) {
console.error(`DuckDB - error on installing ${extension}`, { e });
Expand All @@ -78,10 +74,10 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {
}
}

private async loadExtensions(extensions: string[], execAsync: (sql: string, ...params: any[]) => Promise<void>): Promise<void> {
private async loadExtensions(extensions: string[], connection: DuckDBConnection): Promise<void> {
for (const extension of extensions) {
try {
await execAsync(`LOAD ${extension}`);
await connection.run(`LOAD ${extension}`);
} catch (e) {
if (this.logger) {
console.error(`DuckDB - error on loading ${extension}`, { e });
Expand All @@ -105,17 +101,16 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {
dbUrl = ':memory:';
}

let dbOptions;
const dbOptions: any = {};
if (token) {
dbOptions = { custom_user_agent: `Cube/${version}` };
dbOptions.custom_user_agent = `Cube/${version}`;
}

// Create a new Database instance with the determined URL and custom user agent
const db = new Database(dbUrl, dbOptions);
const db = await DuckDBInstance.create(dbUrl, dbOptions);

// Under the hood all methods of Database uses internal default connection, but there is no way to expose it
const defaultConnection = db.connect();
const execAsync: (sql: string, ...params: any[]) => Promise<void> = promisify(defaultConnection.exec).bind(defaultConnection) as any;
// Create a default connection for configuration and queries
const defaultConnection = await db.connect();

const configuration = [
{
Expand Down Expand Up @@ -159,7 +154,7 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {
for (const { key, value } of configuration) {
if (value) {
try {
await execAsync(`SET ${key}='${value}'`);
await defaultConnection.run(`SET ${key}='${value}'`);
} catch (e) {
if (this.logger) {
console.error(`DuckDB - error on configuration, key: ${key}`, {
Expand All @@ -173,7 +168,7 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {
const useCredentialChain = this.config.duckdbS3UseCredentialChain || getEnv('duckdbS3UseCredentialChain', this.config);
if (useCredentialChain) {
try {
await execAsync('CREATE SECRET (TYPE S3, PROVIDER \'CREDENTIAL_CHAIN\')');
await defaultConnection.run('CREATE SECRET (TYPE S3, PROVIDER \'CREDENTIAL_CHAIN\')');
} catch (e) {
if (this.logger) {
console.error('DuckDB - error on creating S3 credential chain secret', { e });
Expand All @@ -184,16 +179,16 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {

// Install & load extensions if configured in env variable.
const officialExtensions = getEnv('duckdbExtensions', this.config);
await this.installExtensions(officialExtensions, execAsync);
await this.loadExtensions(officialExtensions, execAsync);
await this.installExtensions(officialExtensions, defaultConnection);
await this.loadExtensions(officialExtensions, defaultConnection);
const communityExtensions = getEnv('duckdbCommunityExtensions', this.config);
// @see https://duckdb.org/community_extensions/
await this.installExtensions(communityExtensions, execAsync, 'community');
await this.loadExtensions(communityExtensions, execAsync);
await this.installExtensions(communityExtensions, defaultConnection, 'community');
await this.loadExtensions(communityExtensions, defaultConnection);

if (this.config.initSql) {
try {
await execAsync(this.config.initSql);
await defaultConnection.run(this.config.initSql);
} catch (e) {
if (this.logger) {
console.error('DuckDB - error on init sql (skipping)', {
Expand Down Expand Up @@ -249,14 +244,10 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {

public async query<R = unknown>(query: string, values: unknown[] = [], _options?: QueryOptions): Promise<R[]> {
const { defaultConnection } = await this.getInitiatedState();
const fetchAsync: (sql: string, ...params: any[]) => Promise<R[]> = promisify(defaultConnection.all).bind(defaultConnection) as any;

const result = await fetchAsync(query, ...values);
return result.map((item) => {
transformRow(item);

return item;
});
const reader = await defaultConnection.runAndReadAll(query, values as DuckDBValue[]);
// Use custom converter to get Cube-friendly string values
return reader.convertRowObjects(cubeValueConverter) as R[];
}

public async stream(
Expand All @@ -269,21 +260,30 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {
// new connection, because stream can break with
// Attempting to execute an unsuccessful or closed pending query result
// PreAggregation queue has a concurrency limit, it's why pool is not needed here
const connection = db.connect();
const closeAsync = promisify(connection.close).bind(connection);
const connection = await db.connect();

try {
const asyncIterator = connection.stream(query, ...(values || []));
const rowStream = stream.Readable.from(asyncIterator, { highWaterMark }).pipe(new HydrationStream());
const result = await connection.stream(query, values as DuckDBValue[]);

// yieldConvertedRowObjects yields chunks of rows, so we need to flatten them
const flattenRows = async function* flattenRows() {
for await (const chunk of result.yieldConvertedRowObjects(cubeValueConverter)) {
for (const row of chunk) {
yield row;
}
}
};

const rowStream = stream.Readable.from(flattenRows(), { highWaterMark, objectMode: true });

return {
rowStream,
release: async () => {
await closeAsync();
connection.closeSync();
}
};
} catch (e) {
await closeAsync();
connection.closeSync();

throw e;
}
Expand All @@ -299,11 +299,14 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {

public async release(): Promise<void> {
if (this.initPromise) {
const { db } = await this.initPromise;
const close = promisify(db.close).bind(db);
const { defaultConnection } = await this.initPromise;
this.initPromise = null;

await close();
// Close the default connection
defaultConnection.closeSync();

// Note: DuckDBInstance is auto-managed and doesn't have an explicit close method
// The instance will be cleaned up automatically when all connections are closed
}
}
}
92 changes: 69 additions & 23 deletions packages/cubejs-duckdb-driver/src/HydrationStream.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,71 @@
import stream, { TransformCallback } from 'stream';

export function transformRow(row: any) {
for (const [field, value] of Object.entries(row)) {
if (typeof value === 'number' || typeof value === 'bigint') {
row[field] = value.toString();
} else if (Object.prototype.toString.call(value) === '[object Date]') {
row[field] = (value as any).toISOString();
}
import type { DuckDBValue, DuckDBType, DuckDBValueConverter } from '@duckdb/node-api';

/**
* Custom DuckDB value converter that converts all values to Cube-friendly formats:
* - Dates/timestamps → ISO 8601 strings
* - Decimals → strings with trailing zeros trimmed
* - Bigints → strings
* - Numbers → strings (for consistency)
*/
export const cubeValueConverter: DuckDBValueConverter<string | null> = (value: DuckDBValue, _type: DuckDBType) => {
if (value === null || value === undefined) {
return null;
}

// Handle bigint
if (typeof value === 'bigint') {
return value.toString();
}

// Handle number
if (typeof value === 'number') {
return value.toString();
}

// Handle Date objects (from timestamps/dates)
if (value instanceof Date) {
return value.toISOString();
}

// Handle DuckDB value objects
const valueObj = value as any;
const constructorName = valueObj?.constructor?.name;

if (constructorName === 'DuckDBTimestampValue' && 'micros' in valueObj) {
// Convert microseconds since epoch to ISO string
const micros = BigInt(valueObj.micros);
const millis = Number(micros / 1000n);
return new Date(millis).toISOString();
}
}

export class HydrationStream extends stream.Transform {
public constructor() {
super({
objectMode: true,
transform(row: any, encoding: BufferEncoding, callback: TransformCallback) {
transformRow(row);

this.push(row);
callback();
}
});

if (constructorName === 'DuckDBDateValue' && 'days' in valueObj) {
// Convert days since epoch to ISO string
const days = Number(valueObj.days);
const millis = days * 24 * 60 * 60 * 1000;
return new Date(millis).toISOString();
}
}

if (constructorName === 'DuckDBDecimalValue' && 'value' in valueObj && 'scale' in valueObj) {
// Convert decimal to string with proper formatting
const bigintValue = BigInt(valueObj.value);
const scale = Number(valueObj.scale);

if (scale === 0) {
return bigintValue.toString();
}

// Format decimal value with the fractional part
const valueStr = bigintValue.toString();
const isNegative = valueStr[0] === '-';
const absValueStr = isNegative ? valueStr.slice(1) : valueStr;
const paddedValue = absValueStr.padStart(scale + 1, '0');
const integerPart = paddedValue.slice(0, -scale) || '0';
const fractionalPart = paddedValue.slice(-scale);
const formattedValue = `${isNegative ? '-' : ''}${integerPart}.${fractionalPart}`;
// Remove trailing zeros and decimal point if no fractional part remains
return formattedValue.replace(/\.?0+$/, '') || '0';
}

// Fallback: convert to string
return String(value);
};