feat(inkless): POD-2386 Allow remote.storage.enabled when diskless is enabled#556
feat(inkless): POD-2386 Allow remote.storage.enabled when diskless is enabled#556viktorsomogyi wants to merge 11 commits intomainfrom
Conversation
506e6e1 to
a699d79
Compare
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
Outdated
Show resolved
Hide resolved
server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
Outdated
Show resolved
Hide resolved
# Conflicts: # core/src/test/scala/unit/kafka/log/LogConfigTest.scala # storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java # Conflicts: # storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java # Conflicts: # storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
a699d79 to
cbb9eca
Compare
c574bdb to
5786342
Compare
5786342 to
12a8902
Compare
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
Outdated
Show resolved
Hide resolved
| s"${ServerConfigs.DISKLESS_REMOTE_STORAGE_CONSOLIDATION_ENABLE_CONFIG} requires ${ServerConfigs.DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG}=true") | ||
| require(remoteLogManagerConfig.isRemoteStorageSystemEnabled, | ||
| s"${ServerConfigs.DISKLESS_REMOTE_STORAGE_CONSOLIDATION_ENABLE_CONFIG} requires ${RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP}=true") | ||
| } |
There was a problem hiding this comment.
agree it does not require allow-from-classic migration. Even though the topic type switcher work includes foundational work for TS consolidation (e.g. diskless start offset, etc) these are not guard by the feature flag.
Co-authored-by: Jorge Esteban Quilcate Otoya <jorge.quilcate@aiven.io>
jeqo
left a comment
There was a problem hiding this comment.
Looking good. Just a few more suggestions. Let's also ensure CI is passing.
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
Show resolved
Hide resolved
|
|
||
| public static void validateTurningOffRemoteStorageWithDelete(Map<?, ?> newConfigs, boolean wasRemoteLogEnabled, boolean isRemoteLogStorageEnabled) { | ||
| boolean isRemoteLogDeleteOnDisable = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, false); | ||
| if (wasRemoteLogEnabled && !isRemoteLogStorageEnabled && !isRemoteLogDeleteOnDisable) { |
There was a problem hiding this comment.
We need to catch this no-op "config remains unchanged" pattern in the validations, e.g. in this case wasRemoteLogEnabled && !isRemoteLogStorageEnabled.
So, when validating diskless enforcements, they should only trigger when there is an actual change.
This can be useful for tooling like Terraform that always send the whole set of configs.
There was a problem hiding this comment.
Added a number of test cases in LogConfigTest and changed the validation slightly to satisfy these test conditions.
Let me know if you need other cases covered too.
Co-authored-by: Jorge Esteban Quilcate Otoya <jorge.quilcate@aiven.io> Co-authored-by: Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
Outdated
Show resolved
Hide resolved
Co-authored-by: Jorge Esteban Quilcate Otoya <jorge.quilcate@aiven.io>
jeqo
left a comment
There was a problem hiding this comment.
A last round of suggestions to make the conditions on LogConfigHelper more explicit. Apart from this it looks good to me.
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
Outdated
Show resolved
Hide resolved
Co-authored-by: Jorge Esteban Quilcate Otoya <jorge.quilcate@aiven.io>
jeqo
left a comment
There was a problem hiding this comment.
Forgot to publish this one 😅 should be the last one
| } | ||
|
|
||
| @Test | ||
| def test_disklessAllowFromClassic_and_remoteStorageConsolidation_atUpdate(): Unit = { |
There was a problem hiding this comment.
nit: naming inconsistency with other tests
| def test_disklessAllowFromClassic_and_remoteStorageConsolidation_atUpdate(): Unit = { | |
| def testDisklessAllowFromClassicAndRemoteStorageConsolidationAtUpdate(): Unit = { |
Adds the diskless.remote.storage.consolidation.enable config. When it is enabled, it will
Setting remote.storag.enable on a diskless topic will be the intended way to start the consolidation of WAL segments into tiered topics.