Skip to content
11 changes: 10 additions & 1 deletion dev/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,16 @@
}

function setQueryLog() {
vg.coordinator().manager.logQueries(qlogToggle.checked);
const bus = vg.coordinator().eventBus;

bus.observe("QueryStart", (event) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duplicated logic?

console.info("Query started:", event),
);
bus.observe("QueryEnd", (event) => console.info("Query ended:", event));
bus.observe("ClientConnect", (event) =>
console.info("Client connected:", event),
);
bus.observe("Error", (event) => console.error("Error:", event));
}

function setCache() {
Expand Down
17 changes: 16 additions & 1 deletion dev/query/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
await coordinator.exec(`${input.value}`);
output.replaceChildren();
} else {
await coordinator().exec(
await vg.coordinator().exec(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to add vg.?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

honestly, I'm not really sure. this example was broken when I tested it; the coordinator is not accessible as an object here, but only as a property of vg

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, good catch. I'll fix it separately. Let's keep PRs focused on one thing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in #965. Let's remove this here.

`DROP TABLE IF EXISTS ${tableName};
CREATE TABLE ${tableName} AS ${input.value}`
);
Expand All @@ -66,6 +66,21 @@
}
enable();
});

function setQueryLog() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this logic (which replicates the old logger, right?) be in mosaic core so that people can easily get the old logger behavior again?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh. we can definitely do that. My mistake, I felt that when we said 'remove the logger from core entirely,' we give the client the option to write that logic. I can put this function in core to give back a logger util to the user/client (perhaps we can discuss that idea some more (?))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC the idea was to remove the logger but give users an equivalent tool via the new bus.

const bus = vg.coordinator().eventBus;

bus.observe("query-start", (event) =>
console.info("Query started:", event),
);
bus.observe("query-end", (event) => console.info("Query ended:", event));
bus.observe("client-connect", (event) =>
console.info("Client connected:", event),
);
bus.observe("error", (event) => console.error("Error:", event));
}

setQueryLog()
</script>
</body>
</html>
34 changes: 10 additions & 24 deletions packages/mosaic/core/src/Coordinator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@
import { SocketConnector } from './connectors/socket.js';
import { type Connector } from './connectors/Connector.js';
import { PreAggregator, type PreAggregateOptions } from './preagg/PreAggregator.js';
import { voidLogger } from './util/void-logger.js';
import { QueryManager, Priority } from './QueryManager.js';
import { type Selection } from './Selection.js';
import { type Logger, type QueryType } from './types.js';
import { type QueryType } from './types.js';
import { type QueryResult } from './util/query-result.js';
import { type MosaicClient } from './MosaicClient.js';
import { type SelectionClause } from './SelectionClause.js';
import { MaybeArray } from '@uwdata/mosaic-sql';
import { Table } from '@uwdata/flechette';
import { EventType, MosaicEvent, MosaicEvents } from './Events.js';
import { ObserveDispatch } from './util/ObserveDispatch.js';

interface FilterGroupEntry {
selection: Selection;
Expand Down Expand Up @@ -49,12 +50,11 @@ export class Coordinator {
public preaggregator: PreAggregator;
public clients = new Set<MosaicClient>;
public filterGroups = new Map<Selection, FilterGroupEntry>;
protected _logger: Logger = voidLogger();
public eventBus: ObserveDispatch<Omit<MosaicEvents, keyof MosaicEvent>>;

/**
* @param db Database connector. Defaults to a web socket connection.
* @param options Coordinator options.
* @param options.logger The logger to use, defaults to `console`.
* @param options.manager The query manager to use.
* @param options.cache Boolean flag to enable/disable query caching.
* @param options.consolidate Boolean flag to enable/disable query consolidation.
Expand All @@ -63,25 +63,24 @@ export class Coordinator {
constructor(
db: Connector = new SocketConnector(),
options: {
logger?: Logger | null;
manager?: QueryManager;
cache?: boolean;
consolidate?: boolean;
preagg?: PreAggregateOptions;
} = {}
) {
const {
logger = console,
manager = new QueryManager(),
cache = true,
consolidate = true,
preagg = {}
} = options;
this.eventBus = new ObserveDispatch();
this.manager = manager;
this.manager.eventBus = this.eventBus;
this.manager.cache(cache);
this.manager.consolidate(consolidate);
this.databaseConnector(db);
this.logger(logger);
this.clear();
this.preaggregator = new PreAggregator(this, preagg);
}
Expand Down Expand Up @@ -116,20 +115,7 @@ export class Coordinator {
? this.manager.connector(db)
: this.manager.connector();
}

/**
* Get or set the logger.
* @param logger The logger to use.
* @returns The current logger
*/
logger(logger?: Logger | null): Logger {
if (arguments.length) {
this._logger = logger || voidLogger();
this.manager.logger(this._logger);
}
return this._logger!;
}


// -- Query Management ----

/**
Expand Down Expand Up @@ -249,9 +235,9 @@ export class Coordinator {
return client._pending = this.query(query, { priority })
.then(
data => client.queryResult(data).update(),
err => { this._logger?.error(err); client.queryError(err); }
err => { this.eventBus.emit(EventType.Error, { message: err }); client.queryError(err); }
)
.catch(err => this._logger?.error(err));
.catch(err => { this.eventBus.emit(EventType.Error, { message: err }); });
}

/**
Expand Down Expand Up @@ -394,4 +380,4 @@ function updateSelection(
const query = info?.query(active.predicate) ?? client.query(filter);
return mc.updateClient(client, query);
}));
}
}
43 changes: 43 additions & 0 deletions packages/mosaic/core/src/Events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
export enum EventType {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of having this enum here. Wouldn't it be easier to have classes for the events that can be instantiated with a constructor?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is probably true. It will get rid of this "broken"-ish type interface for the events. But I remember that we agreed on enums (denoted by strings instead of numbers) to provide the user a simple list of event types in autocomplete

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we can still use enums in the event types but in order to create an event object, we could use a class. Maybe it's cleaner to actually put the event type into the message so that we can easily type check a message in isolation. Let's do that.

QueryStart = "query-start",
QueryEnd = "query-end",
ClientConnect = "client-connect",
ClientStateChange = "client-state-change",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this still be here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry? I don't understand

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see client-state-change used anywhere.

Error = "error",
}

export type EventMap = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've never seen this pattern before. Why is this needed?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was mainly for type-safety (for us and the clients)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not do this and instead use the class approach I suggested?

[EventType.QueryStart]: Omit<QueryStartEvent, keyof MosaicEvent>;
[EventType.QueryEnd]: Omit<QueryEndEvent, keyof MosaicEvent>;
[EventType.ClientConnect]: Omit<ClientConnectEvent, keyof MosaicEvent>;
[EventType.Error]: Omit<ErrorEvent, keyof MosaicEvent>;
};

export interface MosaicEvent {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be private/not exported?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like we'll need this type when omitting pre-set keys in the coordinator/query-manager

timestamp: number;
// Extend later with more fields
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rm comment

}

export interface QueryStartEvent extends MosaicEvent {
query: string;
materialized: boolean;
}

export interface QueryEndEvent extends MosaicEvent {
query: string;
materialized: boolean;
}

export interface ClientConnectEvent extends MosaicEvent {
clientId?: string;
}

export interface ErrorEvent extends MosaicEvent {
message: string;
}

export type MosaicEvents =
| QueryStartEvent
| QueryEndEvent
| ClientConnectEvent
| ErrorEvent;
63 changes: 26 additions & 37 deletions packages/mosaic/core/src/QueryManager.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,34 @@
import type { Connector } from './connectors/Connector.js';
import type { Cache, Logger, QueryEntry, QueryRequest } from './types.js';
import type { Cache, QueryEntry, QueryRequest } from './types.js';
import { consolidator } from './QueryConsolidator.js';
import { lruCache, voidCache } from './util/cache.js';
import { PriorityQueue } from './util/priority-queue.js';
import { QueryResult, QueryState } from './util/query-result.js';
import { voidLogger } from './util/void-logger.js';
import { EventType, MosaicEvent, MosaicEvents } from './Events.js';
import { ObserveDispatch } from './util/ObserveDispatch.js';

export const Priority = Object.freeze({ High: 0, Normal: 1, Low: 2 });

export class QueryManager {
private queue: PriorityQueue<QueryEntry>;
private db: Connector | null;
private clientCache: Cache | null;
private _logger: Logger;
private _logQueries: boolean;
private _consolidate: ReturnType<typeof consolidator> | null;
/** Requests pending with the query manager. */
public pendingResults: QueryResult[];
private maxConcurrentRequests: number;
private pendingExec: boolean;
public eventBus?;

constructor(maxConcurrentRequests: number = 32) {
constructor(maxConcurrentRequests: number = 32, eventBus?: ObserveDispatch<Omit<MosaicEvents, keyof MosaicEvent>>) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need the omit?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose this approach because we dont pass timestamp here at compile time but we need to provide the user with the timestamp type. Open to hear ideas that can do this better!

this.queue = new PriorityQueue(3);
this.db = null;
this.clientCache = null;
this._logger = voidLogger();
this._logQueries = false;
this._consolidate = null;
this.pendingResults = [];
this.maxConcurrentRequests = maxConcurrentRequests;
this.pendingExec = false;
this.eventBus = eventBus;
}

next(): void {
Expand All @@ -52,7 +51,7 @@ export class QueryManager {
if (result.state === QueryState.ready) {
result.fulfill();
} else if (result.state === QueryState.done) {
this._logger.warn('Found resolved query in pending results.');
this.eventBus?.emit(EventType.Error, { message: 'Found resolved query in pending results.' });
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be a warning?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, adding a warning event type now... (perhaps we need more for other logger-related types -- warning, info, etc)

}
}
if (request.type === 'exec') this.pendingExec = false;
Expand Down Expand Up @@ -80,22 +79,27 @@ export class QueryManager {
const { query, type, cache = false, options } = request;
const sql = query ? `${query}` : null;

this.eventBus?.emit(EventType.QueryStart, {
query: sql || '',
materialized: cache,
});

// check query cache
if (cache) {
const cached = this.clientCache!.get(sql!);
if (cached) {
const data = await cached;
this._logger.debug('Cache');
const data = cached;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not need to await anymore?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

await will have no effect here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make a separate pull request. IIRC there was some reason where cached results can be promises but I am not 100% sure right now.

result.ready(data);
this.eventBus?.emit(EventType.QueryEnd, {
query: sql || '',
materialized: cache,
});
return;
}
}

// issue query, potentially cache result
const t0 = performance.now();
if (this._logQueries) {
this._logger.debug('Query', { type, sql, ...options });
}

// @ts-expect-error type may be exec | json | arrow
const promise = this.db!.query({ type, sql: sql!, ...options });
Expand All @@ -105,9 +109,16 @@ export class QueryManager {

if (cache) this.clientCache!.set(sql!, data);

this._logger.debug(`Request: ${(performance.now() - t0).toFixed(1)}`);
result.ready(type === 'exec' ? null : data);

this.eventBus?.emit(EventType.QueryEnd, {
query: sql || '',
materialized: cache,
});
} catch (err) {
this.eventBus?.emit(EventType.Error, {
message: err,
});
Comment on lines +119 to +121
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this strictly needed to replicate the behavior of the old logger?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, all it's doing it just pipe whatever the message was (which logger emitted) into the bus, which is now consumable by anyone, including the logger. I don't know if Im missing something, did you have a simpler approach in mind?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just didn't see a log message (we removed) here before so I am wondering whether the emit is needed.

result.reject(err);
}
}
Expand All @@ -125,28 +136,6 @@ export class QueryManager {
: this.clientCache;
}

/**
* Get or set the current logger.
* @param value Logger to set
* @returns Current logger
*/
logger(): Logger;
logger(value: Logger): Logger;
logger(value?: Logger): Logger {
return value ? (this._logger = value) : this._logger;
}

/**
* Get or set if queries should be logged.
* @param value Whether to log queries
* @returns Current logging state
*/
logQueries(): boolean;
logQueries(value: boolean): boolean;
logQueries(value?: boolean): boolean {
return value !== undefined ? this._logQueries = !!value : this._logQueries;
}

/**
* Get or set the database connector.
* @param connector Connector to set
Expand Down Expand Up @@ -217,4 +206,4 @@ export class QueryManager {
}
this.pendingResults = [];
}
}
}
3 changes: 2 additions & 1 deletion packages/mosaic/core/src/preagg/PreAggregator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { BinMethod, ClauseSource, IntervalMetadata, SelectionClause } from
import { Query as QueryBuilder, and, asNode, ceil, collectColumns, createTable, float64, floor, isBetween, int32, mul, round, scaleTransform, sub, isSelectQuery, isAggregateExpression, ColumnNameRefNode } from '@uwdata/mosaic-sql';
import { preaggColumns, PreAggColumnsResult } from './preagg-columns.js';
import { fnv_hash } from '../util/hash.js';
import { EventType } from '../Events.js';

const Skip = { skip: true, result: null };

Expand Down Expand Up @@ -216,7 +217,7 @@ export class PreAggregator {
`CREATE SCHEMA IF NOT EXISTS ${schema}`,
createTable(info.table, info.create, { temp: false })
]);
info.result.catch((e: Error) => mc.logger().error(e));
info.result.catch((e: Error) => mc.eventBus.emit(EventType.Error, {message: e}));
}

entries.set(client, info);
Expand Down
Loading
Loading