Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
LogConfig.validate(oldConfigs, properties, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
kafkaConfig.disklessAllowFromClassicEnabled,
kafkaConfig.disklessStorageSystemEnabled)
kafkaConfig.disklessStorageSystemEnabled,
kafkaConfig.disklessRemoteStorageConsolidationEnabled)
case BROKER => validateBrokerName(resource.name())
case CLIENT_METRICS =>
val properties = new Properties()
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val disklessStorageSystemEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG)
val disklessAllowFromClassicEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG)
val disklessManagedReplicasEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG)
val disklessRemoteStorageConsolidationEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_REMOTE_STORAGE_CONSOLIDATION_ENABLE_CONFIG)
val classicRemoteStorageForceEnabled: Boolean = getBoolean(ServerConfigs.CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_CONFIG)
val classicRemoteStorageForceExcludeTopicRegexes: java.util.List[String] =
getList(ServerConfigs.CLASSIC_REMOTE_STORAGE_FORCE_EXCLUDE_TOPIC_REGEXES_CONFIG)
Expand Down Expand Up @@ -519,6 +520,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
// diskless.storage.system.enable
// → diskless.managed.rf.enable
// → diskless.allow.from.classic.enable (also requires remote.log.storage.system.enable)
// → diskless.remote.storage.consolidation.enable (also requires remote.log.storage.system.enable)
if (disklessManagedReplicasEnabled) {
require(disklessStorageSystemEnabled,
s"${ServerConfigs.DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG} requires ${ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG}=true")
Expand All @@ -530,6 +532,13 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
s"${ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG} requires ${RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP}=true")
}

if (disklessRemoteStorageConsolidationEnabled) {
require(disklessManagedReplicasEnabled,
s"${ServerConfigs.DISKLESS_REMOTE_STORAGE_CONSOLIDATION_ENABLE_CONFIG} requires ${ServerConfigs.DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG}=true")
require(remoteLogManagerConfig.isRemoteStorageSystemEnabled,
s"${ServerConfigs.DISKLESS_REMOTE_STORAGE_CONSOLIDATION_ENABLE_CONFIG} requires ${RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP}=true")
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree it does not require allow-from-classic migration. Even though the topic type switcher work includes foundational work for TS consolidation (e.g. diskless start offset, etc) these are not guard by the feature flag.


require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1")
require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0")
require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, greater than or equal to 1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class DisklessAndRemoteStorageConfigsTest {

private static final String ENABLE_DISKLESS_ERROR = "It is invalid to enable diskless on an already existing topic.";
private static final String DISABLE_DISKLESS_ERROR = "It is invalid to disable diskless.";
private static final String DISKLESS_REMOTE_SET_ERROR = "It is not valid to set a value for both diskless.enable and remote.storage.enable.";
private static final String DISKLESS_REMOTE_SET_ERROR = "It is not valid to set a value for both diskless.enable and remote.storage.enable unless it's for diskless migration or consolidation.";
private static final String DISABLE_REMOTE_WITHOUT_DELETE_ERROR = "It is invalid to disable remote storage without deleting remote data. "
+ "If you want to keep the remote data and turn to read only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. "
+ "If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`.";
Expand Down
30 changes: 25 additions & 5 deletions core/src/test/java/kafka/server/InklessConfigsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,14 @@ private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean di
}

private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig, boolean isDisklessAllowFromClassicEnabled) throws Exception {
return init(defaultDisklessEnableConfig, disklessStorageEnableConfig, isDisklessAllowFromClassicEnabled, isDisklessAllowFromClassicEnabled, true, false, List.of());
return init(defaultDisklessEnableConfig, disklessStorageEnableConfig, isDisklessAllowFromClassicEnabled, isDisklessAllowFromClassicEnabled, false, true, false, List.of());
}

private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig,
boolean disklessStorageEnableConfig,
boolean isDisklessAllowFromClassicEnabled,
boolean disklessManagedReplicasEnabled,
boolean disklessRemoteStorageConsolidationEnabled,
boolean remoteLogStorageSystemEnabled,
boolean classicRemoteStorageForceEnabled,
List<String> classicRemoteStorageForceExcludeTopicRegexes) throws Exception {
Expand All @@ -101,6 +102,7 @@ private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig,
.setConfigProp(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, String.valueOf(disklessStorageEnableConfig))
.setConfigProp(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, String.valueOf(isDisklessAllowFromClassicEnabled))
.setConfigProp(ServerConfigs.DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG, String.valueOf(disklessManagedReplicasEnabled))
.setConfigProp(ServerConfigs.DISKLESS_REMOTE_STORAGE_CONSOLIDATION_ENABLE_CONFIG, String.valueOf(disklessRemoteStorageConsolidationEnabled))
.setConfigProp(ServerConfigs.CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_CONFIG, String.valueOf(classicRemoteStorageForceEnabled))
.setConfigProp(ServerConfigs.CLASSIC_REMOTE_STORAGE_FORCE_EXCLUDE_TOPIC_REGEXES_CONFIG,
String.join(",", classicRemoteStorageForceExcludeTopicRegexes))
Expand Down Expand Up @@ -230,7 +232,7 @@ final class ClassicRemoteStorageForcePolicy {
private static final List<String> EXCLUDED_TOPIC_REGEXES = List.of("_schemas", "mm2-(.*)");

private KafkaClusterTestKit initWithClassicRemoteStorageForceEnabled() throws Exception {
return init(false, true, false, false, true, true, EXCLUDED_TOPIC_REGEXES);
return init(false, true, false, false, false, true, true, EXCLUDED_TOPIC_REGEXES);
}

@Test
Expand Down Expand Up @@ -403,7 +405,7 @@ final class FeatureFlagDependencyValidation {
@Test
void managedReplicasRequiresDisklessStorageSystem() {
final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
() -> init(false, false, false, true, true, false, List.of()));
() -> init(false, false, false, true, false, true, false, List.of()));
assertEquals(
"requirement failed: diskless.managed.rf.enable requires diskless.storage.system.enable=true",
exception.getMessage());
Expand All @@ -412,7 +414,7 @@ void managedReplicasRequiresDisklessStorageSystem() {
@Test
void allowFromClassicRequiresManagedReplicas() {
final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
() -> init(false, true, true, false, true, false, List.of()));
() -> init(false, true, true, false, false, true, false, List.of()));
assertEquals(
"requirement failed: diskless.allow.from.classic.enable requires diskless.managed.rf.enable=true",
exception.getMessage());
Expand All @@ -421,12 +423,30 @@ void allowFromClassicRequiresManagedReplicas() {
@Test
void allowFromClassicRequiresRemoteLogStorage() {
final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
() -> init(false, true, true, true, false, false, List.of()));
() -> init(false, true, true, true, false, false, false, List.of()));
assertEquals(
"requirement failed: diskless.allow.from.classic.enable requires remote.log.storage.system.enable=true",
exception.getMessage());
}

@Test
void disklessRemoteStorageConsolidationRequiresManagedReplicas() {
final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
() -> init(false, true, false, false, true, true, false, List.of()));
assertEquals(
"requirement failed: diskless.remote.storage.consolidation.enable requires diskless.managed.rf.enable=true",
exception.getMessage());
}

@Test
void disklessRemoteStorageConsolidationRequiresRemoteLogStorage() {
final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
() -> init(false, true, false, true, true, false, false, List.of()));
assertEquals(
"requirement failed: diskless.remote.storage.consolidation.enable requires remote.log.storage.system.enable=true",
exception.getMessage());
}

@Test
void fullDependencyChainSucceeds() throws Exception {
final KafkaClusterTestKit cluster = init(false, true, true);
Expand Down
Loading
Loading