Skip to content

Duplicated delivery reports received after multiple reconnects to Kafka #1143

@DanilaKlinov-Message-RC

Description

@DanilaKlinov-Message-RC

Environment Information

  • OS: macOS (machine used for reproduction).
  • Node Version: 20.x.x
  • NPM Version: 10.x.x
  • C++ Toolchain: g++ 15.2.0
  • node-rdkafka version: 3.6.1
  • kafka broker: 2.8.1

Steps to Reproduce

  1. Block the client app's network access to Kafka.
  2. In the client app: instantiate a node-rdkafka producer and call connect() in a loop, retrying on every failure.
  3. Wait for ~60 sec.
  4. Restore connectivity to Kafka.
  5. Send a message from node-rdkafka after the connection succeeds.
  6. (bug) Multiple delivery reports are received for the same message.

node-rdkafka Configuration Settings
"debug": "all",
"dr_cb": true,
"log_level": 7

Additional context
Here is the script I used as client app for reproduction:

const { Producer } = require('node-rdkafka');

// Broker list and target topic. Each is read from the environment when set;
// because `||` is used, an unset OR empty variable falls back to the
// hard-coded default on the right.
const brokerList = process.env.KAFKA_BROKERS || 'kafka:9092';
const topic = process.env.KAFKA_TOPIC || 'docker-compose-test-topic';

/**
 * Promise wrapper around the callback-style `producer.connect()`.
 *
 * @param {object} producer - Object exposing `connect(options, callback)`.
 * @param {object} metadataOptions - Passed through to `connect()` untouched.
 * @returns {Promise<*>} Resolves with the metadata handed to the callback,
 *   or rejects with the error handed to the callback.
 */
function connectAsync(producer, metadataOptions) {
  return new Promise((resolve, reject) => {
    // Node-style callback: a truthy first argument means failure.
    const settle = (err, metadata) => (err ? reject(err) : resolve(metadata));
    producer.connect(metadataOptions, settle);
  });
}

// Shared producer instance. Stays null until main() constructs it, which is
// why sendMessage() and the poll/signal callbacks check it before use.
let producer = null;

/**
 * Produce one small JSON test message to `topic` and poll once.
 *
 * When the shared producer has not been created yet, only a warning is
 * logged. Errors thrown synchronously by produce()/poll() are logged and
 * not rethrown.
 */
function sendMessage() {
  if (!producer) {
    console.warn('[SIGUSR1] Producer not initialized yet, cannot send message');
    return;
  }

  const payload = {
    source: 'glip-kafka-client-docker-compose-test-direct',
    timestamp: new Date().toISOString(),
  };
  const serialized = JSON.stringify(payload);
  const message = Buffer.from(serialized, 'utf8');

  console.log('[SIGUSR1] Sending message to topic:', topic);
  try {
    // The three nulls are positional placeholders; per node-rdkafka's
    // produce(topic, partition, message, key, timestamp) signature they are
    // partition, key and timestamp — NOTE(review): confirm against the
    // installed version.
    producer.produce(topic, null, message, null, null);
    producer.poll();
  } catch (err) {
    console.error('[SIGUSR1] Produce error:', err);
  }
}

/**
 * Script entry point.
 *
 * Builds the shared producer, attaches logging listeners, then calls
 * connectAsync() on that same producer instance over and over (pausing one
 * second after each failure) until a connect succeeds. After that it sends
 * one test message and hands over to runIndefinitely().
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log('Creating producer, brokerList=%s', brokerList);

  producer = new Producer({
    "metadata.broker.list": brokerList,
    "debug": "all",
    "dr_cb": true,
    "log_level": 7
  });

  // [event name, handler] pairs, attached in the order listed.
  const listeners = [
    ['event.error', (err) => {
      console.error('Producer error event:', err);
    }],
    ['connection.failure', (err, metrics) => {
      console.error('Connection failure:', err, 'metrics:', metrics);
    }],
    ['disconnected', (metrics) => {
      console.log('Producer disconnected, metrics:', metrics);
    }],
    ['delivery-report', (err, report) => {
      if (err) {
        console.error('Delivery report (error):', err);
        return;
      }
      console.log('Delivery report (success):', report);
    }],
    ['event.log', ({ severity, fac, message }) => {
      console.log(`kafka-${severity} ${fac} ${message}`);
    }],
  ];
  for (const [eventName, handler] of listeners) {
    producer.on(eventName, handler);
  }

  producer.setPollInterval(100);

  // Retry forever; the loop only ends once a connect attempt succeeds.
  console.log('Starting infinite connect attempts...');
  for (let attempt = 1; ; attempt += 1) {
    try {
      console.log(`[Attempt ${attempt}] Connecting to Kafka...`);
      await connectAsync(producer, {});
      console.log(`[Attempt ${attempt}] Successfully connected!`);
      break;
    } catch (err) {
      console.error(`[Attempt ${attempt}] Connect failed:`, err.message || err);
      // Fixed one-second pause (no backoff) to keep the reproduction simple.
      await new Promise((wake) => setTimeout(wake, 1000));
    }
  }

  sendMessage();

  runIndefinitely();
}

/**
 * Keep the process alive and shut down cleanly on a signal.
 *
 * Starts a 100 ms timer that polls the shared producer (when one exists),
 * and registers identical SIGINT and SIGTERM handlers that disconnect the
 * producer, if any, and then exit with status 0.
 */
function runIndefinitely() {
  console.log('Running indefinitely (Ctrl+C to exit)');

  // Poll for delivery reports
  setInterval(() => {
    if (!producer) {
      return;
    }
    producer.poll();
  }, 100);

  // Both signals get the same shutdown behaviour; only the log line differs.
  for (const signal of ['SIGINT', 'SIGTERM']) {
    process.on(signal, async () => {
      console.log(`${signal} received, disconnecting...`);
      if (!producer) {
        process.exit(0);
        return;
      }
      producer.disconnect(() => {
        console.log('Disconnected');
        process.exit(0);
      });
    });
  }
}

// Start the script. Any rejection escaping main() is logged and turned into
// a non-zero exit status.
const exitOnFatalError = (err) => {
  console.error('Fatal error in main:', err);
  process.exit(1);
};

main().catch(exitOnFatalError);

Part of the output logs:

test-1  | Delivery report (success): {
test-1  |   topic: 'docker-compose-test-topic',
test-1  |   partition: 0,
test-1  |   offset: 3,
test-1  |   key: null,
test-1  |   timestamp: 1770112387069,
test-1  |   size: 96
test-1  | }
test-1  | Delivery report (success): {
test-1  |   topic: 'docker-compose-test-topic',
test-1  |   partition: 0,
test-1  |   offset: 3,
test-1  |   key: null,
test-1  |   timestamp: 1770112387069,
test-1  |   size: 96
test-1  | }
test-1  | Delivery report (success): {
test-1  |   topic: 'docker-compose-test-topic',
test-1  |   partition: 0,
test-1  |   offset: 3,
test-1  |   key: null,
test-1  |   timestamp: 1770112387069,
test-1  |   size: 96
test-1  | }

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions