Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,11 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
"No brokers or topics in metadata: should retry");
err = RD_KAFKA_RESP_ERR__PARTIAL;
goto err;
} else if (md->broker_cnt == 0) {
rd_rkb_dbg(rkb, METADATA, "METADATA",
"No brokers in metadata: should re-bootstrap");
err = RD_KAFKA_RESP_ERR_REBOOTSTRAP_REQUIRED;
goto err;
}

/* Update our list of brokers. */
Expand Down
18 changes: 18 additions & 0 deletions src/rdkafka_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -1631,6 +1631,18 @@ rd_kafka_resp_err_t rd_kafka_mock_broker_add(rd_kafka_mock_cluster_t *mcluster,
rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE));
}

rd_kafka_resp_err_t
rd_kafka_mock_broker_count_override(rd_kafka_mock_cluster_t *mcluster,
int broker_cnt) {
rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK);

rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_BROKER_COUNT_OVERRIDE;
rko->rko_u.mock.broker_id = broker_cnt;

return rd_kafka_op_err_destroy(
rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE));
}


/**
* @brief Starts listening on the mock broker socket.
Expand Down Expand Up @@ -2620,6 +2632,11 @@ rd_kafka_mock_cluster_cmd(rd_kafka_mock_cluster_t *mcluster,

rd_kafka_mock_cluster_reassign_partitions(mcluster);
break;

case RD_KAFKA_MOCK_CMD_BROKER_COUNT_OVERRIDE:
mcluster->broker_cnt_override = rko->rko_u.mock.broker_id;
break;

case RD_KAFKA_MOCK_CMD_COORD_SET:
if (!rd_kafka_mock_coord_set(mcluster, rko->rko_u.mock.name,
rko->rko_u.mock.str,
Expand Down Expand Up @@ -2818,6 +2835,7 @@ rd_kafka_mock_cluster_t *rd_kafka_mock_cluster_new(rd_kafka_t *rk,
(intptr_t)mcluster >> 2);

TAILQ_INIT(&mcluster->brokers);
mcluster->broker_cnt_override = -1;

for (i = 1; i <= broker_cnt; i++) {
if (!(mrkb = rd_kafka_mock_broker_new(mcluster, i, NULL))) {
Expand Down
11 changes: 11 additions & 0 deletions src/rdkafka_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,17 @@ rd_kafka_mock_broker_decommission(rd_kafka_mock_cluster_t *cluster,
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_mock_broker_add(rd_kafka_mock_cluster_t *mcluster, int32_t broker_id);

/**
* @brief Override broker count for Metadata responses.
*
* @param mcluster The mock cluster
* @param broker_cnt The override value, or -1 to remove override.
*
* @returns Error value or 0 if no error occurred
*/
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_mock_broker_count_override(rd_kafka_mock_cluster_t *mcluster, int broker_cnt);


/**
* @brief Explicitly sets the coordinator. If this API is not a standard
Expand Down
9 changes: 7 additions & 2 deletions src/rdkafka_mock_handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -1191,7 +1191,8 @@ rd_kafka_mock_buf_write_Metadata_Topic(rd_kafka_mock_cluster_t *mcluster,
int i;
int partition_cnt =
(!mtopic || err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART ||
err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID)
err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID ||
mcluster->broker_cnt_override == 0)
? 0
: mtopic->partition_cnt;

Expand Down Expand Up @@ -1309,6 +1310,10 @@ static int rd_kafka_mock_handle_Metadata(rd_kafka_mock_connection_t *mconn,
of_Brokers_cnt = rd_kafka_buf_write_arraycnt_pos(resp);

TAILQ_FOREACH(mrkb, &mcluster->brokers, link) {
if (mcluster->broker_cnt_override >= 0 &&
response_Brokers_cnt >= mcluster->broker_cnt_override) {
break;
}
if (!mrkb->up)
continue;
/* Response: Brokers.Nodeid */
Expand All @@ -1334,7 +1339,7 @@ static int rd_kafka_mock_handle_Metadata(rd_kafka_mock_connection_t *mconn,

if (rkbuf->rkbuf_reqhdr.ApiVersion >= 1) {
/* Response: ControllerId */
rd_kafka_buf_write_i32(resp, mcluster->controller_id);
rd_kafka_buf_write_i32(resp, response_Brokers_cnt > 0 ? mcluster->controller_id : -1);
}

/* #Topics */
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_mock_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ struct rd_kafka_mock_cluster_s {

TAILQ_HEAD(, rd_kafka_mock_broker_s) brokers;
int broker_cnt;
int broker_cnt_override;

TAILQ_HEAD(, rd_kafka_mock_topic_s) topics;
int topic_cnt;
Expand Down
2 changes: 2 additions & 0 deletions src/rdkafka_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ struct rd_kafka_op_s {
RD_KAFKA_MOCK_CMD_BROKER_SET_RACK,
RD_KAFKA_MOCK_CMD_BROKER_DECOMMISSION,
RD_KAFKA_MOCK_CMD_BROKER_ADD,
RD_KAFKA_MOCK_CMD_BROKER_COUNT_OVERRIDE,
RD_KAFKA_MOCK_CMD_COORD_SET,
RD_KAFKA_MOCK_CMD_APIVERSION_SET,
RD_KAFKA_MOCK_CMD_REQUESTED_METRICS_SET,
Expand Down Expand Up @@ -622,6 +623,7 @@ struct rd_kafka_op_s {
* BROKER_SET_RACK
* BROKER_DECOMMISSION
* BROKER_ADD
* BROKER_COUNT_OVERRIDE
* COORD_SET */
int64_t lo; /**< Low offset, for:
* TOPIC_CREATE (part cnt)
Expand Down
138 changes: 138 additions & 0 deletions tests/0154-rebootstrap_mock.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2024, Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "test.h"

struct log_hits_s {
rd_atomic32_t metadata;
rd_atomic32_t rebootstrap;
};

static void
log_cb(const rd_kafka_t *rk, int level, const char *fac, const char *buf) {
struct log_hits_s *log_hits = rd_kafka_opaque(rk);

if (!strcmp(fac, "METADATA") &&
strstr(buf, ": 0 brokers, 1 topics")) {
rd_atomic32_add(&log_hits->metadata, 1);
} else if (!strcmp(fac, "REBOOTSTRAP") &&
strstr(buf, "Starting re-bootstrap sequence")) {
rd_atomic32_add(&log_hits->rebootstrap, 1);
}

TEST_SAY("%s [%d] [%s] %s\n", rd_kafka_name(rk), level, fac, buf);
}

/**
* @brief Rebootstrap should not be cancelled if a metadata request
* returned no brokers.
*/
static void do_test_rebootstrap_after_metadata_no_brokers(void) {
rd_kafka_t *rk;
const char *bootstraps;
rd_kafka_mock_cluster_t *mcluster;
const char *topic = test_mk_topic_name(__FUNCTION__, 1);
rd_kafka_conf_t *conf;
struct log_hits_s log_hits;

SUB_TEST_QUICK();

test_curr->is_fatal_cb = test_error_is_not_fatal_cb;

mcluster = test_mock_cluster_new(3, &bootstraps);
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
rd_kafka_mock_partition_set_follower(mcluster, topic, 0, 2);

test_conf_init(&conf, NULL, 10);
test_conf_set(conf, "bootstrap.servers", bootstraps);
test_conf_set(conf, "metadata.recovery.strategy", "rebootstrap");
/* Should re-bootstrap immediately regardless of this config */
test_conf_set(conf, "metadata.recovery.rebootstrap.trigger.ms", "3600000");
test_conf_set(conf, "debug", "metadata");

rd_atomic32_init(&log_hits.metadata, 0);
rd_atomic32_init(&log_hits.rebootstrap, 0);
rd_kafka_conf_set_log_cb(conf, log_cb);
rd_kafka_conf_set_opaque(conf, &log_hits);

rk = test_create_consumer(topic, NULL, conf, NULL);
test_consumer_assign_partition("assign", rk, topic, 0,
RD_KAFKA_OFFSET_INVALID);

test_produce_msgs_easy_v(topic, 0, 0, 0, 1, 1,
"bootstrap.servers", bootstraps, NULL);
test_consumer_poll_timeout("read 1", rk, 0, -1, -1, 1,
NULL, 1000);

TEST_ASSERT(rd_atomic32_get(&log_hits.metadata) == 0,
"Expected no empty metadata responses, got %d",
rd_atomic32_get(&log_hits.metadata));

TEST_ASSERT(rd_atomic32_get(&log_hits.rebootstrap) == 0,
"Expected no re-bootstraps, got %d",
rd_atomic32_get(&log_hits.rebootstrap));

/* Return no brokers in Metadata responses */
rd_kafka_mock_broker_count_override(mcluster, 0);

/* Trigger refresh */
rd_kafka_mock_broker_set_down(mcluster, 2);
rd_kafka_mock_partition_set_follower(mcluster, topic, 0, 3);

/* Wait some time for seeing the re-bootstrap */
rd_usleep(200 * 1000, NULL);

TEST_ASSERT(rd_atomic32_get(&log_hits.metadata) > 0,
"Expected at least 1 empty metadata response");

TEST_ASSERT(rd_atomic32_get(&log_hits.rebootstrap) > 0,
"Expected at least 1 re-bootstrap");

/* Restore brokers for Metadata responses */
rd_kafka_mock_broker_count_override(mcluster, -1);

test_produce_msgs_easy_v(topic, 0, 0, 0, 1, 0,
"bootstrap.servers", bootstraps, NULL);
test_consumer_poll_timeout("read 2", rk, 0, -1, -1, 1,
NULL, 2000);

test_consumer_close(rk);
rd_kafka_destroy(rk);
test_mock_cluster_destroy(mcluster);

SUB_TEST_PASS();
}


int main_0154_rebootstrap_mock(int argc, char **argv) {
TEST_SKIP_MOCK_CLUSTER(0);

do_test_rebootstrap_after_metadata_no_brokers();

return 0;
}
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ set(
0151-purge-brokers.c
0152-rebootstrap.c
0153-memberid.c
0154-rebootstrap_mock.c
8000-idle.cpp
8001-fetch_from_follower_mock_manual.c
test.c
Expand Down
2 changes: 2 additions & 0 deletions tests/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ _TEST_DECL(0150_telemetry_mock);
_TEST_DECL(0151_purge_brokers_mock);
_TEST_DECL(0152_rebootstrap_local);
_TEST_DECL(0153_memberid);
_TEST_DECL(0154_rebootstrap_mock);

/* Manual tests */
_TEST_DECL(8000_idle);
Expand Down Expand Up @@ -540,6 +541,7 @@ struct test tests[] = {
_TEST(0151_purge_brokers_mock, TEST_F_LOCAL),
_TEST(0152_rebootstrap_local, TEST_F_LOCAL),
_TEST(0153_memberid, TEST_F_LOCAL),
_TEST(0154_rebootstrap_mock, TEST_F_LOCAL),

/* Manual tests */
_TEST(8000_idle, TEST_F_MANUAL),
Expand Down
1 change: 1 addition & 0 deletions win32/tests/tests.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@
<ClCompile Include="..\..\tests\0151-purge-brokers.c" />
<ClCompile Include="..\..\tests\0152-rebootstrap.c" />
<ClCompile Include="..\..\tests\0153-memberid.c" />
<ClCompile Include="..\..\tests\0154-rebootstrap_mock.c" />
<ClCompile Include="..\..\tests\8000-idle.cpp" />
<ClCompile Include="..\..\tests\8001-fetch_from_follower_mock_manual.c" />
<ClCompile Include="..\..\tests\test.c" />
Expand Down