Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions jobs/cancel_orders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ import { HasTelegram } from '../bot/start';
import { User, Order } from '../models';
import { cancelShowHoldInvoice, cancelAddInvoice } from '../bot/commands';
import * as messages from '../bot/messages';
import { getUserI18nContext, holdInvoiceExpirationInSecs } from '../util';
import {
getUserI18nContext,
holdInvoiceExpirationInSecs,
PerOrderIdMutex,
} from '../util';
import { logger } from '../logger';
import { CommunityContext } from '../bot/modules/community/communityContext';
import * as OrderEvents from '../bot/modules/events/orders';
import { PerOrderIdMutex } from '../ln/subscribe_invoice';

const cancelOrders = async (bot: HasTelegram) => {
try {
Expand Down
7 changes: 5 additions & 2 deletions jobs/check_hold_invoice_expired.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import { HasTelegram } from '../bot/start';
import { Order, User, Dispute } from '../models';
import { holdInvoiceExpirationInSecs, getUserI18nContext } from '../util';
import {
holdInvoiceExpirationInSecs,
getUserI18nContext,
PerOrderIdMutex,
} from '../util';
import { logger } from '../logger';
import { cancelHoldInvoice } from '../ln';
import { PerOrderIdMutex } from '../ln/subscribe_invoice';
import * as OrderEvents from '../bot/modules/events/orders';
import * as messages from '../bot/messages';

Expand Down
42 changes: 7 additions & 35 deletions ln/subscribe_invoice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,48 +6,20 @@ import { getInfo } from './info';
import lnd from './connect';
import * as messages from '../bot/messages';
import * as ordersActions from '../bot/ordersActions';
import { getUserI18nContext, getEmojiRate, decimalRound } from '../util';
import {
getUserI18nContext,
getEmojiRate,
decimalRound,
PerOrderIdMutex,
} from '../util';
import { logger } from '../logger';
import { HasTelegram } from '../bot/start';
import { IOrder } from '../models/order';
import { Mutex } from 'async-mutex';

type LockCountedMutex = {
lockCount: number;
mutex: Mutex;
};

// Track pending reconnects to prevent duplicate resubscriptions
// when both 'error' and 'end' events fire for the same invoice
const pendingReconnects: Set<string> = new Set();

class PerOrderIdMutex {
mutexes: Map<string, LockCountedMutex> = new Map();

async runExclusive(orderId: string, callback: () => Promise<any>) {
let mtx: LockCountedMutex;
if (!this.mutexes.has(orderId)) {
mtx = { lockCount: 1, mutex: new Mutex() };
this.mutexes.set(orderId, mtx);
} else {
mtx = this.mutexes.get(orderId)!;
mtx.lockCount++;
}
let ret: any;
try {
ret = await mtx.mutex.runExclusive(callback);
} finally {
mtx.lockCount--;
if (mtx.lockCount == 0) {
this.mutexes.delete(orderId);
}
}
return ret;
}

static instance = new PerOrderIdMutex();
}

const subscribeInvoice = async (
bot: HasTelegram,
id: string,
Expand Down Expand Up @@ -307,4 +279,4 @@ const payHoldInvoice = async (bot: HasTelegram, order: IOrder) => {
}
};

export { subscribeInvoice, payHoldInvoice, PerOrderIdMutex };
export { subscribeInvoice, payHoldInvoice };
34 changes: 34 additions & 0 deletions util/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { I18nContext } from '@grammyjs/i18n';
import { Mutex } from 'async-mutex';
import { ICommunity, IOrderChannel } from '../models/community';
import { IOrder } from '../models/order';
import { UserDocument } from '../models/user';
Expand Down Expand Up @@ -604,6 +605,38 @@ const generateQRWithImage = async (request: string, randomImage: string) => {
return canvas.toBuffer();
};

type LockCountedMutex = {
lockCount: number;
mutex: Mutex;
};

class PerOrderIdMutex {
mutexes: Map<string, LockCountedMutex> = new Map();

async runExclusive(orderId: string, callback: () => Promise<any>) {
let mtx: LockCountedMutex;
if (!this.mutexes.has(orderId)) {
mtx = { lockCount: 1, mutex: new Mutex() };
this.mutexes.set(orderId, mtx);
} else {
mtx = this.mutexes.get(orderId)!;
mtx.lockCount++;
}
let ret: any;
try {
ret = await mtx.mutex.runExclusive(callback);
} finally {
mtx.lockCount--;
if (mtx.lockCount == 0) {
this.mutexes.delete(orderId);
}
}
return ret;
}

static instance = new PerOrderIdMutex();
}

export {
isIso4217,
plural,
Expand Down Expand Up @@ -637,4 +670,5 @@ export {
isOrderCreator,
generateRandomImage,
generateQRWithImage,
PerOrderIdMutex,
};
Loading