diff --git a/src/webStreams/Logger.ts b/src/webStreams/Logger.ts index 37e5f83..d60c0a2 100644 --- a/src/webStreams/Logger.ts +++ b/src/webStreams/Logger.ts @@ -1,45 +1,41 @@ -export enum LogLevel { - ERROR = 0, - WARN = 1, - INFO = 2, - DEBUG = 3, -} - +/** + * Minimal timestamp-prefixed logger used internally by the webStreams layer. + * + * Log levels (lower number = higher severity): + * - 0 ERROR + * - 1 WARN (default threshold) + * - 2 INFO + * - 3 DEBUG + * + * Only messages whose level is ≤ the configured threshold are emitted. + */ export class Logger { - private level: LogLevel; + private level: number; - constructor(level: LogLevel = LogLevel.WARN) { + constructor(level: number = 1) { this.level = level; } - setLevel(level: LogLevel) { - this.level = level; + debug(...args: unknown[]): void { + this.log(3, "DEBUG", args); } - getLevel(): LogLevel { - return this.level; + info(...args: unknown[]): void { + this.log(2, "INFO", args); } - private log(level: LogLevel, prefix: string, args: any[]) { - if (level <= this.level) { - const timestamp = new Date().toISOString(); - console.log(`[${timestamp}] [${prefix}]`, ...args); - } + warn(...args: unknown[]): void { + this.log(1, "WARN", args); } - debug(...args: any[]) { - this.log(LogLevel.DEBUG, "DEBUG", args); + error(...args: unknown[]): void { + this.log(0, "ERROR", args); } - info(...args: any[]) { - this.log(LogLevel.INFO, "INFO", args); - } - - warn(...args: any[]) { - this.log(LogLevel.WARN, "WARN", args); - } - - error(...args: any[]) { - this.log(LogLevel.ERROR, "ERROR", args); + private log(level: number, prefix: string, args: unknown[]): void { + if (level <= this.level) { + const timestamp = new Date().toISOString(); + console.log(`[${timestamp}] [${prefix}]`, ...args); + } } } diff --git a/src/webStreams/Queue.ts b/src/webStreams/Queue.ts index 7a375f1..32dea07 100644 --- a/src/webStreams/Queue.ts +++ b/src/webStreams/Queue.ts @@ -1,74 +1,62 @@ -export interface QueueTask { - func(item: T): Promise; -} - -export class Queue implements Iterable { +/** + * Simple serial async queue. + * + * Items are enqueued synchronously and processed one at a time by the function + * registered via `assignProcessorFunc`. Calling `processAll` while a drain is + * already in progress returns the same in-flight promise, preventing concurrent + * drains. + */ +export class Queue { private items: T[] = []; private func: ((item: T) => Promise) | undefined; - private processing: boolean = false; + private drainPromise: Promise | undefined; + /** Appends `item` to the back of the queue. */ enqueue(item: T): void { this.items.push(item); } - dequeue(): T | undefined { - return this.items.shift(); - } - - peek(): T | undefined { - return this.items[0]; - } - - get size(): number { - return this.items.length; - } - - isEmpty(): boolean { - return this.items.length === 0; - } - - clear(): void { - this.items.length = 0; - } - - toArray(): T[] { - return this.items.slice(); - } - - assignProcessorFunc(func: (item: T) => Promise) { + /** + * Registers the async function used to process each item. + * Must be called before `processAll`. + */ + assignProcessorFunc(func: (item: T) => Promise): void { this.func = func; } - async processAll() { - if (this.processing) { - return; + /** + * Processes all currently queued items serially, then resolves. + * If called while a drain is already running, joins the existing drain + * promise rather than starting a new one. + * @throws if no processor function has been assigned. + */ + async processAll(): Promise { + if (this.drainPromise) { + return this.drainPromise; } if (!this.func) { throw new Error("No task processor function assigned"); } - this.processing = true; + this.drainPromise = this.drain(this.func); + try { + await this.drainPromise; + } finally { + this.drainPromise = undefined; + } + } + + private async drain(func: (item: T) => Promise): Promise { + let itemIndex = 0; while (this.items.length > 0) { const item = this.items.shift(); if (!item) continue; - const randId = Math.random(); + const itemId = itemIndex++; try { - await this.func(item); + await func(item); } catch (err) { - console.error("Error while processing queue item", randId, err); + console.error("Error while processing queue item", itemId, err); + throw err; } } - - this.processing = false; - } - - [Symbol.iterator](): Iterator { - let idx = 0; - const arr = this.items; - return { - next(): IteratorResult { - if (idx < arr.length) return { value: arr[idx++], done: false }; - return { value: undefined as any, done: true }; - }, - }; } } diff --git a/src/webStreams/types/WebRtcExtensions.ts b/src/webStreams/types/WebRtcExtensions.ts new file mode 100644 index 0000000..dbb3888 --- /dev/null +++ b/src/webStreams/types/WebRtcExtensions.ts @@ -0,0 +1,38 @@ +/** + * Type augmentations for non-standard / draft WebRTC APIs that are present in + * modern browsers but absent from the TypeScript lib.dom.d.ts declarations. + */ + +export interface EncodedStreams { + readable: ReadableStream; + writable: WritableStream; +} + +export interface RTCRtpScriptTransformOptions { + operation: "encode" | "decode"; + id?: string; + publisherId?: number; + kind?: string; +} + +export interface RTCRtpSenderWithTransform extends RTCRtpSender { + transform: unknown; + createEncodedStreams(): EncodedStreams; +} + +export interface RTCRtpReceiverWithTransform extends RTCRtpReceiver { + transform: unknown; + createEncodedStreams(): EncodedStreams; +} + +export interface RTCConfigurationWithInsertableStreams extends RTCConfiguration { + encodedInsertableStreams: boolean; +} + +export interface WindowWithRTCRtpScriptTransform extends Window { + RTCRtpScriptTransform: new (worker: Worker, options: RTCRtpScriptTransformOptions) => unknown; +} + +export interface WindowWithWasmHandler extends Window { + webRtcInterfaceToNativeHandler: Record; +}