-
Notifications
You must be signed in to change notification settings - Fork 404
Open
Description
Environment Information
- OS: MacOS as machine for reproduction.
- Node Version: 20.x.x
- NPM Version: 10.x.x
- C++ Toolchain: g++ 15.2.0
- node-rdkafka version: 3.6.1
- kafka broker: 2.8.1
Steps to Reproduce
- Block access to kafka for client app.
- In client app: instantiate a node-rdkafka producer and call connect() in a loop, retrying after each failure.
- Wait for ~60 sec.
- Restore connectivity to kafka.
- Send a message from node-rdkafka after connection succeeded.
- (bug) Receive multiple delivery reports for the same message.
node-rdkafka Configuration Settings
"debug": "all",
"dr_cb": true,
"log_level": 7
Additional context
Here is the script I used as client app for reproduction:
const { Producer } = require('node-rdkafka');
// Broker address(es) used as `metadata.broker.list`; defaults to 'kafka:9092'
// when KAFKA_BROKERS is unset or empty.
const brokerList = process.env.KAFKA_BROKERS || 'kafka:9092';
// Topic the test message is produced to; defaults to
// 'docker-compose-test-topic' when KAFKA_TOPIC is unset or empty.
const topic = process.env.KAFKA_TOPIC || 'docker-compose-test-topic';
/**
 * Promise adapter around the callback-based `producer.connect`.
 *
 * @param {object} producer - Object exposing `connect(options, callback)`.
 * @param {object} metadataOptions - Forwarded unchanged as the first argument of `connect`.
 * @returns {Promise<*>} Resolves with the callback's second argument (the metadata)
 *   when the callback reports no error; rejects with the error otherwise.
 */
function connectAsync(producer, metadataOptions) {
  return new Promise((resolve, reject) => {
    // Node-style callback: a truthy first argument means failure.
    const settle = (err, metadata) => (err ? reject(err) : resolve(metadata));
    producer.connect(metadataOptions, settle);
  });
}
// Shared producer instance: stays null until main() constructs it; read by
// sendMessage() and runIndefinitely().
let producer = null;
/**
 * Produce a single JSON test message to `topic` through the shared `producer`.
 *
 * Logs a warning and does nothing when no producer exists yet. After a
 * successful `produce` call the producer is polled once. Any exception thrown
 * by `produce` or `poll` is logged and not rethrown.
 */
function sendMessage() {
  if (!producer) {
    console.warn('[SIGUSR1] Producer not initialized yet, cannot send message');
    return;
  }

  const body = {
    source: 'glip-kafka-client-docker-compose-test-direct',
    timestamp: new Date().toISOString(),
  };
  const payload = Buffer.from(JSON.stringify(body), 'utf8');

  console.log('[SIGUSR1] Sending message to topic:', topic);

  // Positional arguments after the topic: null, the payload buffer, null, null
  // (partition, message, key, timestamp in node-rdkafka's `produce` order).
  const produceArgs = [topic, null, payload, null, null];
  try {
    producer.produce(...produceArgs);
    producer.poll();
  } catch (failure) {
    console.error('[SIGUSR1] Produce error:', failure);
  }
}
/**
 * Reproduction entry point.
 *
 * Creates the shared producer, attaches logging listeners, then calls
 * `connectAsync` on that same producer instance in a loop (waiting one second
 * after each failure) until a connect succeeds. Once connected it sends one
 * message and hands over to `runIndefinitely`.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log('Creating producer, brokerList=%s', brokerList);

  producer = new Producer({
    'metadata.broker.list': brokerList,
    debug: 'all',
    dr_cb: true,
    log_level: 7,
  });

  // Listeners are registered in this exact order.
  const listeners = [
    ['event.error', (err) => {
      console.error('Producer error event:', err);
    }],
    ['connection.failure', (err, metrics) => {
      console.error('Connection failure:', err, 'metrics:', metrics);
    }],
    ['disconnected', (metrics) => {
      console.log('Producer disconnected, metrics:', metrics);
    }],
    ['delivery-report', (err, report) => {
      if (err) {
        console.error('Delivery report (error):', err);
        return;
      }
      console.log('Delivery report (success):', report);
    }],
    ['event.log', ({ severity, fac, message }) => {
      console.log(`kafka-${severity} ${fac} ${message}`);
    }],
  ];
  for (const [eventName, listener] of listeners) {
    producer.on(eventName, listener);
  }

  producer.setPollInterval(100);

  // Keep retrying connect on the same producer until it succeeds.
  console.log('Starting infinite connect attempts...');
  for (let attempt = 1; ; attempt += 1) {
    try {
      console.log(`[Attempt ${attempt}] Connecting to Kafka...`);
      await connectAsync(producer, {});
      console.log(`[Attempt ${attempt}] Successfully connected!`);
      break; // connected: leave the retry loop
    } catch (err) {
      console.error(`[Attempt ${attempt}] Connect failed:`, err.message || err);
      // Fixed one-second pause before the next attempt (no backoff, kept simple).
      await new Promise((wake) => setTimeout(wake, 1000));
    }
  }

  sendMessage();
  runIndefinitely();
}
/**
 * Keep the process alive after the initial send.
 *
 * Starts a 100 ms interval that polls the shared `producer` (when one exists)
 * and installs SIGINT / SIGTERM handlers. Each handler disconnects the
 * producer and exits with code 0 from the disconnect callback, or exits
 * immediately when there is no producer.
 */
function runIndefinitely() {
  console.log('Running indefinitely (Ctrl+C to exit)');

  // Poll for delivery reports.
  const pollOnce = () => {
    if (producer) {
      producer.poll();
    }
  };
  setInterval(pollOnce, 100);

  // Both signals share the same shutdown sequence; only the log line differs.
  const shutdownOn = (signal, announcement) => {
    process.on(signal, async () => {
      console.log(announcement);
      if (!producer) {
        process.exit(0);
        return;
      }
      producer.disconnect(() => {
        console.log('Disconnected');
        process.exit(0);
      });
    });
  };

  shutdownOn('SIGINT', 'SIGINT received, disconnecting...');
  shutdownOn('SIGTERM', 'SIGTERM received, disconnecting...');
}
main().catch((err) => {
console.error('Fatal error in main:', err);
process.exit(1);
});
Part of the output logs:
test-1 | Delivery report (success): {
test-1 | topic: 'docker-compose-test-topic',
test-1 | partition: 0,
test-1 | offset: 3,
test-1 | key: null,
test-1 | timestamp: 1770112387069,
test-1 | size: 96
test-1 | }
test-1 | Delivery report (success): {
test-1 | topic: 'docker-compose-test-topic',
test-1 | partition: 0,
test-1 | offset: 3,
test-1 | key: null,
test-1 | timestamp: 1770112387069,
test-1 | size: 96
test-1 | }
test-1 | Delivery report (success): {
test-1 | topic: 'docker-compose-test-topic',
test-1 | partition: 0,
test-1 | offset: 3,
test-1 | key: null,
test-1 | timestamp: 1770112387069,
test-1 | size: 96
test-1 | }
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels