Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7095,8 +7095,12 @@ class ReplicaManagerTest {
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), new LogConfig(new Properties()))
val sharedState = mock(classOf[SharedState], Answers.RETURNS_DEEP_STUBS)
when(sharedState.time()).thenReturn(Time.SYSTEM)
when(sharedState.config()).thenReturn(new InklessConfig(new util.HashMap[String, Object]()))
val inklessConfigMap = new util.HashMap[String, Object]()
// Disable lagging consumer feature to match the empty lagging fetch storage
inklessConfigMap.put("fetch.lagging.consumer.thread.pool.size", Integer.valueOf(0))
when(sharedState.config()).thenReturn(new InklessConfig(inklessConfigMap))
when(sharedState.controlPlane()).thenReturn(controlPlane.getOrElse(mock(classOf[ControlPlane])))
when(sharedState.maybeLaggingFetchStorage()).thenReturn(Optional.empty())
val inklessMetadata = mock(classOf[InklessMetadataView])
when(inklessMetadata.isDisklessTopic(any())).thenReturn(false)
when(inklessMetadata.getTopicId(anyString())).thenAnswer{ invocation =>
Expand Down
138 changes: 108 additions & 30 deletions storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

import io.aiven.inkless.cache.BatchCoordinateCache;
Expand All @@ -52,6 +54,16 @@ public final class SharedState implements Closeable {
private final InklessConfig config;
private final MetadataView metadata;
private final ControlPlane controlPlane;
private final StorageBackend fetchStorage;
// Separate storage client for lagging consumers to:
// 1. Isolate connection pool usage (lagging consumers shouldn't exhaust connections for hot path)
// 2. Allow independent tuning of timeouts/retries for cold storage access patterns
private final Optional<StorageBackend> maybeLaggingFetchStorage;
private final StorageBackend produceStorage;
// backgroundStorage is shared by FileCleaner and FileMerger executors which run concurrently.
// Kafka storage backends are required to be thread-safe (they share the same Metrics instance).
// A dedicated backend instance guarantees they don't contend with hot-path fetch/produce clients.
private final StorageBackend backgroundStorage;
private final ObjectKeyCreator objectKeyCreator;
private final KeyAlignmentStrategy keyAlignmentStrategy;
private final ObjectCache cache;
Expand All @@ -60,12 +72,17 @@ public final class SharedState implements Closeable {
private final Supplier<LogConfig> defaultTopicConfigs;
private final Metrics storageMetrics;

public SharedState(
private SharedState(
final Time time,
final int brokerId,
final InklessConfig config,
final MetadataView metadata,
final ControlPlane controlPlane,
final StorageBackend fetchStorage,
final Optional<StorageBackend> maybeLaggingFetchStorage,
final StorageBackend produceStorage,
final StorageBackend backgroundStorage,
final Metrics storageMetrics,
final ObjectKeyCreator objectKeyCreator,
final KeyAlignmentStrategy keyAlignmentStrategy,
final ObjectCache cache,
Expand All @@ -78,18 +95,17 @@ public SharedState(
this.config = config;
this.metadata = metadata;
this.controlPlane = controlPlane;
this.fetchStorage = fetchStorage;
this.maybeLaggingFetchStorage = maybeLaggingFetchStorage;
this.produceStorage = produceStorage;
this.backgroundStorage = backgroundStorage;
this.storageMetrics = storageMetrics;
this.objectKeyCreator = objectKeyCreator;
this.keyAlignmentStrategy = keyAlignmentStrategy;
this.cache = cache;
this.batchCoordinateCache = batchCoordinateCache;
this.brokerTopicStats = brokerTopicStats;
this.defaultTopicConfigs = defaultTopicConfigs;

final MetricsReporter reporter = new JmxReporter();
this.storageMetrics = new Metrics(
new MetricConfig(), List.of(reporter), Time.SYSTEM,
new KafkaMetricsContext(STORAGE_METRIC_CONTEXT)
);
}

public static SharedState initialize(
Expand All @@ -107,35 +123,79 @@ public static SharedState initialize(
"Value of consume.batch.coordinate.cache.ttl.ms exceeds file.cleaner.retention.period.ms / 2"
);
}
return new SharedState(
time,
brokerId,
config,
metadata,
controlPlane,
ObjectKey.creator(config.objectKeyPrefix(), config.objectKeyLogPrefixMasked()),
new FixedBlockAlignment(config.fetchCacheBlockBytes()),
new CaffeineCache(

CaffeineCache objectCache = null;
BatchCoordinateCache batchCoordinateCache = null;
StorageBackend fetchStorage = null;
StorageBackend laggingFetchStorage = null;
StorageBackend produceStorage = null;
StorageBackend backgroundStorage = null;
Metrics storageMetrics = null;
try {
objectCache = new CaffeineCache(
config.cacheMaxCount(),
config.cacheMaxBytes(),
config.cacheExpirationLifespanSec(),
config.cacheExpirationMaxIdleSec()
),
config.isBatchCoordinateCacheEnabled() ? new CaffeineBatchCoordinateCache(config.batchCoordinateCacheTtl()) : new NullBatchCoordinateCache(),
brokerTopicStats,
defaultTopicConfigs
);
);
batchCoordinateCache = config.isBatchCoordinateCacheEnabled()
? new CaffeineBatchCoordinateCache(config.batchCoordinateCacheTtl())
: new NullBatchCoordinateCache();

final MetricsReporter reporter = new JmxReporter();
storageMetrics = new Metrics(
new MetricConfig(), List.of(reporter), Time.SYSTEM,
new KafkaMetricsContext(STORAGE_METRIC_CONTEXT)
);
fetchStorage = config.storage(storageMetrics);
// If thread pool size is 0, lagging consumer support is disabled — don't create a separate client.
// Enabling lagging consumer support requires a broker restart so that a new storage client can be created.
laggingFetchStorage = config.fetchLaggingConsumerThreadPoolSize() > 0 ? config.storage(storageMetrics) : null;
produceStorage = config.storage(storageMetrics);
backgroundStorage = config.storage(storageMetrics);
final var objectKeyCreator = ObjectKey.creator(config.objectKeyPrefix(), config.objectKeyLogPrefixMasked());
final var keyAlignmentStrategy = new FixedBlockAlignment(config.fetchCacheBlockBytes());
return new SharedState(
time,
brokerId,
config,
metadata,
controlPlane,
fetchStorage,
Optional.ofNullable(laggingFetchStorage),
produceStorage,
backgroundStorage,
storageMetrics,
objectKeyCreator,
keyAlignmentStrategy,
objectCache,
batchCoordinateCache,
brokerTopicStats,
defaultTopicConfigs
);
} catch (Exception e) {
Utils.closeQuietly(backgroundStorage, "backgroundStorage");
Utils.closeQuietly(produceStorage, "produceStorage");
Utils.closeQuietly(laggingFetchStorage, "laggingFetchStorage");
Utils.closeQuietly(fetchStorage, "fetchStorage");
Utils.closeQuietly(storageMetrics, "storageMetrics");
Utils.closeQuietly(batchCoordinateCache, "batchCoordinateCache");
Utils.closeQuietly(objectCache, "objectCache");
Utils.closeQuietly(controlPlane, "controlPlane");
throw new RuntimeException("Failed to initialize SharedState", e);
}
}

@Override
public void close() throws IOException {
try {
cache.close();
controlPlane.close();
storageMetrics.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
Utils.closeQuietly(backgroundStorage, "backgroundStorage");
Utils.closeQuietly(produceStorage, "produceStorage");
maybeLaggingFetchStorage.ifPresent(s -> Utils.closeQuietly(s, "laggingFetchStorage"));
Utils.closeQuietly(fetchStorage, "fetchStorage");
Utils.closeQuietly(storageMetrics, "storageMetrics");
Utils.closeQuietly(batchCoordinateCache, "batchCoordinateCache");
Utils.closeQuietly(cache, "objectCache");
Utils.closeQuietly(controlPlane, "controlPlane");
}

public Time time() {
Expand Down Expand Up @@ -186,7 +246,25 @@ public Supplier<LogConfig> defaultTopicConfigs() {
return defaultTopicConfigs;
}

public StorageBackend buildStorage() {
return config.storage(storageMetrics);
/**
 * Returns the storage backend dedicated to the hot-path fetch (consume) flow.
 *
 * <p>Owned by this {@code SharedState}; callers must not close it.
 * Lifecycle note grounded in this class: instances are created in
 * {@code initialize(...)} and released in {@code close()}.</p>
 */
public StorageBackend fetchStorage() {
return fetchStorage;
}

/**
 * Optional access to the lagging fetch storage backend.
 *
 * <p>A separate client is used for lagging consumers to isolate connection
 * pool usage from the hot path and to allow independent tuning of
 * timeouts/retries for cold storage access patterns.</p>
 *
 * <p>When {@code fetch.lagging.consumer.thread.pool.size == 0}, the lagging consumer
 * path is disabled and this storage backend is not created. Enabling it later
 * requires a broker restart so a new storage client can be created.</p>
 *
 * @return the lagging-consumer storage backend, or {@link Optional#empty()}
 *         when the lagging consumer feature is disabled
 */
public Optional<StorageBackend> maybeLaggingFetchStorage() {
return maybeLaggingFetchStorage;
}

/**
 * Returns the storage backend dedicated to the produce (append) flow.
 *
 * <p>Owned by this {@code SharedState}; callers must not close it.</p>
 */
public StorageBackend produceStorage() {
return produceStorage;
}

/**
 * Returns the storage backend shared by background jobs (file cleaner and
 * file merger), which run concurrently and therefore require a thread-safe
 * backend. A dedicated instance keeps them from contending with the
 * hot-path fetch/produce clients.
 *
 * <p>Owned by this {@code SharedState}; callers must not close it.</p>
 */
public StorageBackend backgroundStorage() {
return backgroundStorage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,23 +51,11 @@ public FetchHandler(final SharedState state) {
state.keyAlignmentStrategy(),
state.cache(),
state.controlPlane(),
state.buildStorage(),
state.fetchStorage(),
state.brokerTopicStats(),
state.config().fetchMetadataThreadPoolSize(),
state.config().fetchDataThreadPoolSize(),
// Separate storage client for lagging consumers to:
// 1. Isolate connection pool usage (lagging consumers shouldn't exhaust connections for hot path)
// 2. Allow independent tuning of timeouts/retries for cold storage access patterns
// (This requires some refactoring on how the storage client is built/configured)
// If thread pool size is 0, disabling lagging consumer support, don't create a separate client
//
// NOTE: The client for lagging consumers is created only when this FetchHandler (and Reader)
// is constructed. If fetchLaggingConsumerThreadPoolSize() is 0 at this time, no separate
// client is created and lagging consumer support is effectively disabled for the lifetime
// of this instance, even if the configuration is later reloaded with a non-zero value.
// Enabling lagging consumer support therefore requires a broker restart (or reconstruction
// of the SharedState/FetchHandler) so that a new storage client can be created.
state.config().fetchLaggingConsumerThreadPoolSize() > 0 ? state.buildStorage() : null,
state.maybeLaggingFetchStorage(),
state.config().fetchLaggingConsumerThresholdMs(),
state.config().fetchLaggingConsumerRequestRateLimit(),
state.config().fetchLaggingConsumerThreadPoolSize(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand All @@ -51,6 +52,7 @@
import io.aiven.inkless.control_plane.ControlPlane;
import io.aiven.inkless.generated.FileExtent;
import io.aiven.inkless.storage_backend.common.ObjectFetcher;
import io.aiven.inkless.storage_backend.common.StorageBackend;
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;

Expand Down Expand Up @@ -119,7 +121,7 @@ public Reader(
BrokerTopicStats brokerTopicStats,
int fetchMetadataThreadPoolSize,
int fetchDataThreadPoolSize,
ObjectFetcher laggingObjectFetcher,
Optional<StorageBackend> maybeLaggingObjectFetcher,
long laggingConsumerThresholdMs,
int laggingConsumerRequestRateLimit,
int laggingConsumerThreadPoolSize,
Expand All @@ -135,9 +137,7 @@ public Reader(
maxBatchesPerPartitionToFind,
Executors.newFixedThreadPool(fetchMetadataThreadPoolSize, new InklessThreadFactory("inkless-fetch-metadata-", false)),
Executors.newFixedThreadPool(fetchDataThreadPoolSize, new InklessThreadFactory("inkless-fetch-data-", false)),
// Only create lagging consumer fetcher when feature is enabled (pool size > 0).
// A pool size of 0 is a valid configuration that explicitly disables the feature (null fetcher and executor).
laggingConsumerThreadPoolSize > 0 ? laggingObjectFetcher : null,
maybeLaggingObjectFetcher.orElse(null),
laggingConsumerThresholdMs,
laggingConsumerRequestRateLimit,
// Only create lagging consumer resources when feature is enabled (pool size > 0).
Expand Down Expand Up @@ -435,8 +435,7 @@ public void close() throws IOException {
if (metadataThreadPoolMonitor != null) metadataThreadPoolMonitor.close();
if (dataThreadPoolMonitor != null) dataThreadPoolMonitor.close();
if (laggingConsumerThreadPoolMonitor != null) laggingConsumerThreadPoolMonitor.close();
objectFetcher.close();
if (laggingConsumerObjectFetcher != null) laggingConsumerObjectFetcher.close();
// SharedState owns the storage backend lifecycle; only close thread pools and metrics here.
fetchMetrics.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public FileCleaner(SharedState sharedState) {
this(
sharedState.time(),
sharedState.controlPlane(),
sharedState.buildStorage(),
sharedState.backgroundStorage(),
sharedState.objectKeyCreator(),
sharedState.config().fileCleanerRetentionPeriod()
);
Expand Down Expand Up @@ -143,7 +143,7 @@ private void cleanFiles(Set<String> objectKeyPaths) throws StorageBackendExcepti

@Override
public void close() throws IOException {
storage.close();
// SharedState owns the storage backend lifecycle; only close component metrics here.
metrics.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public FileMerger(final SharedState sharedState) {
this.time = sharedState.time();
this.config = sharedState.config();
this.controlPlane = sharedState.controlPlane();
this.storage = sharedState.buildStorage();
this.storage = sharedState.backgroundStorage();
this.objectKeyCreator = sharedState.objectKeyCreator();
this.metrics = new FileMergerMetrics();

Expand Down Expand Up @@ -239,7 +239,7 @@ private void tryDeleteFile(ObjectKey objectKey, Exception e) {

@Override
public void close() throws IOException {
storage.close();
// SharedState owns the storage backend lifecycle; only close component metrics here.
metrics.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public AppendHandler(final SharedState state) {
state.time(),
state.brokerId(),
state.objectKeyCreator(),
state.buildStorage(),
state.produceStorage(),
state.keyAlignmentStrategy(),
state.cache(),
state.batchCoordinateCache(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ class Writer implements Closeable {

private final Lock lock = new ReentrantLock();
private ActiveFile activeFile;
private final StorageBackend storage;
private final FileCommitter fileCommitter;
private final Time time;
private final Duration commitInterval;
Expand Down Expand Up @@ -112,7 +111,6 @@ class Writer implements Closeable {
commitInterval,
maxBufferSize,
Executors.newScheduledThreadPool(1, new InklessThreadFactory("inkless-file-commit-ticker-", true)),
storage,
new FileCommitter(
brokerId, controlPlane, objectKeyCreator, storage,
keyAlignmentStrategy, objectCache, batchCoordinateCache, time,
Expand All @@ -128,7 +126,6 @@ class Writer implements Closeable {
final Duration commitInterval,
final int maxBufferSize,
final ScheduledExecutorService commitTickScheduler,
final StorageBackend storage,
final FileCommitter fileCommitter,
final WriterMetrics writerMetrics,
final BrokerTopicStats brokerTopicStats) {
Expand All @@ -139,7 +136,6 @@ class Writer implements Closeable {
}
this.maxBufferSize = maxBufferSize;
this.commitTickScheduler = Objects.requireNonNull(commitTickScheduler, "commitTickScheduler cannot be null");
this.storage = Objects.requireNonNull(storage, "storage cannot be null");
this.fileCommitter = Objects.requireNonNull(fileCommitter, "fileCommitter cannot be null");
this.writerMetrics = Objects.requireNonNull(writerMetrics, "writerMetrics cannot be null");
this.brokerTopicStats = brokerTopicStats;
Expand Down Expand Up @@ -236,7 +232,7 @@ public void close() throws IOException {
// Rotate file before closing the uploader so the file gets into the queue first.
rotateFile(true);
fileCommitter.close();
storage.close();
// SharedState owns the storage backend lifecycle.
writerMetrics.close();
} finally {
lock.unlock();
Expand Down
Loading
Loading