Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 26 additions & 30 deletions src/webStreams/Logger.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,41 @@
export enum LogLevel {
ERROR = 0,
WARN = 1,
INFO = 2,
DEBUG = 3,
}

/**
* Minimal timestamp-prefixed logger used internally by the webStreams layer.
*
* Log levels (lower number = higher severity):
* - 0 ERROR
* - 1 WARN (default threshold)
* - 2 INFO
* - 3 DEBUG
*
* Only messages whose level is ≤ the configured threshold are emitted.
*/
export class Logger {
private level: LogLevel;
private level: number;

constructor(level: LogLevel = LogLevel.WARN) {
constructor(level: number = 1) {
this.level = level;
}

setLevel(level: LogLevel) {
this.level = level;
debug(...args: unknown[]): void {
this.log(3, "DEBUG", args);
}

getLevel(): LogLevel {
return this.level;
info(...args: unknown[]): void {
this.log(2, "INFO", args);
}

private log(level: LogLevel, prefix: string, args: any[]) {
if (level <= this.level) {
const timestamp = new Date().toISOString();
console.log(`[${timestamp}] [${prefix}]`, ...args);
}
warn(...args: unknown[]): void {
this.log(1, "WARN", args);
}

debug(...args: any[]) {
this.log(LogLevel.DEBUG, "DEBUG", args);
error(...args: unknown[]): void {
this.log(0, "ERROR", args);
}

info(...args: any[]) {
this.log(LogLevel.INFO, "INFO", args);
}

warn(...args: any[]) {
this.log(LogLevel.WARN, "WARN", args);
}

error(...args: any[]) {
this.log(LogLevel.ERROR, "ERROR", args);
private log(level: number, prefix: string, args: unknown[]): void {
if (level <= this.level) {
const timestamp = new Date().toISOString();
console.log(`[${timestamp}] [${prefix}]`, ...args);
}
}
}
90 changes: 39 additions & 51 deletions src/webStreams/Queue.ts
Original file line number Diff line number Diff line change
@@ -1,74 +1,62 @@
export interface QueueTask<T> {
func(item: T): Promise<void>;
}

export class Queue<T> implements Iterable<T> {
/**
* Simple serial async queue.
*
* Items are enqueued synchronously and processed one at a time by the function
* registered via `assignProcessorFunc`. Calling `processAll` while a drain is
* already in progress returns the same in-flight promise, preventing concurrent
* drains.
*/
export class Queue<T> {
private items: T[] = [];
private func: ((item: T) => Promise<void>) | undefined;
private processing: boolean = false;
private drainPromise: Promise<void> | undefined;

/** Appends `item` to the back of the queue. */
enqueue(item: T): void {
this.items.push(item);
}

dequeue(): T | undefined {
return this.items.shift();
}

peek(): T | undefined {
return this.items[0];
}

get size(): number {
return this.items.length;
}

isEmpty(): boolean {
return this.items.length === 0;
}

clear(): void {
this.items.length = 0;
}

toArray(): T[] {
return this.items.slice();
}

assignProcessorFunc(func: (item: T) => Promise<void>) {
/**
* Registers the async function used to process each item.
* Must be called before `processAll`.
*/
assignProcessorFunc(func: (item: T) => Promise<void>): void {
this.func = func;
}

async processAll() {
if (this.processing) {
return;
/**
* Processes all currently queued items serially, then resolves.
* If called while a drain is already running, joins the existing drain
* promise rather than starting a new one.
* @throws if no processor function has been assigned.
*/
async processAll(): Promise<void> {
if (this.drainPromise) {
return this.drainPromise;
}
if (!this.func) {
throw new Error("No task processor function assigned");
}
this.processing = true;
this.drainPromise = this.drain(this.func);
try {
await this.drainPromise;
} finally {
this.drainPromise = undefined;
}
}

private async drain(func: (item: T) => Promise<void>): Promise<void> {
let itemIndex = 0;
while (this.items.length > 0) {
const item = this.items.shift();
if (!item) continue;
const randId = Math.random();
const itemId = itemIndex++;
try {
await this.func(item);
await func(item);
} catch (err) {
console.error("Error while processing queue item", randId, err);
console.error("Error while processing queue item", itemId, err);
throw err;
}
}

this.processing = false;
}

[Symbol.iterator](): Iterator<T> {
let idx = 0;
const arr = this.items;
return {
next(): IteratorResult<T> {
if (idx < arr.length) return { value: arr[idx++], done: false };
return { value: undefined as any, done: true };
},
};
}
}
38 changes: 38 additions & 0 deletions src/webStreams/types/WebRtcExtensions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Type augmentations for non-standard / draft WebRTC APIs that are present in
* modern browsers but absent from the TypeScript lib.dom.d.ts declarations.
*/

export interface EncodedStreams {
readable: ReadableStream<EncodedAudioChunk | EncodedVideoChunk>;
writable: WritableStream<EncodedAudioChunk | EncodedVideoChunk>;
}

export interface RTCRtpScriptTransformOptions {
operation: "encode" | "decode";
id?: string;
publisherId?: number;
kind?: string;
}

export interface RTCRtpSenderWithTransform extends RTCRtpSender {
transform: unknown;
createEncodedStreams(): EncodedStreams;
}

export interface RTCRtpReceiverWithTransform extends RTCRtpReceiver {
transform: unknown;
createEncodedStreams(): EncodedStreams;
}

export interface RTCConfigurationWithInsertableStreams extends RTCConfiguration {
encodedInsertableStreams: boolean;
}

export interface WindowWithRTCRtpScriptTransform extends Window {
RTCRtpScriptTransform: new (worker: Worker, options: RTCRtpScriptTransformOptions) => unknown;
}

export interface WindowWithWasmHandler extends Window {
webRtcInterfaceToNativeHandler: Record<number, unknown>;
}