-
Notifications
You must be signed in to change notification settings - Fork 14.9k
KAFKA-19851 Delete dynamic config that were removed by Kafka #21053
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
ahuang98
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
partial review, thanks for working on this Tony!
I was wondering if we considered changing the behavior of validate(ConfigResource resource, Map<String, String> newConfigs, Map<String, String> existingConfigs); to log vs throw when there is an illegal existing config?
| * Get the set of valid configuration names for a given resource type. | ||
| * Returns empty set if configSchema is not initialized. | ||
| */ | ||
| static Set<String> getValidConfigNames(Type resourceType) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we have this method handle UNKNOWN type to simplify logic elsewhere?
| Supplier<KafkaConfigSchema> supplier = configSchemaSupplier; | ||
| if (supplier == null) { | ||
| return Set.of(); | ||
| } | ||
| KafkaConfigSchema configSchema = supplier.get(); | ||
| if (configSchema == null) { | ||
| return Set.of(); | ||
| } | ||
| return configSchema.validConfigNames(resourceType); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about something like
return Optional.ofNullable(configSchemaSupplier)
.map(Supplier::get)
.map(schema -> schema.validConfigNames(resourceType))
.orElse(Set.of());
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for taking this on @0xffff-zhiyan. I just have some high level comments for your and @ahuang98's consideration. After doing some thinking on this, the issue is more complex than it appears if we want to actually remove these records from the metadata partition.
I was wondering if we considered changing the behavior of validate(ConfigResource resource, Map<String, String> newConfigs, Map<String, String> existingConfigs); to log vs throw when there is an illegal existing config?
I think the main question we want to answer is:
- Do we want kafka to even remove these now-invalid configs from the cluster metadata partition as part of this PR?
What happens if the user wants to downgrade to a lower version? If we "clean up" config records, downgrading the software now results in lost metadata. In my opinion, the lossy downgrade case is enough to convince me kafka should not proactively clean up these configs.
Ultimately, what we want is the active controller to gracefully handle an ALTER_CONFIG after upgrading to a software version which may make some of its existing config state unknown/invalid to the new software version.
The most simple approach is to not validate dynamic configs that are not known to kafka. This matches what we do for static configs, as you can add unknown configs to the .properties file and it will not impact kafka. However, because we persist this config state to disk via the metadata partition, it is a problem to allow arbitrary config updates.
If the controller is to keep returning an error like it does today in this state, it should return an error that lists all the now-invalid configs so it is straightforward for the user to clean them up. This error should also let the user know these configs are invalid because they are unrecognized by the current kafka software version. This means the user becomes aware that a downgrade would be lossy if they delete these configs.
| * Represents changes to the configurations in the metadata image. | ||
| */ | ||
| public final class ConfigurationsDelta { | ||
| /** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't need to change this file at all. Remember that each ConfigurationDelta object will contain the current configuration image for a given ConfigResource, as well as its deltas. When we call ConfigurationDelta#apply, that is the only place we need to make changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove the changes to this file.
|
I think I agree that it might have less intended side effects to not delete the unknown configs. About the downgrade scenario, I would assume most cases where we're introducing new topic config, it might involve a new MV, in which case we don't guarantee lossless downgrades (this is besides the point that MV downgrade isn't supported yet). We don't technically gate the actual topic config on MV or major versions, so it is quite possible we lose information unexpectedly on a downgrade.
Seems similar to my question on if we can just avoid validating the existing configs and prevent new invalid configs from being added. I don't necessarily agree with allowing a user add a bad config - this could become a vulnerability if we don't have a cap on number of configs |
I agree that is a better way.
that way doesn't fix our current issue. the problem is that whenever users add or modify configurations, we throw an exception if there are any invalid configs. Users have to manually remove them all, which is tedious and exactly what we want to improve. Simply informing them about the invalid configs doesn’t really simplify the process, because they still need to clean them up one by one. And they will lose those configs permanently at last. So based on the discussion above, we'd better stop validating existing configs and preventing users from adding invalid configs. My only concern is: Is it really the right approach to let Kafka tolerate configurations that should no longer exist in the metadata? If we ever introduce new logic that handles existing configs in the future, we might have to keep adding code paths that explicitly ignore these existing but invalid configs. That seems like it could gradually accumulate technical debt. If we want the metadata to be clean without losing those configs permanently, is it possible we introduce a new config called |
|
Thanks for the discussion @ahuang98 @0xffff-zhiyan:
I think I misunderstood your original comment. I agree that if we ignore the existing config metadata state and only validate what is contained in the
Dynamic configs that are not known by kafka, just like static configs, shouldn't invalidate the entire config. In this case, they are because |
kevin-wu24
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes @0xffff-zhiyan. Left a review of src/main.
| val configSchema = new KafkaConfigSchema(Map( | ||
| ConfigResource.Type.BROKER -> new ConfigDef(KafkaConfig.configDef), | ||
| ConfigResource.Type.TOPIC -> LogConfig.configDefCopy, | ||
| ConfigResource.Type.GROUP -> GroupConfig.configDef(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is DynamicBrokerConfig#AllDynamicConfigs not used as part of the whitelist?
| minInsyncReplicasString, | ||
| ConfigDef.Type.INT); | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove the changes to this file. Look at ControllerConfigurationValidator to see how dynamic config changes are validated.
| * Represents changes to the configurations in the metadata image. | ||
| */ | ||
| public final class ConfigurationsDelta { | ||
| /** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove the changes to this file.
| // Filter out invalid configs | ||
| if (type != Type.UNKNOWN) { | ||
| Set<String> validConfigNames = configSchema.validConfigNames(type); | ||
| if (!validConfigNames.isEmpty() && !validConfigNames.contains(record.name())) { | ||
| // Ignore the record if it's a removed/invalid config | ||
| log.debug("Ignoring ConfigRecord for {} with invalid/removed config name: {}", | ||
| new ConfigResource(type, record.resourceName()), record.name()); | ||
| return; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do not need this change. When upgrading the software version, the controller that becomes active must load its most recent snapshot from disk, and that will clean up the MetadataImage.
| Map<String, String> newData = new HashMap<>(image.data().size()); | ||
| Type resourceType = image.resource().type(); | ||
| Set<String> validConfigNames = resourceType != Type.UNKNOWN ? | ||
| ConfigurationsDelta.getValidConfigNames(resourceType) : Set.of(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Look at ConfigCommandOptions#addConfig for the whitelist we should be using.
47f1d90 to
3b76ce3
Compare
|
I found a solution to initialize the whitelist in |
|
do we even need to filter what is being written to the metadata image? based on what we had discussed with allowing existing unknown configs to exist and only preventing new ones from being created, I would have thought there's no reason to change ConfigurationsDelta at all |
Hmm... am I misunderstanding something? Didn't we agree to remove the existing invalid configs finally? |
|
@0xffff-zhiyan thanks for clarifying, from reading Kevin's last comments I had assumed we were only going to validate new configs as they came in (and not sanitize existing configs) |
|
@ahuang98 Thanks! |
| override def startup(): Unit = { | ||
| Mx4jLoader.maybeLoad() | ||
| // Initialize the whitelist for ConfigurationDelta to filter deprecated/invalid configs | ||
| ConfigurationDelta.initializeValidConfigs(getAllDynamicConfigNames) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MetadataLoader is built in SharedServer, I'm wondering if it will be better practice to generate the set of dynamicConfigNames in sharedServer and then pass it all the way through form MetadataLoader to MetadataDelta to ConfigurationsDelta etc.
| import org.apache.kafka.common.protocol.Errors; | ||
| import org.apache.kafka.common.requests.ApiError; | ||
| import org.apache.kafka.common.utils.LogContext; | ||
| import org.apache.kafka.image.ConfigurationDelta; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similarly here, wondering if it would be better to grab the set of valid dynamic configs from ControllerServer which already holds a reference to sharedServer. this could be passed all the way through to configurationControlManager. as an example, you could take a look at how sharedServer.metadataEncryptorFactory gets propagated to configurationControl in QuorumController.
| public final class ConfigurationDelta { | ||
| private final ConfigurationImage image; | ||
| private final Map<String, Optional<String>> changes = new HashMap<>(); | ||
| private static final AtomicReference<Set<String>> VALID_CONFIGS_REF = new AtomicReference<>(null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
guessing this is an atomicReference because we're worried about calling initializeValidConfigs more than once (presumably on accident). this could be avoided if we instantiate this value as part of the ConfigurationDelta constructor
ahuang98
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey Tony, had a chance to discuss this a bit with @jsancio to improve my own understanding of how we might resolve some dependencies.
What do you think of the following?
We could create a validator that can check if a config name is valid for a given resource type (broker, topic, user, etc.) and then pass it through from SharedServer all the way down to where it's needed (ConfigurationDelta and ConfigurationControlManager).
We should probably define a functional interface for this validator which should live in the metadata module based on the fact the classes which will rely on it live in metadata. Since the actual knowledge of which configs are valid (e.g. DynamicConfig.Broker) is defined in the core module, the implementation of this interface can live in core.
| } | ||
|
|
||
| public ConfigurationImage apply() { | ||
| Set<String> whitelist = VALID_CONFIGS_REF.get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there might be a better place to do this validation (you might want to take a look at FeaturesDelta and see how we handle filtering there for an old bug)
another good point from @jsancio is that all XXXDelta's apply methods are doing are applying deltas ontop of a base image to produce a new image. that new image then becomes the base image for the next delta to be applied ontop of. given that we always start with an empty base image, it follows that we only need to validate the deltas (referenced to as changes in this code) to ensure every image that is produced as a result is valid
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I’m a bit confused about how we construct the image from the snapshot. Do we keep applying deltas incrementally, or do we construct the image directly from the snapshot without going through deltas? If it’s the latter, then validating only the deltas wouldn’t be sufficient to remove invalid configs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can look at MetadataLoader#handleLoadSnapshot for how the metadata publishing pipeline (which generates the deltas we are working with here) deals with a snapshot received from the KRaft layer. The initial "image" is always empty, so the entire first snapshot loaded from KRaft for a given JVM lifetime is a delta. Since the SnapshotGenerator is a metadata publisher, the next snapshot that is written to disk after the "cleanup" code executes should no longer have dynamic configs unknown to kafka.
However, in order for the active controller to successfully handle an alter config request after updating its software, but before it loads a "cleaned" snapshot, I think we still need to clean up the ConfigurationControlManager's state, because its configData is the "dynamic config state of truth" for handling RPCs, not whatever exists in the MetadataLoader. Controller state for RPC handling uses QuoumController#QuorumMetaLogListener to load snapshots from KRaft, which is separate code from MetadataLoader.
I think I made an observation before that it seemed incorrect to do this kind of metadata state modification without explicitly committing records to KRaft. The MetadataLoader and QuoumController#QuorumMetaLogListener are both raft listeners, with each being responsible for different things (the former is responsible for generating the next snapshot on-disk, amongst other things, and the latter is indirectly responsible, via the ConfigurationControlManager, for handling alter config requests). Should we instead commit an explicit "delete these X unknown dynamic config records" to KRaft alongside the user's requested alter config upon receiving an alter config request that would otherwise be invalid because of unknown configs? This would avoid code duplication in the listeners.
@ahuang98 @0xffff-zhiyan @jsancio Let me know what you think of this approach, or if I am misunderstanding something.
Thanks for the advice! This approach looks pretty good. I'll update my current implementation |
Problem
https://issues.apache.org/jira/browse/KAFKA-19851
When upgrading from Kafka 3.x to 4.0, the metadata log may contain
dynamic configurations that were removed in 4.0 (e.g.,
message.format.versionper KIP-724). These removed configs causeInvalidConfigurationExceptionwhen users attempt to modify anyconfiguration, because validation checks all existing configs including
the removed ones.
Implementation
Adds whitelist-based filtering to
ConfigurationDeltato preventdeprecated or invalid configurations from being applied. The whitelist
is initialized once during KafkaRaftServer startup using
getAllDynamicConfigNames()and stored in a thread-safeAtomicReference. When a whitelist is set,
ConfigurationDelta.apply()filters out any configurations not in the whitelist.
Changes
ConfigurationDelta.java:Added static whitelist and filtering logic inapply()
KafkaRaftServer.scala:Initialize whitelist in startup() methodConfigurationsImageTest.java:Added unit tests for filtering behaviorTesting
Add Unit test to make sure removed configs are filtered during replay