Conversation

0xffff-zhiyan (Contributor) commented Dec 2, 2025

Problem

https://issues.apache.org/jira/browse/KAFKA-19851
When upgrading from Kafka 3.x to 4.0, the metadata log may contain
dynamic configurations that were removed in 4.0 (e.g.,
message.format.version, per KIP-724). These removed configs cause an
InvalidConfigurationException when users attempt to modify any
configuration, because validation checks all existing configs, including
the removed ones.

Implementation

Adds whitelist-based filtering to ConfigurationDelta to prevent
deprecated or invalid configurations from being applied. The whitelist
is initialized once during KafkaRaftServer startup using
getAllDynamicConfigNames() and stored in a thread-safe
AtomicReference. When a whitelist is set, ConfigurationDelta.apply()
filters out any configurations not in the whitelist.
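
A minimal sketch of the approach, assuming the names in this PR's draft (the real apply() returns a ConfigurationImage; it is simplified here to a plain map):

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

public final class ConfigurationDelta {
    // Whitelist of known dynamic config names, installed once at startup.
    private static final AtomicReference<Set<String>> VALID_CONFIGS_REF =
        new AtomicReference<>(null);

    private final Map<String, String> imageData; // base image data
    private final Map<String, Optional<String>> changes = new HashMap<>();

    public ConfigurationDelta(Map<String, String> imageData) {
        this.imageData = imageData;
    }

    // Called once from KafkaRaftServer.startup(); compareAndSet keeps it one-shot.
    public static void initializeValidConfigs(Set<String> validNames) {
        VALID_CONFIGS_REF.compareAndSet(null, Set.copyOf(validNames));
    }

    public Map<String, String> apply() {
        Map<String, String> newData = new HashMap<>(imageData);
        for (Map.Entry<String, Optional<String>> entry : changes.entrySet()) {
            if (entry.getValue().isPresent()) {
                newData.put(entry.getKey(), entry.getValue().get());
            } else {
                newData.remove(entry.getKey());
            }
        }
        // Drop any config name the current software version does not know.
        Set<String> whitelist = VALID_CONFIGS_REF.get();
        if (whitelist != null) {
            newData.keySet().removeIf(name -> !whitelist.contains(name));
        }
        return newData;
    }
}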

Changes

ConfigurationDelta.java: Added static whitelist and filtering logic in apply()
KafkaRaftServer.scala: Initialized the whitelist in the startup() method
ConfigurationsImageTest.java: Added unit tests for the filtering behavior

Testing

Added a unit test to make sure removed configs are filtered during replay.

@github-actions github-actions bot added the triage (PRs from the community), core (Kafka Broker), and kraft labels Dec 2, 2025
ahuang98 (Contributor) left a comment

Partial review; thanks for working on this, Tony!

I was wondering if we considered changing the behavior of validate(ConfigResource resource, Map<String, String> newConfigs, Map<String, String> existingConfigs); to log vs throw when there is an illegal existing config?

* Get the set of valid configuration names for a given resource type.
* Returns empty set if configSchema is not initialized.
*/
static Set<String> getValidConfigNames(Type resourceType) {

Can we have this method handle the UNKNOWN type to simplify the logic elsewhere?

Comment on lines 55 to 63
Supplier<KafkaConfigSchema> supplier = configSchemaSupplier;
if (supplier == null) {
return Set.of();
}
KafkaConfigSchema configSchema = supplier.get();
if (configSchema == null) {
return Set.of();
}
return configSchema.validConfigNames(resourceType);

What about something like:

return Optional.ofNullable(configSchemaSupplier)
    .map(Supplier::get)
    .map(schema -> schema.validConfigNames(resourceType))
    .orElse(Set.of());
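
Combined with the UNKNOWN suggestion above, the helper might end up as follows (a sketch, assuming the surrounding configSchemaSupplier field):

static Set<String> getValidConfigNames(Type resourceType) {
    // UNKNOWN resource types have no known config names by definition,
    // so callers no longer need a special case.
    if (resourceType == Type.UNKNOWN) {
        return Set.of();
    }
    return Optional.ofNullable(configSchemaSupplier)
        .map(Supplier::get)
        .map(schema -> schema.validConfigNames(resourceType))
        .orElse(Set.of());
}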

kevin-wu24 (Contributor) left a comment

Thanks for taking this on @0xffff-zhiyan. I just have some high-level comments for your and @ahuang98's consideration. After thinking about this some more, I believe the issue is more complex than it appears if we want to actually remove these records from the metadata partition.

I was wondering if we considered changing the behavior of validate(ConfigResource resource, Map<String, String> newConfigs, Map<String, String> existingConfigs); to log vs throw when there is an illegal existing config?

I think the main question we want to answer is:

  1. Do we want Kafka to even remove these now-invalid configs from the cluster metadata partition as part of this PR?

What happens if the user wants to downgrade to a lower version? If we "clean up" config records, downgrading the software then results in lost metadata. In my opinion, the lossy downgrade case alone is enough to convince me that Kafka should not proactively clean up these configs.

Ultimately, what we want is for the active controller to gracefully handle an ALTER_CONFIG after upgrading to a software version that makes some of its existing config state unknown or invalid.

The simplest approach is to not validate dynamic configs that are not known to Kafka. This matches what we do for static configs: you can add unknown configs to the .properties file and it will not impact Kafka. However, because we persist this config state to disk via the metadata partition, it is a problem to allow arbitrary config updates.

If the controller is to keep returning an error like it does today, it should return an error that lists all the now-invalid configs so it is straightforward for the user to clean them up. This error should also tell the user that these configs are invalid because they are unrecognized by the current Kafka software version. This way the user becomes aware that a downgrade would be lossy if they delete these configs.
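
For illustration, such an error could be built roughly like this (a sketch; the error code choice and wording are assumptions, not current controller behavior):

Set<String> unknown = existingConfigs.keySet().stream()
    .filter(name -> !validConfigNames.contains(name))
    .collect(Collectors.toSet());
if (!unknown.isEmpty()) {
    return new ApiError(Errors.INVALID_CONFIG,
        "These configs are unrecognized by the current Kafka version and must be " +
        "deleted before other configs can be altered (note that deleting them " +
        "makes a software downgrade lossy): " + String.join(", ", unknown));
}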

* Represents changes to the configurations in the metadata image.
*/
public final class ConfigurationsDelta {
/**

We shouldn't need to change this file at all. Remember that each ConfigurationDelta object will contain the current configuration image for a given ConfigResource, as well as its deltas. When we call ConfigurationDelta#apply, that is the only place we need to make changes.

Please remove the changes to this file.

@github-actions github-actions bot removed the triage (PRs from the community) label Dec 5, 2025
ahuang98 (Contributor) commented Dec 5, 2025

I think I agree that not deleting the unknown configs would have fewer unintended side effects. As for the downgrade scenario, I would assume that most cases where we introduce a new topic config also involve a new MV, in which case we don't guarantee lossless downgrades anyway (besides the point that MV downgrade isn't supported yet). We don't technically gate topic configs on MV or major versions, so it is quite possible we lose information unexpectedly on a downgrade.

The simplest approach is to not validate dynamic configs that are not known to Kafka.

Seems similar to my question about whether we can just avoid validating the existing configs while still preventing new invalid configs from being added. I don't necessarily agree with allowing a user to add a bad config; this could become a vulnerability if we don't have a cap on the number of configs.

0xffff-zhiyan (Contributor, Author) commented Dec 5, 2025

avoid validating the existing configs and prevent new invalid configs from being added

I agree that is a better way.

If the controller is to keep returning an error like it does today, it should return an error that lists all the now-invalid configs so it is straightforward for the user to clean them up.

That approach doesn't fix our current issue. The problem is that whenever users add or modify configurations, we throw an exception if there are any invalid configs. Users have to manually remove them all, which is tedious and exactly what we want to improve. Simply informing them about the invalid configs doesn't really simplify the process, because they still need to clean them up one by one, and in the end they lose those configs permanently.

So based on the discussion above, we should stop validating existing configs while still preventing users from adding invalid ones.

My only concern: is it really the right approach to let Kafka tolerate configurations that should no longer exist in the metadata? If we ever introduce new logic that handles existing configs, we might have to keep adding code paths that explicitly ignore these existing-but-invalid configs, which could gradually accumulate technical debt. If we want the metadata to stay clean without losing those configs permanently, could we introduce a new config called REMOVED_CONFIG and move all those configs there?
@ahuang98 @kevin-wu24

kevin-wu24 (Contributor) commented Dec 5, 2025

Thanks for the discussion @ahuang98 @0xffff-zhiyan:

I don't necessarily agree with allowing a user to add a bad config; this could become a vulnerability if we don't have a cap on the number of configs

I think I misunderstood your original comment. I agree that if we ignore the existing config metadata state and only validate what is contained in the ALTER_CONFIG request, a given version of Kafka's dynamic config will be valid. When going between major versions, removing a config from the source code will not invalidate the existing dynamic config state on the new version of Kafka, and ALTER_CONFIG can complete. This matches how the static .properties config is validated by Kafka.

My only concern: is it really the right approach to let Kafka tolerate configurations that should no longer exist in the metadata? If we ever introduce new logic that handles existing configs, we might have to keep adding code paths that explicitly ignore these existing-but-invalid configs

Dynamic configs that are not known to Kafka, just like static configs, shouldn't invalidate the entire config. In this case they do, because ALTER_CONFIG will fail. The argument here is that we should not have been validating the existing dynamic config in the first place, since what counts as a "valid" (dynamic OR static) configuration depends only on the software version of Kafka currently running. If I change software versions, fields in my static .properties file can go from valid to unknown, and loading those unknown configs into KafkaConfig does not throw an exception because they are ignored. We should apply this semantic to the dynamic configuration too.
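
Concretely, that would mean validate() inspects only the request payload, something like this (a sketch; validConfigNames(...) is a hypothetical helper):

public void validate(ConfigResource resource,
                     Map<String, String> newConfigs,
                     Map<String, String> existingConfigs) {
    // existingConfigs is deliberately ignored: unknown keys already present
    // in the metadata are tolerated, mirroring unknown static configs.
    for (String name : newConfigs.keySet()) {
        if (!validConfigNames(resource.type()).contains(name)) {
            throw new InvalidConfigurationException(
                "Unknown " + resource.type() + " config name: " + name);
        }
    }
}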

kevin-wu24 (Contributor) left a comment

Thanks for the changes @0xffff-zhiyan. Left a review of src/main.

val configSchema = new KafkaConfigSchema(Map(
ConfigResource.Type.BROKER -> new ConfigDef(KafkaConfig.configDef),
ConfigResource.Type.TOPIC -> LogConfig.configDefCopy,
ConfigResource.Type.GROUP -> GroupConfig.configDef(),
kevin-wu24 commented Dec 9, 2025

Why is DynamicBrokerConfig#AllDynamicConfigs not used as part of the whitelist?

minInsyncReplicasString,
ConfigDef.Type.INT);
}

Please remove the changes to this file. Look at ControllerConfigurationValidator to see how dynamic config changes are validated.

* Represents changes to the configurations in the metadata image.
*/
public final class ConfigurationsDelta {
/**

Please remove the changes to this file.

Comment on lines 512 to 521
// Filter out invalid configs
if (type != Type.UNKNOWN) {
Set<String> validConfigNames = configSchema.validConfigNames(type);
if (!validConfigNames.isEmpty() && !validConfigNames.contains(record.name())) {
// Ignore the record if it's a removed/invalid config
log.debug("Ignoring ConfigRecord for {} with invalid/removed config name: {}",
new ConfigResource(type, record.resourceName()), record.name());
return;
}
}

We do not need this change. When upgrading the software version, the controller that becomes active must load its most recent snapshot from disk, and that will clean up the MetadataImage.

Map<String, String> newData = new HashMap<>(image.data().size());
Type resourceType = image.resource().type();
Set<String> validConfigNames = resourceType != Type.UNKNOWN ?
ConfigurationsDelta.getValidConfigNames(resourceType) : Set.of();
kevin-wu24 commented Dec 9, 2025

Look at ConfigCommandOptions#addConfig for the whitelist we should be using.

@github-actions github-actions bot added the small (Small PRs) label Dec 16, 2025
0xffff-zhiyan (Contributor, Author) commented Dec 22, 2025

Because KafkaConfigSchema is initialized in KafkaRaftServer and depends on config definitions from the core module (e.g., KafkaConfig.configDef, LogConfig.configDefCopy), we cannot build the whitelist in the metadata module due to module dependency constraints.

I found a solution: initialize the whitelist in ConfigurationDelta during KafkaRaftServer startup by calling ConfigurationDelta.initializeValidConfigs() with the set of valid dynamic config names collected from the core module. The whitelist is stored in a static AtomicReference<Set<String>> field, ensuring one-time initialization. The apply() method then uses it to filter out invalid/deprecated configurations.

@kevin-wu24 @jsancio @ahuang98

ahuang98 (Contributor) commented Jan 5, 2026

Do we even need to filter what is being written to the metadata image? Based on what we discussed about allowing existing unknown configs to remain and only preventing new ones from being created, I would have thought there's no reason to change ConfigurationsDelta at all.

0xffff-zhiyan (Contributor, Author) commented:

Do we even need to filter what is being written to the metadata image? Based on what we discussed about allowing existing unknown configs to remain and only preventing new ones from being created, I would have thought there's no reason to change ConfigurationsDelta at all.

Hmm... am I misunderstanding something? Didn't we ultimately agree to remove the existing invalid configs?

ahuang98 (Contributor) commented Jan 9, 2026

@0xffff-zhiyan thanks for clarifying. From reading Kevin's last comments, I had assumed we were only going to validate new configs as they came in (and not sanitize existing configs).

0xffff-zhiyan (Contributor, Author) commented Jan 9, 2026

@ahuang98 Thanks!
I looked into validateAlterConfig, and the flow works like this: when we alter configs, we first read all existing configs from the snapshot into a map (existingConfigsSnapshot). Then we apply the altered config records to update that map, and finally we validate the entire map. Because of this, it's hard to cleanly separate validation of existing configs from validation of new configs.
So I added a filter when reading existing configs from the snapshot to ensure that all configs in the map are valid. I think this should prevent validation failures caused by existing invalid configs.
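
Roughly like this (a sketch; configData and validConfigNames stand in for the actual ConfigurationControlManager internals):

Map<String, String> existingConfigsSnapshot = new HashMap<>();
configData.getOrDefault(resource, Map.of()).forEach((name, value) -> {
    // Skip configs the current software version no longer recognizes, so a
    // leftover 3.x config cannot fail validation of an unrelated alteration.
    if (validConfigNames.contains(name)) {
        existingConfigsSnapshot.put(name, value);
    }
});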

override def startup(): Unit = {
Mx4jLoader.maybeLoad()
// Initialize the whitelist for ConfigurationDelta to filter deprecated/invalid configs
ConfigurationDelta.initializeValidConfigs(getAllDynamicConfigNames)

MetadataLoader is built in SharedServer; I'm wondering if it would be better practice to generate the set of dynamicConfigNames in SharedServer and then pass it all the way through from MetadataLoader to MetadataDelta to ConfigurationsDelta, etc.

import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.image.ConfigurationDelta;

Similarly here: wondering if it would be better to grab the set of valid dynamic configs from ControllerServer, which already holds a reference to SharedServer. This could be passed all the way through to ConfigurationControlManager. As an example, you could take a look at how sharedServer.metadataEncryptorFactory gets propagated to configurationControl in QuorumController.

public final class ConfigurationDelta {
private final ConfigurationImage image;
private final Map<String, Optional<String>> changes = new HashMap<>();
private static final AtomicReference<Set<String>> VALID_CONFIGS_REF = new AtomicReference<>(null);

Guessing this is an AtomicReference because we're worried about calling initializeValidConfigs more than once (presumably by accident). This could be avoided if we instantiate the value as part of the ConfigurationDelta constructor.

ahuang98 (Contributor) left a comment

Hey Tony, I had a chance to discuss this a bit with @jsancio to improve my own understanding of how we might resolve some of the dependencies.

What do you think of the following?

We could create a validator that can check if a config name is valid for a given resource type (broker, topic, user, etc.) and then pass it through from SharedServer all the way down to where it's needed (ConfigurationDelta and ConfigurationControlManager).

We should probably define a functional interface for this validator, which should live in the metadata module, given that the classes relying on it live in metadata. Since the actual knowledge of which configs are valid (e.g. DynamicConfig.Broker) is defined in the core module, the implementation of this interface can live in core.
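
A minimal sketch of such an interface (the name ConfigNameValidator is illustrative, not settled):

// Lives in the metadata module; implemented in core, where DynamicConfig.Broker,
// LogConfig, etc. are visible, and threaded down from SharedServer.
@FunctionalInterface
public interface ConfigNameValidator {
    boolean isValid(ConfigResource.Type resourceType, String configName);
}

Core could then supply something like (type, name) -> validNamesFor(type).contains(name) and pass it into the MetadataLoader and QuorumController constructors.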

}

public ConfigurationImage apply() {
Set<String> whitelist = VALID_CONFIGS_REF.get();
ahuang98 commented Jan 9, 2026

There might be a better place to do this validation (you might want to take a look at FeaturesDelta and see how we handled filtering there for an old bug).

Another good point from @jsancio: all that each XXXDelta's apply method does is apply deltas on top of a base image to produce a new image; that new image then becomes the base image for the next delta. Given that we always start with an empty base image, it follows that we only need to validate the deltas (referred to as changes in this code) to ensure that every image produced as a result is valid.
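
Concretely, that would mean filtering only the changes map inside apply(), e.g. (a sketch; isValid is a hypothetical Predicate<String> built from the whitelist):

for (Map.Entry<String, Optional<String>> entry : changes.entrySet()) {
    if (!isValid.test(entry.getKey())) {
        continue; // unknown name: never let it reach the new image
    }
    if (entry.getValue().isPresent()) {
        newData.put(entry.getKey(), entry.getValue().get());
    } else {
        newData.remove(entry.getKey());
    }
}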

0xffff-zhiyan (Contributor, Author) commented:

I’m a bit confused about how we construct the image from the snapshot. Do we keep applying deltas incrementally, or do we construct the image directly from the snapshot without going through deltas? If it’s the latter, then validating only the deltas wouldn’t be sufficient to remove invalid configs.

kevin-wu24 commented Jan 10, 2026

You can look at MetadataLoader#handleLoadSnapshot for how the metadata publishing pipeline (which generates the deltas we are working with here) deals with a snapshot received from the KRaft layer. The initial "image" is always empty, so the entire first snapshot loaded from KRaft for a given JVM lifetime is a delta. Since the SnapshotGenerator is a metadata publisher, the next snapshot written to disk after the "cleanup" code executes should no longer have dynamic configs unknown to Kafka.

However, in order for the active controller to successfully handle an alter config request after updating its software, but before it loads a "cleaned" snapshot, I think we still need to clean up the ConfigurationControlManager's state, because its configData is the dynamic-config source of truth for handling RPCs, not whatever exists in the MetadataLoader. Controller state for RPC handling uses QuorumController#QuorumMetaLogListener to load snapshots from KRaft, which is separate code from MetadataLoader.

I made an observation before that it seemed incorrect to do this kind of metadata state modification without explicitly committing records to KRaft. The MetadataLoader and QuorumController#QuorumMetaLogListener are both raft listeners, each responsible for different things (the former generates the next on-disk snapshot, among other things; the latter is indirectly responsible, via the ConfigurationControlManager, for handling alter config requests). Should we instead commit an explicit "delete these X unknown dynamic config records" to KRaft alongside the user's requested alteration, upon receiving an alter config request that would otherwise be invalid because of unknown configs? This would avoid code duplication in the listeners.
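
For example (a sketch; unknownConfigNames is a hypothetical helper, and a ConfigRecord whose value is null represents deletion):

List<ApiMessageAndVersion> records = new ArrayList<>();
for (String name : unknownConfigNames(resource)) {
    records.add(new ApiMessageAndVersion(
        new ConfigRecord()
            .setResourceType(resource.type().id())
            .setResourceName(resource.name())
            .setName(name)
            .setValue(null), // null value == delete this config
        (short) 0));
}
// These would be appended alongside the records for the user's alteration.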

@ahuang98 @0xffff-zhiyan @jsancio Let me know what you think of this approach, or if I am misunderstanding something.

0xffff-zhiyan (Contributor, Author) commented:

Thanks for the advice! This approach looks pretty good; I'll update my current implementation.
