-
-
Notifications
You must be signed in to change notification settings - Fork 105
feat: Devtools - A separate, lightweight event bus -- utilizing Dispatch (Sync) #950
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
859468e
1cfd7ef
5fe505c
8dcba1c
0f8cec0
8a7b56e
3cd95e0
be0b792
353fcaa
a5990bb
d37a487
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,7 +57,7 @@ | |
| await coordinator.exec(`${input.value}`); | ||
| output.replaceChildren(); | ||
| } else { | ||
| await coordinator().exec( | ||
| await vg.coordinator().exec( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need to add
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. honestly, I'm not really sure. this example was broken when I tested it; the coordinator is not accessible as an object here, but only as a property of
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ahh, good catch. I'll fix it separately. Let's keep PRs focused on one thing.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done in #965. Let's remove this here. |
||
| `DROP TABLE IF EXISTS ${tableName}; | ||
| CREATE TABLE ${tableName} AS ${input.value}` | ||
| ); | ||
|
|
@@ -66,6 +66,21 @@ | |
| } | ||
| enable(); | ||
| }); | ||
|
|
||
| function setQueryLog() { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this logic (which replicates the old logger, right?) be in mosaic core so that people can easily get the old logger behavior again?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ahh. we can definitely do that. My mistake, I felt that when we said 'remove the logger from core entirely,' we give the client the option to write that logic. I can put this function in core to give back a logger util to the user/client (perhaps we can discuss that idea some more (?))
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IIRC the idea was to remove the logger but give users an equivalent tool via the new bus. |
||
| const bus = vg.coordinator().eventBus; | ||
|
|
||
| bus.observe("query-start", (event) => | ||
| console.info("Query started:", event), | ||
| ); | ||
| bus.observe("query-end", (event) => console.info("Query ended:", event)); | ||
| bus.observe("client-connect", (event) => | ||
| console.info("Client connected:", event), | ||
| ); | ||
| bus.observe("error", (event) => console.error("Error:", event)); | ||
| } | ||
|
|
||
| setQueryLog() | ||
| </script> | ||
| </body> | ||
| </html> | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| export enum EventType { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of having this enum here. Wouldn't it be easier to have classes for the events that can be instantiated with a constructor?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That is probably true. It will get rid of this "broken"-ish type interface for the events. But I remember that we agreed on enums (denoted by strings instead of numbers) to provide the user a simple list of event types in autocomplete
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, we can still use enums in the event types but in order to create an event object, we could use a class. Maybe it's cleaner to actually put the event type into the message so that we can easily type check a message in isolation. Let's do that. |
||
| QueryStart = "query-start", | ||
| QueryEnd = "query-end", | ||
| ClientConnect = "client-connect", | ||
| ClientStateChange = "client-state-change", | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this still be here?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry? I don't understand
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see |
||
| Error = "error", | ||
| } | ||
|
|
||
| export type EventMap = { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've never seen this pattern before. Why is this needed?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was mainly for type-safety (for us and the clients)
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's not do this and instead use the class approach I suggested? |
||
| [EventType.QueryStart]: Omit<QueryStartEvent, keyof MosaicEvent>; | ||
| [EventType.QueryEnd]: Omit<QueryEndEvent, keyof MosaicEvent>; | ||
| [EventType.ClientConnect]: Omit<ClientConnectEvent, keyof MosaicEvent>; | ||
| [EventType.Error]: Omit<ErrorEvent, keyof MosaicEvent>; | ||
| }; | ||
|
|
||
| export interface MosaicEvent { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be private/not exported?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel like we'll need this type when omitting pre-set keys in the coordinator/query-manager |
||
| timestamp: number; | ||
| // Extend later with more fields | ||
domoritz marked this conversation as resolved.
Show resolved
Hide resolved
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rm comment |
||
| } | ||
|
|
||
| export interface QueryStartEvent extends MosaicEvent { | ||
| query: string; | ||
| materialized: boolean; | ||
| } | ||
|
|
||
| export interface QueryEndEvent extends MosaicEvent { | ||
| query: string; | ||
| materialized: boolean; | ||
| } | ||
|
|
||
| export interface ClientConnectEvent extends MosaicEvent { | ||
| clientId?: string; | ||
| } | ||
|
|
||
| export interface ErrorEvent extends MosaicEvent { | ||
| message: string; | ||
| } | ||
|
|
||
| export type MosaicEvents = | ||
| | QueryStartEvent | ||
| | QueryEndEvent | ||
| | ClientConnectEvent | ||
| | ErrorEvent; | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,35 +1,34 @@ | ||
| import type { Connector } from './connectors/Connector.js'; | ||
| import type { Cache, Logger, QueryEntry, QueryRequest } from './types.js'; | ||
| import type { Cache, QueryEntry, QueryRequest } from './types.js'; | ||
| import { consolidator } from './QueryConsolidator.js'; | ||
| import { lruCache, voidCache } from './util/cache.js'; | ||
| import { PriorityQueue } from './util/priority-queue.js'; | ||
| import { QueryResult, QueryState } from './util/query-result.js'; | ||
| import { voidLogger } from './util/void-logger.js'; | ||
| import { EventType, MosaicEvent, MosaicEvents } from './Events.js'; | ||
| import { ObserveDispatch } from './util/ObserveDispatch.js'; | ||
|
|
||
| export const Priority = Object.freeze({ High: 0, Normal: 1, Low: 2 }); | ||
|
|
||
| export class QueryManager { | ||
| private queue: PriorityQueue<QueryEntry>; | ||
| private db: Connector | null; | ||
| private clientCache: Cache | null; | ||
| private _logger: Logger; | ||
| private _logQueries: boolean; | ||
| private _consolidate: ReturnType<typeof consolidator> | null; | ||
| /** Requests pending with the query manager. */ | ||
| public pendingResults: QueryResult[]; | ||
| private maxConcurrentRequests: number; | ||
| private pendingExec: boolean; | ||
| public eventBus?; | ||
|
|
||
| constructor(maxConcurrentRequests: number = 32) { | ||
| constructor(maxConcurrentRequests: number = 32, eventBus?: ObserveDispatch<Omit<MosaicEvents, keyof MosaicEvent>>) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need the omit?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I chose this approach because we dont pass timestamp here at compile time but we need to provide the user with the timestamp type. Open to hear ideas that can do this better! |
||
| this.queue = new PriorityQueue(3); | ||
| this.db = null; | ||
| this.clientCache = null; | ||
| this._logger = voidLogger(); | ||
| this._logQueries = false; | ||
| this._consolidate = null; | ||
| this.pendingResults = []; | ||
| this.maxConcurrentRequests = maxConcurrentRequests; | ||
| this.pendingExec = false; | ||
| this.eventBus = eventBus; | ||
| } | ||
|
|
||
| next(): void { | ||
|
|
@@ -52,7 +51,7 @@ export class QueryManager { | |
| if (result.state === QueryState.ready) { | ||
| result.fulfill(); | ||
| } else if (result.state === QueryState.done) { | ||
| this._logger.warn('Found resolved query in pending results.'); | ||
| this.eventBus?.emit(EventType.Error, { message: 'Found resolved query in pending results.' }); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this be a warning?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yep, adding a warning event type now... (perhaps we need more for other logger-related types -- warning, info, etc) |
||
| } | ||
| } | ||
| if (request.type === 'exec') this.pendingExec = false; | ||
|
|
@@ -80,22 +79,27 @@ export class QueryManager { | |
| const { query, type, cache = false, options } = request; | ||
| const sql = query ? `${query}` : null; | ||
|
|
||
| this.eventBus?.emit(EventType.QueryStart, { | ||
| query: sql || '', | ||
| materialized: cache, | ||
| }); | ||
|
|
||
| // check query cache | ||
| if (cache) { | ||
| const cached = this.clientCache!.get(sql!); | ||
| if (cached) { | ||
| const data = await cached; | ||
| this._logger.debug('Cache'); | ||
| const data = cached; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we not need to await anymore?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. await will have no effect here
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's make a separate pull request. IIRC there was some reason where cached results can be promises but I am not 100% sure right now. |
||
| result.ready(data); | ||
| this.eventBus?.emit(EventType.QueryEnd, { | ||
| query: sql || '', | ||
| materialized: cache, | ||
| }); | ||
| return; | ||
| } | ||
| } | ||
|
|
||
| // issue query, potentially cache result | ||
| const t0 = performance.now(); | ||
| if (this._logQueries) { | ||
| this._logger.debug('Query', { type, sql, ...options }); | ||
| } | ||
|
|
||
| // @ts-expect-error type may be exec | json | arrow | ||
| const promise = this.db!.query({ type, sql: sql!, ...options }); | ||
|
|
@@ -105,9 +109,16 @@ export class QueryManager { | |
|
|
||
| if (cache) this.clientCache!.set(sql!, data); | ||
|
|
||
| this._logger.debug(`Request: ${(performance.now() - t0).toFixed(1)}`); | ||
| result.ready(type === 'exec' ? null : data); | ||
|
|
||
| this.eventBus?.emit(EventType.QueryEnd, { | ||
| query: sql || '', | ||
| materialized: cache, | ||
| }); | ||
| } catch (err) { | ||
| this.eventBus?.emit(EventType.Error, { | ||
| message: err, | ||
| }); | ||
|
Comment on lines
+119
to
+121
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this strictly needed to replicate the behavior of the old logger?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, all it's doing it just pipe whatever the message was (which logger emitted) into the bus, which is now consumable by anyone, including the logger. I don't know if Im missing something, did you have a simpler approach in mind?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just didn't see a log message (we removed) here before so I am wondering whether the emit is needed. |
||
| result.reject(err); | ||
| } | ||
| } | ||
|
|
@@ -125,28 +136,6 @@ export class QueryManager { | |
| : this.clientCache; | ||
| } | ||
|
|
||
| /** | ||
| * Get or set the current logger. | ||
| * @param value Logger to set | ||
| * @returns Current logger | ||
| */ | ||
| logger(): Logger; | ||
| logger(value: Logger): Logger; | ||
| logger(value?: Logger): Logger { | ||
| return value ? (this._logger = value) : this._logger; | ||
| } | ||
|
|
||
| /** | ||
| * Get or set if queries should be logged. | ||
| * @param value Whether to log queries | ||
| * @returns Current logging state | ||
| */ | ||
| logQueries(): boolean; | ||
| logQueries(value: boolean): boolean; | ||
| logQueries(value?: boolean): boolean { | ||
| return value !== undefined ? this._logQueries = !!value : this._logQueries; | ||
| } | ||
|
|
||
| /** | ||
| * Get or set the database connector. | ||
| * @param connector Connector to set | ||
|
|
@@ -217,4 +206,4 @@ export class QueryManager { | |
| } | ||
| this.pendingResults = []; | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
duplicated logic?