Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ gdbrun*.gdb
TAGS
vcpkg_installed
*tmp-KafkaCluster*
build/
4 changes: 4 additions & 0 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ sasl.password | * | |
sasl.oauthbearer.config | * | | | low | SASL/OAUTHBEARER configuration. The format is implementation-dependent and must be parsed accordingly. The default unsecured token implementation (see https://tools.ietf.org/html/rfc7515#appendix-A.5) recognizes space-separated name=value pairs with valid names including principalClaimName, principal, scopeClaimName, scope, and lifeSeconds. The default value for principalClaimName is "sub", the default value for scopeClaimName is "scope", and the default value for lifeSeconds is 3600. The scope value is CSV format with the default value being no/empty scope. For example: `principalClaimName=azp principal=admin scopeClaimName=roles scope=role1,role2 lifeSeconds=600`. In addition, SASL extensions can be communicated to the broker via `extension_NAME=value`. For example: `principal=admin extension_traceId=123` <br>*Type: string*
enable.sasl.oauthbearer.unsecure.jwt | * | true, false | false | low | Enable the builtin unsecure JWT OAUTHBEARER token handler if no oauthbearer_refresh_cb has been set. This builtin handler should only be used for development or testing, and not in production. <br>*Type: boolean*
oauthbearer_token_refresh_cb | * | | | low | SASL/OAUTHBEARER token refresh callback (set with rd_kafka_conf_set_oauthbearer_token_refresh_cb(), triggered by rd_kafka_poll(), et.al. This callback will be triggered when it is time to refresh the client's OAUTHBEARER token. Also see `rd_kafka_conf_enable_sasl_queue()`. <br>*Type: see dedicated API*
sasl.login.connect.timeout.ms | * | 0 .. 2147483647 | 5000 | low | The value in milliseconds for the external authentication provider connection timeout. Currently applies only to OAUTHBEARER. <br>*Type: integer*
sasl.login.read.timeout.ms | * | 0 .. 2147483647 | 5000 | low | The value in milliseconds for the external authentication provider read timeout. Currently applies only to OAUTHBEARER. <br>*Type: integer*
sasl.login.retry.backoff.ms | * | 0 .. 2147483647 | 100 | low | The value in milliseconds for the initial wait between login attempts to the external authentication provider. Login uses an exponential backoff algorithm with an initial wait based on the sasl.login.retry.backoff.ms setting and will double in wait length between attempts up to a maximum wait length specified by the sasl.login.retry.backoff.max.ms setting. Currently applies only to OAUTHBEARER. <br>*Type: integer*
sasl.login.retry.backoff.max.ms | * | 0 .. 2147483647 | 10000 | low | The value in milliseconds for the maximum wait between login attempts to the external authentication provider. Login uses an exponential backoff algorithm with an initial wait based on the sasl.login.retry.backoff.ms setting and will double in wait length between attempts up to a maximum wait length specified by the sasl.login.retry.backoff.max.ms setting. Currently applies only to OAUTHBEARER. <br>*Type: integer*
sasl.oauthbearer.method | * | default, oidc | default | low | Set to "default" or "oidc" to control which login method to be used. If set to "oidc", the following properties must also be specified: `sasl.oauthbearer.client.id`, `sasl.oauthbearer.client.secret`, and `sasl.oauthbearer.token.endpoint.url`. <br>*Type: enum value*
sasl.oauthbearer.client.id | * | | | low | Public identifier for the application. Must be unique across all clients that the authorization server handles. Only used when `sasl.oauthbearer.method` is set to "oidc". <br>*Type: string*
sasl.oauthbearer.client.credentials.client.id | * | | | low | Alias for `sasl.oauthbearer.client.id`: Public identifier for the application. Must be unique across all clients that the authorization server handles. Only used when `sasl.oauthbearer.method` is set to "oidc". <br>*Type: string*
Expand Down
99 changes: 74 additions & 25 deletions src/rdhttp.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "rdunittest.h"

#include <stdarg.h>
#include <math.h>

#include <curl/curl.h>
#include "rdhttp.h"
Expand All @@ -46,6 +47,18 @@

/** Maximum response size, increase as necessary. */
#define RD_HTTP_RESPONSE_SIZE_MAX 1024 * 1024 * 500 /* 500 MB */
/* Guard against redefinition clashes: some platform headers
 * (e.g. <sys/param.h> on BSD/macOS) already provide MIN/MAX.
 * NOTE: arguments are evaluated more than once — do not pass
 * expressions with side effects. */
#ifndef MIN
#define MIN(a, b) ((a) < (b) ? (a) : (b))
#endif
#ifndef MAX
#define MAX(a, b) ((a) > (b) ? (a) : (b))
#endif


/**
 * @brief Get the current timestamp in milliseconds.
 *
 * Uses CLOCK_MONOTONIC rather than CLOCK_REALTIME: the returned value
 * is used to compute retry deadlines, and the wall clock can jump
 * backwards or forwards (NTP adjustment, manual clock change), which
 * would corrupt those deadlines. Monotonic time only moves forward.
 * A side benefit is that tv_sec (time since boot) is small enough
 * that the millisecond conversion cannot overflow a 32-bit long.
 *
 * @returns Monotonic timestamp in milliseconds. Only meaningful for
 *          computing elapsed intervals, not as a calendar time.
 */
long current_milliseconds(void) {
        struct timespec ts;
        clock_gettime(CLOCK_MONOTONIC, &ts);
        return (ts.tv_sec * 1000L) + (ts.tv_nsec / 1000000L);
}


void rd_http_error_destroy(rd_http_error_t *herr) {
Expand Down Expand Up @@ -316,7 +329,7 @@ rd_http_req_init(rd_kafka_t *rk, rd_http_req_t *hreq, const char *url) {
CURLPROTO_HTTP | CURLPROTO_HTTPS);
#endif
curl_easy_setopt(hreq->hreq_curl, CURLOPT_MAXREDIRS, 16);
curl_easy_setopt(hreq->hreq_curl, CURLOPT_TIMEOUT, 30);
curl_easy_setopt(hreq->hreq_curl, CURLOPT_TIMEOUT_MS, 30000L);
curl_easy_setopt(hreq->hreq_curl, CURLOPT_ERRORBUFFER,
hreq->hreq_curl_errstr);
curl_easy_setopt(hreq->hreq_curl, CURLOPT_NOSIGNAL, 1);
Expand Down Expand Up @@ -400,17 +413,17 @@ rd_http_error_t *rd_http_get(rd_kafka_t *rk,
const char *url,
char **headers_array,
size_t headers_array_cnt,
int timeout_s,
int retries,
int retry_ms,
long timeout_ms,
long read_timeout_ms,
long retry_backoff_ms,
long retry_backoff_max_ms,
rd_buf_t **rbufp,
char **content_type,
int *response_code) {
rd_http_req_t hreq;
rd_http_error_t *herr = NULL;
struct curl_slist *headers = NULL;
char *header;
int i;
size_t len, j;

*rbufp = NULL;
Expand All @@ -429,10 +442,21 @@ rd_http_error_t *rd_http_get(rd_kafka_t *rk,
headers = curl_slist_append(headers, header);
}
curl_easy_setopt(hreq.hreq_curl, CURLOPT_HTTPHEADER, headers);
if (timeout_s > 0)
curl_easy_setopt(hreq.hreq_curl, CURLOPT_TIMEOUT, timeout_s);

for (i = 0; i <= retries; i++) {
timeout_ms = MAX(timeout_ms, 0);
read_timeout_ms = MAX(read_timeout_ms, 0);
if (timeout_ms > 0) {
curl_easy_setopt(hreq.hreq_curl, CURLOPT_CONNECTTIMEOUT_MS, timeout_ms);
}
if (read_timeout_ms > 0)
curl_easy_setopt(hreq.hreq_curl, CURLOPT_TIMEOUT_MS, timeout_ms + read_timeout_ms);

long end_ms = current_milliseconds() + retry_backoff_max_ms;
int curr_attempt = 0;

while (current_milliseconds() <= end_ms) {
curr_attempt++;

if (rd_kafka_terminating(rk)) {
herr = rd_http_error_new(-1, "Terminating");
goto done;
Expand All @@ -448,15 +472,21 @@ rd_http_error_t *rd_http_get(rd_kafka_t *rk,
goto done;
}

long wait_ms = retry_backoff_ms *
(long)pow(2, curr_attempt - 1);

long diff = end_ms - current_milliseconds();
wait_ms = MIN(wait_ms, diff);

/* Retry if HTTP(S) request returns temporary error and there
* are remaining retries, else fail. */
if (i == retries || !rd_http_is_failure_temporary(herr->code)) {
if (wait_ms <= 0 || !rd_http_is_failure_temporary(herr->code)) {
goto done;
}

/* Retry */
rd_http_error_destroy(herr);
rd_usleep(retry_ms * 1000 * (i + 1), &rk->rk_terminate);
rd_usleep(wait_ms * 1000, &rk->rk_terminate);
}

*rbufp = hreq.hreq_buf;
Expand Down Expand Up @@ -512,7 +542,6 @@ rd_http_error_t *rd_http_parse_json(rd_http_req_t *hreq, cJSON **jsonp) {
return herr;
}


/**
* @brief Perform a blocking HTTP(S) request to \p url with
* HTTP(S) headers and data with \p timeout_s.
Expand All @@ -531,13 +560,13 @@ rd_http_error_t *rd_http_post_expect_json(rd_kafka_t *rk,
const struct curl_slist *headers,
const char *post_fields,
size_t post_fields_size,
int timeout_s,
int retries,
int retry_ms,
long timeout_ms,
long read_timeout_ms,
long retry_backoff_ms,
long retry_backoff_max_ms,
cJSON **jsonp) {
rd_http_error_t *herr;
rd_http_req_t hreq;
int i;
size_t len;
const char *content_type;

Expand All @@ -546,13 +575,25 @@ rd_http_error_t *rd_http_post_expect_json(rd_kafka_t *rk,
return herr;

curl_easy_setopt(hreq.hreq_curl, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(hreq.hreq_curl, CURLOPT_TIMEOUT, timeout_s);

timeout_ms = MAX(timeout_ms, 0);
read_timeout_ms = MAX(read_timeout_ms, 0);
if (timeout_ms > 0) {
curl_easy_setopt(hreq.hreq_curl, CURLOPT_CONNECTTIMEOUT_MS, timeout_ms);
}
if (read_timeout_ms > 0)
curl_easy_setopt(hreq.hreq_curl, CURLOPT_TIMEOUT_MS, timeout_ms + read_timeout_ms);

curl_easy_setopt(hreq.hreq_curl, CURLOPT_POSTFIELDSIZE,
post_fields_size);
curl_easy_setopt(hreq.hreq_curl, CURLOPT_POSTFIELDS, post_fields);

for (i = 0; i <= retries; i++) {
long end_ms = current_milliseconds() + retry_backoff_max_ms;
int curr_attempt = 0;

while (current_milliseconds() <= end_ms) {
curr_attempt++;

if (rd_kafka_terminating(rk)) {
rd_http_req_destroy(&hreq);
return rd_http_error_new(-1, "Terminating");
Expand All @@ -568,16 +609,23 @@ rd_http_error_t *rd_http_post_expect_json(rd_kafka_t *rk,
rd_http_req_destroy(&hreq);
return NULL;
}

long wait_ms = retry_backoff_ms *
(long)pow(2, curr_attempt - 1);

long diff = end_ms - current_milliseconds();
wait_ms = MIN(wait_ms, diff);

/* Retry if HTTP(S) request returns temporary error and there
* are remaining retries, else fail. */
if (i == retries || !rd_http_is_failure_temporary(herr->code)) {
if (wait_ms <= 0 || !rd_http_is_failure_temporary(herr->code)) {
rd_http_req_destroy(&hreq);
return herr;
}

/* Retry */
rd_http_error_destroy(herr);
rd_usleep(retry_ms * 1000 * (i + 1), &rk->rk_terminate);
rd_usleep(wait_ms * 1000, &rk->rk_terminate);
}

content_type = rd_http_req_get_content_type(&hreq);
Expand Down Expand Up @@ -643,9 +691,10 @@ rd_http_error_t *rd_http_get_json(rd_kafka_t *rk,
const char *url,
char **headers_array,
size_t headers_array_cnt,
int timeout_s,
int retries,
int retry_ms,
long timeout_ms,
long read_timeout_ms,
long retry_backoff_ms,
long retry_backoff_max_ms,
cJSON **jsonp) {
rd_http_error_t *herr;
int response_code;
Expand All @@ -662,7 +711,7 @@ rd_http_error_t *rd_http_get_json(rd_kafka_t *rk,
headers_array_new[headers_array_cnt++] = "Accept: application/json";

herr = rd_http_get(rk, url, headers_array_new, headers_array_cnt,
timeout_s, retries, retry_ms, &rbuf, &content_type,
timeout_ms, read_timeout_ms, retry_backoff_ms, retry_backoff_max_ms, &rbuf, &content_type,
&response_code);
rd_free(headers_array_new);

Expand Down Expand Up @@ -715,7 +764,7 @@ int unittest_http_get(void) {

/* Try the base url first, parse its JSON and extract a key-value. */
json = NULL;
herr = rd_http_get_json(rk, base_url, NULL, 0, 5, 1, 1000, &json);
herr = rd_http_get_json(rk, base_url, NULL, 0, 5, 5, 1, 1000, &json);
RD_UT_ASSERT(!herr, "Expected get_json(%s) to succeed, got: %s",
base_url, herr->errstr);

Expand All @@ -735,7 +784,7 @@ int unittest_http_get(void) {

/* Try the error URL, verify error code. */
json = NULL;
herr = rd_http_get_json(rk, error_url, NULL, 0, 5, 1, 1000, &json);
herr = rd_http_get_json(rk, error_url, NULL, 0, 5, 5, 1, 1000, &json);
RD_UT_ASSERT(herr != NULL, "Expected get_json(%s) to fail", error_url);
RD_UT_ASSERT(herr->code >= 400,
"Expected get_json(%s) error code >= "
Expand Down
21 changes: 12 additions & 9 deletions src/rdhttp.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,21 @@ rd_http_error_t *rd_http_get(rd_kafka_t *rk,
const char *url,
char **headers_array,
size_t headers_array_cnt,
int timeout_s,
int retries,
int retry_ms,
long timeout_ms,
long read_timeout_ms,
long retry_backoff_ms,
long retry_backoff_max_ms,
rd_buf_t **rbufp,
char **content_type,
int *response_code);
rd_http_error_t *rd_http_get_json(rd_kafka_t *rk,
const char *url,
char **headers_array,
size_t headers_array_cnt,
int timeout_s,
int retries,
int retry_ms,
long timeout_ms,
long read_timeout_ms,
long retry_backoff_ms,
long retry_backoff_max_ms,
cJSON **jsonp);

void rd_http_global_init(void);
Expand Down Expand Up @@ -88,9 +90,10 @@ rd_http_error_t *rd_http_post_expect_json(rd_kafka_t *rk,
const struct curl_slist *headers,
const char *data_to_token,
size_t data_to_token_size,
int timeout_s,
int retries,
int retry_ms,
long timeout_ms,
long read_timeout_ms,
long retry_backoff_ms,
long retry_backoff_max_ms,
cJSON **jsonp);
void rd_http_req_destroy(rd_http_req_t *hreq);

Expand Down
20 changes: 20 additions & 0 deletions src/rdkafka_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,26 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
1,
0,
},
{_RK_GLOBAL, "sasl.login.connect.timeout.ms", _RK_C_INT,
_RK(sasl.login.connect_timeout_ms),
"The value in milliseconds for the external authentication provider connection timeout. Currently applies only to OAUTHBEARER.",
.vdef = 5 * 1000
},
{_RK_GLOBAL, "sasl.login.read.timeout.ms", _RK_C_INT,
_RK(sasl.login.read_timeout_ms),
"The value in milliseconds for the external authentication provider read timeout. Currently applies only to OAUTHBEARER.",
.vdef = 5 * 1000
},
{_RK_GLOBAL, "sasl.login.retry.backoff.ms", _RK_C_INT,
_RK(sasl.login.retry_backoff_ms),
"The value in milliseconds for the initial wait between login attempts to the external authentication provider. Login uses an exponential backoff algorithm with an initial wait based on the sasl.login.retry.backoff.ms setting and will double in wait length between attempts up to a maximum wait length specified by the sasl.login.retry.backoff.max.ms setting. Currently applies only to OAUTHBEARER.",
.vdef = 100
},
{_RK_GLOBAL, "sasl.login.retry.backoff.max.ms", _RK_C_INT,
_RK(sasl.login.retry_backoff_max_ms),
"The value in milliseconds for the maximum wait between login attempts to the external authentication provider. Login uses an exponential backoff algorithm with an initial wait based on the sasl.login.retry.backoff.ms setting and will double in wait length between attempts up to a maximum wait length specified by the sasl.login.retry.backoff.max.ms setting. Currently applies only to OAUTHBEARER.",
.vdef = 10 * 1000
},
{_RK_GLOBAL, "sasl.oauthbearer.method", _RK_C_S2I,
_RK(sasl.oauthbearer.method),
"Set to \"default\" or \"oidc\" to control which login method "
Expand Down
9 changes: 8 additions & 1 deletion src/rdkafka_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ typedef enum {

/* Increase in steps of 64 as needed.
* This must be larger than sizeof(rd_kafka_[topic_]conf_t) */
#define RD_KAFKA_CONF_PROPS_IDX_MAX (64 * 35)
#define RD_KAFKA_CONF_PROPS_IDX_MAX (64 * 39)

/**
* @struct rd_kafka_anyconf_t
Expand Down Expand Up @@ -382,6 +382,13 @@ struct rd_kafka_conf_s {
const char *oauthbearer_config,
void *opaque);
} oauthbearer;

struct {
long connect_timeout_ms;
long read_timeout_ms;
long retry_backoff_ms;
long retry_backoff_max_ms;
} login;
} sasl;

char *plugin_paths;
Expand Down
Loading