diff --git a/src/rdkafka_msgset_writer.c b/src/rdkafka_msgset_writer.c index 9ff595bfbe..4fab2cf13a 100644 --- a/src/rdkafka_msgset_writer.c +++ b/src/rdkafka_msgset_writer.c @@ -819,17 +819,11 @@ static int rd_kafka_msgset_writer_write_msgq(rd_kafka_msgset_writer_t *msetw, size_t max_msg_size = RD_MIN((size_t)msetw->msetw_rkb->rkb_rk->rk_conf.max_msg_size, (size_t)msetw->msetw_rkb->rkb_rk->rk_conf.batch_size); - rd_ts_t int_latency_base; rd_ts_t MaxTimestamp = 0; rd_kafka_msg_t *rkm; int msgcnt = 0; const rd_ts_t now = rd_clock(); - /* Internal latency calculation base. - * Uses rkm_ts_timeout which is enqueue time + timeout */ - int_latency_base = - now + ((rd_ts_t)rktp->rktp_rkt->rkt_conf.message_timeout_ms * 1000); - /* Acquire BaseTimestamp from first message. */ rkm = TAILQ_FIRST(&rkmq->rkmq_msgs); rd_kafka_assert(NULL, rkm); @@ -895,8 +889,7 @@ static int rd_kafka_msgset_writer_write_msgq(rd_kafka_msgset_writer_t *msetw, msetw->msetw_messages_kvlen += rkm->rkm_len + rkm->rkm_key_len; /* Add internal latency metrics */ - rd_avg_add(&rkb->rkb_avg_int_latency, - int_latency_base - rkm->rkm_ts_timeout); + rd_avg_add(&rkb->rkb_avg_int_latency, now - rkm->rkm_ts_enq); /* MessageSet v2's .MaxTimestamp field */ if (unlikely(MaxTimestamp < rkm->rkm_timestamp))