diff --git a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala index f5eb0f0153..1a854985b5 100644 --- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala +++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala @@ -120,7 +120,8 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu LogConfig.validate(oldConfigs, properties, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), kafkaConfig.disklessAllowFromClassicEnabled, - kafkaConfig.disklessStorageSystemEnabled) + kafkaConfig.disklessStorageSystemEnabled, + kafkaConfig.disklessRemoteStorageConsolidationEnabled) case BROKER => validateBrokerName(resource.name()) case CLIENT_METRICS => val properties = new Properties() diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 3a52d2ba60..55320a5e4c 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -426,6 +426,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) val disklessStorageSystemEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG) val disklessAllowFromClassicEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG) val disklessManagedReplicasEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG) + val disklessRemoteStorageConsolidationEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_REMOTE_STORAGE_CONSOLIDATION_ENABLE_CONFIG) val classicRemoteStorageForceEnabled: Boolean = getBoolean(ServerConfigs.CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_CONFIG) val classicRemoteStorageForceExcludeTopicRegexes: java.util.List[String] = getList(ServerConfigs.CLASSIC_REMOTE_STORAGE_FORCE_EXCLUDE_TOPIC_REGEXES_CONFIG) @@ -519,6 +520,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) // diskless.storage.system.enable // → diskless.managed.rf.enable // → diskless.allow.from.classic.enable (also requires remote.log.storage.system.enable) + // → diskless.remote.storage.consolidation.enable (also requires remote.log.storage.system.enable) if (disklessManagedReplicasEnabled) { require(disklessStorageSystemEnabled, s"${ServerConfigs.DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG} requires ${ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG}=true") @@ -530,6 +532,13 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) s"${ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG} requires ${RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP}=true") } + if (disklessRemoteStorageConsolidationEnabled) { + require(disklessManagedReplicasEnabled, + s"${ServerConfigs.DISKLESS_REMOTE_STORAGE_CONSOLIDATION_ENABLE_CONFIG} requires ${ServerConfigs.DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG}=true") + require(remoteLogManagerConfig.isRemoteStorageSystemEnabled, + s"${ServerConfigs.DISKLESS_REMOTE_STORAGE_CONSOLIDATION_ENABLE_CONFIG} requires ${RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP}=true") + } + require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1") require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0") require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, greater than or equal to 1") diff --git a/core/src/test/java/kafka/server/DisklessAndRemoteStorageConfigsTest.java b/core/src/test/java/kafka/server/DisklessAndRemoteStorageConfigsTest.java index 8d9bd131d7..13dfae4045 100644 --- a/core/src/test/java/kafka/server/DisklessAndRemoteStorageConfigsTest.java +++ b/core/src/test/java/kafka/server/DisklessAndRemoteStorageConfigsTest.java @@ -71,7 +71,7 @@ public class DisklessAndRemoteStorageConfigsTest { private static final String ENABLE_DISKLESS_ERROR = "It is invalid to enable diskless on an already existing topic."; private static final String DISABLE_DISKLESS_ERROR = "It is invalid to disable diskless."; - private static final String DISKLESS_REMOTE_SET_ERROR = "It is not valid to set a value for both diskless.enable and remote.storage.enable."; + private static final String DISKLESS_REMOTE_SET_ERROR = "It is not valid to set a value for both diskless.enable and remote.storage.enable unless it's for diskless migration or consolidation."; private static final String DISABLE_REMOTE_WITHOUT_DELETE_ERROR = "It is invalid to disable remote storage without deleting remote data. " + "If you want to keep the remote data and turn to read only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. " + "If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`."; diff --git a/core/src/test/java/kafka/server/InklessConfigsTest.java b/core/src/test/java/kafka/server/InklessConfigsTest.java index a24f0e3cc9..e3f2e34863 100644 --- a/core/src/test/java/kafka/server/InklessConfigsTest.java +++ b/core/src/test/java/kafka/server/InklessConfigsTest.java @@ -80,13 +80,14 @@ private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean di } private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig, boolean isDisklessAllowFromClassicEnabled) throws Exception { - return init(defaultDisklessEnableConfig, disklessStorageEnableConfig, isDisklessAllowFromClassicEnabled, isDisklessAllowFromClassicEnabled, true, false, List.of()); + return init(defaultDisklessEnableConfig, disklessStorageEnableConfig, isDisklessAllowFromClassicEnabled, isDisklessAllowFromClassicEnabled, false, true, false, List.of()); } private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig, boolean isDisklessAllowFromClassicEnabled, boolean disklessManagedReplicasEnabled, + boolean disklessRemoteStorageConsolidationEnabled, boolean remoteLogStorageSystemEnabled, boolean classicRemoteStorageForceEnabled, List classicRemoteStorageForceExcludeTopicRegexes) throws Exception { @@ -101,6 +102,7 @@ private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, .setConfigProp(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, String.valueOf(disklessStorageEnableConfig)) .setConfigProp(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, String.valueOf(isDisklessAllowFromClassicEnabled)) .setConfigProp(ServerConfigs.DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG, String.valueOf(disklessManagedReplicasEnabled)) + .setConfigProp(ServerConfigs.DISKLESS_REMOTE_STORAGE_CONSOLIDATION_ENABLE_CONFIG, String.valueOf(disklessRemoteStorageConsolidationEnabled)) .setConfigProp(ServerConfigs.CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_CONFIG, String.valueOf(classicRemoteStorageForceEnabled)) .setConfigProp(ServerConfigs.CLASSIC_REMOTE_STORAGE_FORCE_EXCLUDE_TOPIC_REGEXES_CONFIG, String.join(",", classicRemoteStorageForceExcludeTopicRegexes)) @@ -230,7 +232,7 @@ final class ClassicRemoteStorageForcePolicy { private static final List EXCLUDED_TOPIC_REGEXES = List.of("_schemas", "mm2-(.*)"); private KafkaClusterTestKit initWithClassicRemoteStorageForceEnabled() throws Exception { - return init(false, true, false, false, true, true, EXCLUDED_TOPIC_REGEXES); + return init(false, true, false, false, false, true, true, EXCLUDED_TOPIC_REGEXES); } @Test @@ -403,7 +405,7 @@ final class FeatureFlagDependencyValidation { @Test void managedReplicasRequiresDisklessStorageSystem() { final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, - () -> init(false, false, false, true, true, false, List.of())); + () -> init(false, false, false, true, false, true, false, List.of())); assertEquals( "requirement failed: diskless.managed.rf.enable requires diskless.storage.system.enable=true", exception.getMessage()); @@ -412,7 +414,7 @@ void managedReplicasRequiresDisklessStorageSystem() { @Test void allowFromClassicRequiresManagedReplicas() { final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, - () -> init(false, true, true, false, true, false, List.of())); + () -> init(false, true, true, false, false, true, false, List.of())); assertEquals( "requirement failed: diskless.allow.from.classic.enable requires diskless.managed.rf.enable=true", exception.getMessage()); @@ -421,12 +423,30 @@ void allowFromClassicRequiresManagedReplicas() { @Test void allowFromClassicRequiresRemoteLogStorage() { final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, - () -> init(false, true, true, true, false, false, List.of())); + () -> init(false, true, true, true, false, false, false, List.of())); assertEquals( "requirement failed: diskless.allow.from.classic.enable requires remote.log.storage.system.enable=true", exception.getMessage()); } + @Test + void disklessRemoteStorageConsolidationRequiresManagedReplicas() { + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, + () -> init(false, true, false, false, true, true, false, List.of())); + assertEquals( + "requirement failed: diskless.remote.storage.consolidation.enable requires diskless.managed.rf.enable=true", + exception.getMessage()); + } + + @Test + void disklessRemoteStorageConsolidationRequiresRemoteLogStorage() { + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, + () -> init(false, true, false, true, true, false, false, List.of())); + assertEquals( + "requirement failed: diskless.remote.storage.consolidation.enable requires remote.log.storage.system.enable=true", + exception.getMessage()); + } + @Test void fullDependencyChainSucceeds() throws Exception { final KafkaClusterTestKit cluster = init(false, true, true); diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 05487db0cd..45eca38330 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -430,12 +430,11 @@ class LogConfigTest { LogConfig.validate(logProps) } - @Test def testDisklessAndRemoteStorageAtCreation(): Unit = { val kafkaConfig = KafkaConfig.fromProps(TestUtils.createDummyBrokerConfig()) val noExisting: util.Map[String, String] = util.Map.of() - val mutualExclusionError = "It is not valid to set a value for both diskless.enable and remote.storage.enable." + val mutualExclusionError = "It is not valid to set a value for both diskless.enable and remote.storage.enable unless it's for diskless migration or consolidation." // Allowed to set diskless.enable=true at creation assertValid(noExisting, topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> "true"), kafkaConfig) @@ -486,16 +485,229 @@ class LogConfigTest { kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled, kafkaConfig.disklessAllowFromClassicEnabled, - kafkaConfig.disklessStorageSystemEnabled + kafkaConfig.disklessStorageSystemEnabled, + kafkaConfig.disklessRemoteStorageConsolidationEnabled )) assertEquals("It is invalid to set diskless.enable if diskless storage system is not enabled.", ex.getMessage) } + @ParameterizedTest(name = "testDisklessRemoteStorageConsolidation with value: {0}") + @ValueSource(booleans = Array(true, false)) + def testDisklessRemoteStorageConsolidation(remoteStorageConsolidationEnabled: Boolean): Unit = { + val kafkaProps = TestUtils.createDummyBrokerConfig() + kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true") + kafkaProps.put(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, "true") + kafkaProps.put(ServerConfigs.DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG, "true") + kafkaProps.put(ServerConfigs.DISKLESS_REMOTE_STORAGE_CONSOLIDATION_ENABLE_CONFIG, remoteStorageConsolidationEnabled) + val kafkaConfig = KafkaConfig.fromProps(kafkaProps) + + if (!remoteStorageConsolidationEnabled) { + val ex = assertThrows(classOf[InvalidConfigurationException], + () => LogConfig.validate( + util.Map.of[String, String](), + topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> "true", + TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> "true"), + kafkaConfig.extractLogConfigMap, + kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled, + kafkaConfig.disklessAllowFromClassicEnabled, + kafkaConfig.disklessStorageSystemEnabled, + kafkaConfig.disklessRemoteStorageConsolidationEnabled + )) + assertEquals("It is not valid to set a value for both diskless.enable and remote.storage.enable unless it's for diskless migration or consolidation.", + ex.getMessage) + } else { + LogConfig.validate( + util.Map.of[String, String](), + topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> "true", + TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> "true"), + kafkaConfig.extractLogConfigMap, + kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled, + kafkaConfig.disklessAllowFromClassicEnabled, + kafkaConfig.disklessStorageSystemEnabled, + kafkaConfig.disklessRemoteStorageConsolidationEnabled + ) + } + } + + @Test + def testRemoteStorageConsolidationAtCreation(): Unit = { + val kafkaConfig = KafkaConfig.fromProps(TestUtils.createDummyBrokerConfig()) + val noExisting: util.Map[String, String] = util.Map.of() + val mutualExclusionError = "It is not valid to set a value for both diskless.enable and remote.storage.enable unless it's for diskless migration or consolidation." + + // Allowed to set diskless.enable=true at creation + assertValid(noExisting, topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> "true"), kafkaConfig, + remoteStorageConsolidationEnabled = true) + + // Allowed to set remote.storage.enable=true at creation + assertValid(noExisting, topicProps(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> "true"), kafkaConfig, + remoteStorageConsolidationEnabled = true) + + // NOT allowed to set diskless.enable=false and remote.log.storage.enable=false explicitly at creation + assertInvalid(noExisting, topicProps( + TopicConfig.DISKLESS_ENABLE_CONFIG -> "false", + TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> "false"), + mutualExclusionError, + kafkaConfig, + remoteStorageConsolidationEnabled = true) + + // NOT allowed to set diskless.enable=false and remote.storage.enable=true at creation + assertInvalid(noExisting, topicProps( + TopicConfig.DISKLESS_ENABLE_CONFIG -> "false", + TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> "true"), + mutualExclusionError, + kafkaConfig, + remoteStorageConsolidationEnabled = true) + + // NOT allowed to set diskless.enable=true and remote.storage.enable=false at creation + assertInvalid(noExisting, topicProps( + TopicConfig.DISKLESS_ENABLE_CONFIG -> "true", + TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> "false"), + mutualExclusionError, + kafkaConfig, + remoteStorageConsolidationEnabled = true) + + // Allowed to set diskless.enable=true and remote.storage.enable=true at creation + assertValid(noExisting, topicProps( + TopicConfig.DISKLESS_ENABLE_CONFIG -> "true", + TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> "true"), + kafkaConfig, + remoteStorageConsolidationEnabled = true) + } + + @Test + def testRemoteStorageConsolidationAtUpdate(): Unit = { + val kafkaConfig = KafkaConfig.fromProps(TestUtils.createDummyBrokerConfig()) + val mutualExclusionError = "It is not valid to set a value for both diskless.enable and remote.storage.enable unless it's for diskless migration or consolidation." + + val existingWithoutDisklessOrRemote = util.Map.of(TopicConfig.RETENTION_MS_CONFIG, "1000") + val existingWithDisklessFalse = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false") + val existingWithDisklessTrue = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") + val existingWithRemoteFalse = util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false") + val existingWithRemoteTrue = util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + + val existingWithDisklessTrueRemoteTrue = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true", TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + val existingWithDisklessFalseRemoteFalse = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false", TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false") + val existingWithDisklessTrueRemoteFalse = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true", TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false") + + // Case 1: set diskless.enable=true + val setDisklessTrue = topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> "true") + + assertInvalid(existingWithoutDisklessOrRemote, setDisklessTrue, + "It is invalid to enable diskless on an already existing topic.", kafkaConfig, remoteStorageConsolidationEnabled = true) + assertInvalid(existingWithDisklessFalse, setDisklessTrue, + "It is invalid to enable diskless on an already existing topic.", kafkaConfig, remoteStorageConsolidationEnabled = true) + assertValid(existingWithDisklessTrue, setDisklessTrue, kafkaConfig, remoteStorageConsolidationEnabled = true) + assertInvalid(existingWithRemoteFalse, setDisklessTrue, + "It is invalid to enable diskless on an already existing topic.", kafkaConfig, remoteStorageConsolidationEnabled = true) + assertInvalid(existingWithRemoteTrue, setDisklessTrue, + "It is invalid to enable diskless on an already existing topic.", kafkaConfig, remoteStorageConsolidationEnabled = true) + + // Case 2: set diskless.enable=false + val setDisklessFalse = topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> "false") + + assertValid(existingWithoutDisklessOrRemote, setDisklessFalse, kafkaConfig, remoteStorageConsolidationEnabled = true) + assertValid(existingWithDisklessFalse, setDisklessFalse, kafkaConfig, remoteStorageConsolidationEnabled = true) + assertInvalid(existingWithRemoteFalse, setDisklessFalse, mutualExclusionError, kafkaConfig, remoteStorageConsolidationEnabled = true) + assertInvalid(existingWithDisklessTrue, setDisklessFalse, + "It is invalid to disable diskless.", kafkaConfig, remoteStorageConsolidationEnabled = true) + assertInvalid(existingWithRemoteTrue, setDisklessFalse, mutualExclusionError, kafkaConfig, remoteStorageConsolidationEnabled = true) + + // Case 3: set remote.storage.enable=true + val setRemoteStorageTrue = topicProps(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> "true") + + assertValid(existingWithoutDisklessOrRemote, setRemoteStorageTrue, kafkaConfig, remoteStorageConsolidationEnabled = true) + assertInvalid(existingWithDisklessFalse, setRemoteStorageTrue, mutualExclusionError, kafkaConfig, remoteStorageConsolidationEnabled = true) + assertValid(existingWithDisklessTrue, setRemoteStorageTrue, kafkaConfig, remoteStorageConsolidationEnabled = true) + assertValid(existingWithRemoteFalse, setRemoteStorageTrue, kafkaConfig, remoteStorageConsolidationEnabled = true) + assertValid(existingWithRemoteTrue, setRemoteStorageTrue, kafkaConfig, remoteStorageConsolidationEnabled = true) + + // Case 4: set remote.storage.enable=false + val setRemoteStorageFalse = topicProps(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> "false") + + assertValid(existingWithoutDisklessOrRemote, setRemoteStorageFalse, kafkaConfig, remoteStorageConsolidationEnabled = true) + assertInvalid(existingWithDisklessFalse, setRemoteStorageFalse, mutualExclusionError, kafkaConfig, remoteStorageConsolidationEnabled = true) + assertInvalid(existingWithDisklessTrue, setRemoteStorageFalse, mutualExclusionError, kafkaConfig, remoteStorageConsolidationEnabled = true) + assertValid(existingWithRemoteFalse, setRemoteStorageFalse, kafkaConfig, remoteStorageConsolidationEnabled = true) + assertInvalid(existingWithRemoteTrue, setRemoteStorageFalse, + "It is invalid to disable remote storage without deleting remote data. If you want to keep the remote data and turn to read only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`.", + kafkaConfig, remoteStorageConsolidationEnabled = true) + + val setDisklessTrueRemoteStorageTrue = topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> "true", TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> "true") + + // Case 5: diskless and remote storage stays enabled + assertValid(existingWithDisklessTrueRemoteTrue, setDisklessTrueRemoteStorageTrue, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = true) + assertValid(existingWithDisklessTrueRemoteTrue, setDisklessTrueRemoteStorageTrue, kafkaConfig, remoteStorageConsolidationEnabled = true) + assertValid(existingWithDisklessTrueRemoteTrue, setDisklessTrueRemoteStorageTrue, kafkaConfig, disklessAllowFromClassic = true) + // someone disables both the remote log system and diskless + assertInvalid(existingWithDisklessTrueRemoteTrue, setDisklessTrueRemoteStorageTrue, mutualExclusionError, kafkaConfig) + + // Case 6: diskless and remote storage stays disabled + val setDisklessFalseRemoteStorageFalse = topicProps(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> "false", TopicConfig.DISKLESS_ENABLE_CONFIG -> "false") + assertValid(existingWithDisklessFalseRemoteFalse, setDisklessFalseRemoteStorageFalse, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = true) + assertValid(existingWithDisklessFalseRemoteFalse, setDisklessFalseRemoteStorageFalse, kafkaConfig, remoteStorageConsolidationEnabled = true) + assertValid(existingWithDisklessFalseRemoteFalse, setDisklessFalseRemoteStorageFalse, kafkaConfig, disklessAllowFromClassic = true) + assertValid(existingWithDisklessFalseRemoteFalse, setDisklessFalseRemoteStorageFalse, kafkaConfig) + + // Case 7: diskless is enabled and remote storage becomes enabled + assertValid(existingWithDisklessTrueRemoteFalse, setDisklessTrueRemoteStorageTrue, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = true) + assertValid(existingWithDisklessTrueRemoteFalse, setDisklessTrueRemoteStorageTrue, kafkaConfig, remoteStorageConsolidationEnabled = true) + assertInvalid(existingWithDisklessTrueRemoteFalse, setDisklessTrueRemoteStorageTrue, mutualExclusionError, kafkaConfig, disklessAllowFromClassic = true) + assertInvalid(existingWithDisklessTrueRemoteFalse, setDisklessTrueRemoteStorageTrue, mutualExclusionError, kafkaConfig) + + // Case 8: if diskless and remote is enabled, can't disable remote storage + val setDisklessTrueRemoteStorageFalse = topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> "true", TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> "false") + assertInvalid(existingWithDisklessTrueRemoteTrue, setDisklessTrueRemoteStorageFalse, mutualExclusionError, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = true) + assertInvalid(existingWithDisklessTrueRemoteTrue, setDisklessTrueRemoteStorageFalse, mutualExclusionError, kafkaConfig, remoteStorageConsolidationEnabled = true) + assertInvalid(existingWithDisklessTrueRemoteTrue, setDisklessTrueRemoteStorageFalse, mutualExclusionError, kafkaConfig, disklessAllowFromClassic = true) + assertInvalid(existingWithDisklessTrueRemoteTrue, setDisklessTrueRemoteStorageFalse, mutualExclusionError, kafkaConfig) + } + + @Test + def testDisklessAllowFromClassicAndRemoteStorageConsolidationAtUpdate(): Unit = { + val kafkaConfig = KafkaConfig.fromProps(TestUtils.createDummyBrokerConfig()) + val mutualExclusionError = "It is not valid to set a value for both diskless.enable and remote.storage.enable unless it's for diskless migration or consolidation." + val existingWithoutDisklessOrRemote = util.Map.of(TopicConfig.RETENTION_MS_CONFIG, "1000") + val existingWithDisklessFalse = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false") + val existingWithDisklessTrue = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") + val existingWithRemoteFalse = util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false") + val existingWithRemoteTrue = util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + + // Case 1: set diskless.enable=true with allowFromClassic=true + val setDisklessTrue = topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> "true") + + assertValid(existingWithoutDisklessOrRemote, setDisklessTrue, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = true) + assertValid(existingWithDisklessFalse, setDisklessTrue, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = true) + assertValid(existingWithDisklessTrue, setDisklessTrue, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = true) + // Mutual exclusion still applies when existing remote.storage.enable=false + assertInvalid(existingWithRemoteFalse, setDisklessTrue, mutualExclusionError, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = true) + // Classic-to-diskless migration: setting diskless.enable=true on a topic with remote.storage.enable=true is valid. + // In the real controller flow (ConfigurationControlManager.validateAlterConfig), props contains the merged + // state of existing overrides + requested changes, so remote.storage.enable=true is included in props. + val setDisklessTrueWithExistingRemoteTrue = topicProps( + TopicConfig.DISKLESS_ENABLE_CONFIG -> "true", + TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> "true" + ) + assertValid(existingWithRemoteTrue, setDisklessTrueWithExistingRemoteTrue, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = true) + + // Case 2: set diskless.enable=false with allowFromClassic=true - disabling diskless is still forbidden + val setDisklessFalse = topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> "false") + + assertValid(existingWithoutDisklessOrRemote, setDisklessFalse, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = true) + assertValid(existingWithDisklessFalse, setDisklessFalse, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = true) + assertInvalid(existingWithDisklessTrue, setDisklessFalse, + "It is invalid to disable diskless.", kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = true) + assertInvalid(existingWithRemoteFalse, setDisklessFalse, + mutualExclusionError, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = true) + assertInvalid(existingWithRemoteTrue, setDisklessFalse, + mutualExclusionError, kafkaConfig, disklessAllowFromClassic = true, remoteStorageConsolidationEnabled = true) + } + @Test def testDisklessAndRemoteStorageAtUpdate(): Unit = { val kafkaConfig = KafkaConfig.fromProps(TestUtils.createDummyBrokerConfig()) - val mutualExclusionError = "It is not valid to set a value for both diskless.enable and remote.storage.enable." + val mutualExclusionError = "It is not valid to set a value for both diskless.enable and remote.storage.enable unless it's for diskless migration or consolidation." val existingWithoutDisklessOrRemote = util.Map.of(TopicConfig.RETENTION_MS_CONFIG, "1000") val existingWithDisklessFalse = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false") val existingWithDisklessTrue = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") @@ -556,7 +768,7 @@ class LogConfigTest { @Test def testDisklessAllowFromClassicAtUpdate(): Unit = { val kafkaConfig = KafkaConfig.fromProps(TestUtils.createDummyBrokerConfig()) - val mutualExclusionError = "It is not valid to set a value for both diskless.enable and remote.storage.enable." + val mutualExclusionError = "It is not valid to set a value for both diskless.enable and remote.storage.enable unless it's for diskless migration or consolidation." val existingWithoutDisklessOrRemote = util.Map.of(TopicConfig.RETENTION_MS_CONFIG, "1000") val existingWithDisklessFalse = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false") val existingWithDisklessTrue = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") @@ -637,14 +849,14 @@ class LogConfigTest { } private def assertValid(existingConfigs: util.Map[String, String], props: Properties, kafkaConfig: KafkaConfig, - disklessAllowFromClassic: Boolean = false): Unit = { - LogConfig.validate(existingConfigs, props, kafkaConfig.extractLogConfigMap, true, disklessAllowFromClassic) + disklessAllowFromClassic: Boolean = false, remoteStorageConsolidationEnabled: Boolean = false): Unit = { + LogConfig.validate(existingConfigs, props, kafkaConfig.extractLogConfigMap, true, disklessAllowFromClassic, true, remoteStorageConsolidationEnabled) } private def assertInvalid(existingConfigs: util.Map[String, String], props: Properties, expectedMessage: String, - kafkaConfig: KafkaConfig, disklessAllowFromClassic: Boolean = false): Unit = { + kafkaConfig: KafkaConfig, disklessAllowFromClassic: Boolean = false, remoteStorageConsolidationEnabled: Boolean = false): Unit = { val ex = assertThrows(classOf[InvalidConfigurationException], - () => LogConfig.validate(existingConfigs, props, kafkaConfig.extractLogConfigMap, true, disklessAllowFromClassic)) + () => LogConfig.validate(existingConfigs, props, kafkaConfig.extractLogConfigMap, true, disklessAllowFromClassic, true, remoteStorageConsolidationEnabled)) assertEquals(expectedMessage, ex.getMessage) } diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java index 532ed0b0a5..02b5bbcfc4 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java @@ -154,6 +154,12 @@ public class ServerConfigs { "Explicit RF values are accepted. Placement uses standard rack-aware assignment. " + "When disabled, diskless topics use legacy RF=1 behavior."; + public static final String DISKLESS_REMOTE_STORAGE_CONSOLIDATION_ENABLE_CONFIG = "diskless.remote.storage.consolidation.enable"; + public static final boolean DISKLESS_REMOTE_STORAGE_CONSOLIDATION_ENABLE_DEFAULT = false; + public static final String DISKLESS_REMOTE_STORAGE_CONSOLIDATION_ENABLE_DOC = "When enabled, it allows topics to set both " + + "diskless.enable=true and remote.storage.enable=true on new topics. Setting both will start consolidating Diskless WAL segments into " + + "Kafka tiered log storage on the configured topic."; + public static final String CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_CONFIG = "classic.remote.storage.force.enable"; public static final boolean CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_DEFAULT = false; public static final String CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_DOC = "Force classic topics to be created with remote.storage.enable=true, " + @@ -215,6 +221,8 @@ public class ServerConfigs { .define(DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, BOOLEAN, DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_DEFAULT, LOW, DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_DOC) .define(DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG, BOOLEAN, DISKLESS_MANAGED_REPLICAS_ENABLE_DEFAULT, MEDIUM, DISKLESS_MANAGED_REPLICAS_ENABLE_DOC) + .define(DISKLESS_REMOTE_STORAGE_CONSOLIDATION_ENABLE_CONFIG, BOOLEAN, DISKLESS_REMOTE_STORAGE_CONSOLIDATION_ENABLE_DEFAULT, + MEDIUM, DISKLESS_REMOTE_STORAGE_CONSOLIDATION_ENABLE_DOC) .define(CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_CONFIG, BOOLEAN, CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_DEFAULT, LOW, CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_DOC) .define(CLASSIC_REMOTE_STORAGE_FORCE_EXCLUDE_TOPIC_REGEXES_CONFIG, LIST, CLASSIC_REMOTE_STORAGE_FORCE_EXCLUDE_TOPIC_REGEXES_DEFAULT, diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index 06be49d008..0e402ff2b7 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -495,74 +495,187 @@ public static void validateBrokerLogConfigValues(Map props, } } + /** + * LogConfigHelper is a collection of utility methods for implementing validation logic easier based on + * new, existing and combined configs (where combined is the broker default + the new configs). + */ + private record LogConfigHelper(Map existingConfigs, Map requestedConfigs, + Map combinedConfigs, boolean isDisklessAllowFromClassicEnabled, + boolean isRemoteStorageConsolidationEnabled) { + + public boolean isCreation() { + return existingConfigs.isEmpty(); + } + + public boolean isDisklessExplicitlySet() { + return requestedConfigs.containsKey(TopicConfig.DISKLESS_ENABLE_CONFIG); + } + + public boolean wasDisklessExplicitlySet() { + return existingConfigs.containsKey(TopicConfig.DISKLESS_ENABLE_CONFIG); + } + + public boolean isDisklessEnabled() { + if (isDisklessExplicitlySet()) { + return (boolean) combinedConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG); + } else { + return wasDisklessEnabled(); + } + } + + public boolean wasDisklessEnabled() { + return Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.DISKLESS_ENABLE_CONFIG, "false")); + } + + public boolean isRemoteStorageExplicitlySet() { + return requestedConfigs.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); + } + + public boolean wasRemoteStorageExplicitlySet() { + return existingConfigs.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); + } + + public boolean requestedRemoteStorageEnabled() { + return (boolean) combinedConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); + } + + public boolean isRemoteStorageEnabled() { + if (isRemoteStorageExplicitlySet()) { + return requestedRemoteStorageEnabled(); + } else { + return wasRemoteStorageEnabled(); + } + } + + public boolean isDisklessConsolidationModeOnCreation() { + return isRemoteStorageConsolidationEnabled && isCreation() && isDisklessEnabled() && isRemoteStorageEnabled(); + } + + public boolean isValidConsolidationModeTransitionOnUpdate() { + // Consolidation update: allow steady-state updates and enabling remote on a diskless-only topic, + // without tripping mutual exclusion when both keys appear in the merged request. + // Note: the "both stay disabled" no-op case is handled by isDisklessAndRemoteStorageUnchanged(), + // which applies regardless of the consolidation flag. + if (!isRemoteStorageConsolidationEnabled || isCreation()) { + return false; + } + // diskless stays enabled, remote storage stays enabled (steady state) + if (isDisklessStaysEnabled() && isRemoteStorageStaysEnabled()) { + return true; + } + // diskless stays enabled, remote storage stays disabled (routine config update) + if (isDisklessStaysEnabled() && isRemoteStorageStaysDisabledForConsolidation()) { + return true; + } + // diskless stays enabled, remote storage becomes enabled (start consolidation) + if (isDisklessStaysEnabled() && isRemoteStorageBecomesEnabled()) { + return true; + } + return false; + } + + public boolean wasRemoteStorageEnabled() { + return Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")); + } + + public boolean isMigratedFromClassicWithRemoteStorage() { + return isDisklessAllowFromClassicEnabled + && isDisklessEnabled() + && wasRemoteStorageExplicitlySet() && wasRemoteStorageEnabled() + && requestedRemoteStorageEnabled(); + } + + /** Both overrides were already present and remain off; used to skip mutual exclusion without consolidation. */ + public boolean isBothExplicitlyDisabledSteadyStateUpdate() { + return !isCreation() + && wasDisklessExplicitlySet() && wasRemoteStorageExplicitlySet() + && !isDisklessEnabled() && !isRemoteStorageEnabled(); + } + + private boolean isDisklessStaysEnabled() { + return wasDisklessEnabled() && isDisklessEnabled(); + } + + private boolean isRemoteStorageStaysEnabled() { + return wasRemoteStorageEnabled() && isRemoteStorageEnabled(); + } + + private boolean isRemoteStorageStaysDisabled() { + return !wasRemoteStorageEnabled() && !isRemoteStorageEnabled(); + } + + // Like isRemoteStorageStaysDisabled() but excludes newly adding explicit remote.storage.enable=false + private boolean isRemoteStorageStaysDisabledForConsolidation() { + return isRemoteStorageStaysDisabled() + && (!isRemoteStorageExplicitlySet() || wasRemoteStorageExplicitlySet()); + } + + private boolean isRemoteStorageBecomesEnabled() { + return !wasRemoteStorageEnabled() && requestedRemoteStorageEnabled(); + } + } + private static void validateDiskless(Map existingConfigs, Map requestedConfigs, Map newConfigs, boolean isDisklessStorageSystemEnabled, - boolean isDisklessAllowFromClassicEnabled) { - final boolean isCreation = existingConfigs.isEmpty(); - final boolean isDisklessExplicitlySet = requestedConfigs.containsKey(TopicConfig.DISKLESS_ENABLE_CONFIG); - final boolean isRemoteStorageExplicitlySet = requestedConfigs.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); - final boolean wasDisklessExplicitlySet = existingConfigs.containsKey(TopicConfig.DISKLESS_ENABLE_CONFIG); - final boolean wasRemoteStorageExplicitlySet = existingConfigs.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); - final boolean wasDisklessEnabled = Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.DISKLESS_ENABLE_CONFIG, "false")); - final boolean requestedDisklessEnabled = (Boolean) newConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG); - - if (isDisklessExplicitlySet && !isDisklessStorageSystemEnabled) { - throw new InvalidConfigurationException("It is invalid to set diskless.enable if diskless storage system is not enabled."); - } + boolean isDisklessAllowFromClassicEnabled, + boolean isRemoteStorageConsolidationEnabled) { + final var logConfigHelper = new LogConfigHelper(existingConfigs, requestedConfigs, newConfigs, + isDisklessAllowFromClassicEnabled, isRemoteStorageConsolidationEnabled); - final boolean isDisklessEnabled; - if (isDisklessExplicitlySet) { - isDisklessEnabled = requestedDisklessEnabled; - } else { - isDisklessEnabled = wasDisklessEnabled; + if (logConfigHelper.isDisklessExplicitlySet() && !isDisklessStorageSystemEnabled) { + throw new InvalidConfigurationException("It is invalid to set diskless.enable if diskless storage system is not enabled."); } - validateDisklessTransition(isCreation, isDisklessExplicitlySet, isDisklessEnabled, wasDisklessEnabled, isDisklessAllowFromClassicEnabled); + validateDisklessTransition(logConfigHelper, isDisklessAllowFromClassicEnabled); // Only one between diskless.enable and remote.storage.enable can be set, no matter the value. - // Exception: when classic-to-diskless migration is allowed, we permit diskless.enable=true + // Exception 1: when classic-to-diskless migration is allowed, we permit diskless.enable=true // on a topic that already had remote.storage.enable=true — both during the migration itself // and in steady state afterward (migrated topics retain both configs). // Note: in the controller path, requestedConfigs is the merged state (existing + changes), // so we cannot distinguish "client set this" from "already existed". We detect the exception // by checking that diskless is (or will be) enabled and remote storage was and remains enabled. - final boolean wasRemoteStorageEnabled = Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")); - final boolean requestedRemoteStorageEnabled = (Boolean) newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); - final boolean isDisklessWithRemoteStorageLegacy = isDisklessAllowFromClassicEnabled - && isDisklessEnabled - && wasRemoteStorageExplicitlySet && wasRemoteStorageEnabled - && requestedRemoteStorageEnabled; - if (!isDisklessWithRemoteStorageLegacy) { - final boolean hasExplicitDiskless = isDisklessExplicitlySet || wasDisklessExplicitlySet; - final boolean hasExplicitRemoteStorage = isRemoteStorageExplicitlySet || wasRemoteStorageExplicitlySet; - validateDisklessAndRemoteStorageMutualExclusion(isDisklessExplicitlySet, isRemoteStorageExplicitlySet, hasExplicitDiskless, hasExplicitRemoteStorage); - } - } - - private static void validateDisklessTransition(boolean isCreation, - boolean isDisklessExplicitlySet, - boolean isDisklessEnabled, - boolean wasDisklessEnabled, + final boolean isMigratedFromClassicWithRemoteStorage = logConfigHelper.isMigratedFromClassicWithRemoteStorage(); + // Exception 2: when we're at creation we allow both properties to be set to true if remote storage + // consolidation is also enabled. + final boolean isDisklessConsolidationOnCreation = logConfigHelper.isDisklessConsolidationModeOnCreation(); + // Exception 3: if remote log storage consolidation is enabled, and we're on an update, we allow + // - diskless to stay enabled and remote storage to stay enabled (steady state) + // - diskless to stay enabled and remote storage to stay disabled (routine config update) + // - diskless to stay enabled and remote storage to become enabled (start consolidation) + final boolean isValidConsolidationModeTransitionOnUpdate = logConfigHelper.isValidConsolidationModeTransitionOnUpdate(); + // Exception 4: both keys were already present and remain explicitly false (no-op alter); allowed even + // when cluster consolidation is off, so routine config updates do not trip mutual exclusion. + final boolean isBothExplicitlyDisabledSteadyState = logConfigHelper.isBothExplicitlyDisabledSteadyStateUpdate(); + if (!isMigratedFromClassicWithRemoteStorage && !isDisklessConsolidationOnCreation && !isValidConsolidationModeTransitionOnUpdate + && !isBothExplicitlyDisabledSteadyState) { + validateDisklessAndRemoteStorageMutualExclusion(logConfigHelper); + } + } + + private static void validateDisklessTransition(LogConfigHelper logConfigHelper, boolean isDisklessAllowFromClassicEnabled) { - if (!isCreation && isDisklessExplicitlySet && isDisklessEnabled && !wasDisklessEnabled && !isDisklessAllowFromClassicEnabled) { + if (!logConfigHelper.isCreation() && logConfigHelper.isDisklessExplicitlySet() + && logConfigHelper.isDisklessEnabled() && !logConfigHelper.wasDisklessEnabled() && !isDisklessAllowFromClassicEnabled) { throw new InvalidConfigurationException("It is invalid to enable diskless on an already existing topic."); } // Diskless cannot be disabled after it's enabled - if (!isCreation && isDisklessExplicitlySet && !isDisklessEnabled && wasDisklessEnabled) { + if (!logConfigHelper.isCreation() && logConfigHelper.isDisklessExplicitlySet() + && !logConfigHelper.isDisklessEnabled() && logConfigHelper.wasDisklessEnabled()) { throw new InvalidConfigurationException("It is invalid to disable diskless."); } } - private static void validateDisklessAndRemoteStorageMutualExclusion(boolean isDisklessExplicitlySet, - boolean isRemoteStorageExplicitlySet, - boolean hasExplicitDiskless, - boolean hasExplicitRemoteStorage) { - if ((isDisklessExplicitlySet && hasExplicitRemoteStorage) || - (isRemoteStorageExplicitlySet && hasExplicitDiskless)) { - throw new InvalidConfigurationException("It is not valid to set a value for both diskless.enable and remote.storage.enable."); + private static void validateDisklessAndRemoteStorageMutualExclusion(LogConfigHelper logConfigHelper) { + final boolean hasExplicitDiskless = logConfigHelper.isDisklessExplicitlySet() || logConfigHelper.wasDisklessExplicitlySet(); + final boolean hasExplicitRemoteStorage = logConfigHelper.isRemoteStorageExplicitlySet() || logConfigHelper.wasRemoteStorageExplicitlySet(); + // Only one between diskless.enable and remote.storage.enable can be set if consolidation isn't enabled, no matter the value. + if ((logConfigHelper.isDisklessExplicitlySet() && hasExplicitRemoteStorage) || + (logConfigHelper.isRemoteStorageExplicitlySet() && hasExplicitDiskless)) { + throw new InvalidConfigurationException("It is not valid to set a value for both diskless.enable and remote.storage.enable unless it's for diskless migration or consolidation."); } } @@ -573,7 +686,7 @@ private static void validateDisklessAndRemoteStorageMutualExclusion(boolean isDi * The default values should be extracted from the KafkaConfig. * @param existingConfigs The existing properties * @param requestedConfigs The configs explicitly included in the topic update/create request - * @param newConfigs The new properties to be validated + * @param newConfigs The new properties to be validated (combined from broker + requested) * @param isRemoteLogStorageSystemEnabled true if system wise remote log storage is enabled */ private static void validateTopicLogConfigValues(Map existingConfigs, @@ -581,9 +694,10 @@ private static void validateTopicLogConfigValues(Map existingCon Map newConfigs, boolean isRemoteLogStorageSystemEnabled, boolean isDisklessStorageSystemEnabled, - boolean isDisklessAllowFromClassicEnabled) { + boolean isDisklessAllowFromClassicEnabled, + boolean isRemoteStorageConsolidationEnabled) { validateValues(newConfigs); - validateDiskless(existingConfigs, requestedConfigs, newConfigs, isDisklessStorageSystemEnabled, isDisklessAllowFromClassicEnabled); + validateDiskless(existingConfigs, requestedConfigs, newConfigs, isDisklessStorageSystemEnabled, isDisklessAllowFromClassicEnabled, isRemoteStorageConsolidationEnabled); boolean isRemoteLogStorageEnabled = (Boolean) newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); if (isRemoteLogStorageEnabled) { @@ -700,7 +814,7 @@ public static void validate(Map existingConfigs, boolean isRemoteLogStorageSystemEnabled, boolean isDisklessAllowFromClassicEnabled) { validate(existingConfigs, props, configuredProps, isRemoteLogStorageSystemEnabled, - isDisklessAllowFromClassicEnabled, true); + isDisklessAllowFromClassicEnabled, true, false); } public static void validate(Map existingConfigs, @@ -708,7 +822,8 @@ public static void validate(Map existingConfigs, Map configuredProps, boolean isRemoteLogStorageSystemEnabled, boolean isDisklessAllowFromClassicEnabled, - boolean isDisklessStorageSystemEnabled) { + boolean isDisklessStorageSystemEnabled, + boolean isRemoteStorageConsolidationEnabled) { validateNames(props); if (configuredProps == null || configuredProps.isEmpty()) { Map valueMaps = CONFIG.parse(props); @@ -718,7 +833,7 @@ public static void validate(Map existingConfigs, combinedConfigs.putAll(props); Map valueMaps = CONFIG.parse(combinedConfigs); validateTopicLogConfigValues(existingConfigs, Utils.castToStringObjectMap(props), valueMaps, - isRemoteLogStorageSystemEnabled, isDisklessStorageSystemEnabled, isDisklessAllowFromClassicEnabled); + isRemoteLogStorageSystemEnabled, isDisklessStorageSystemEnabled, isDisklessAllowFromClassicEnabled, isRemoteStorageConsolidationEnabled); } }