Skip to content

Commit 1454a5a

Browse files
author
Andrei Nadyktov
committed
IGNITE-22530 Make CdcConsumerEx accept list of actual caches names
1 parent 9241f9c commit 1454a5a

File tree

3 files changed

+23
-6
lines changed

3 files changed

+23
-6
lines changed

modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerEx.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,21 @@
1818
package org.apache.ignite.internal.cdc;
1919

2020
import java.nio.file.Path;
21+
import java.util.List;
2122

2223
import org.apache.ignite.cdc.CdcConsumer;
2324
import org.apache.ignite.metric.MetricRegistry;
2425

2526
/**
26-
* Extended CdcConsumer interface which provides overloaded {@link CdcConsumerEx#start(MetricRegistry, Path)} method
27+
* Extended CdcConsumer interface which provides overloaded {@link CdcConsumerEx#start(MetricRegistry, Path, List)} method
2728
* required for CDC regex filters.
2829
*/
2930
public interface CdcConsumerEx extends CdcConsumer {
3031
/**
3132
* Starts the consumer.
3233
* @param mreg Metric registry for consumer specific metrics.
3334
* @param cdcDir Path to Change Data Capture Directory.
35+
* @param cacheNames List of cache names.
3436
*/
35-
void start(MetricRegistry mreg, Path cdcDir);
37+
void start(MetricRegistry mreg, Path cdcDir, List<String> cacheNames);
3638
}

modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@
2626
import java.util.Comparator;
2727
import java.util.HashSet;
2828
import java.util.Iterator;
29+
import java.util.List;
2930
import java.util.Map;
3031
import java.util.Objects;
3132
import java.util.Set;
3233
import java.util.concurrent.atomic.AtomicLong;
3334
import java.util.function.Predicate;
35+
import java.util.stream.Collectors;
3436
import java.util.stream.Stream;
3537
import org.apache.ignite.IgniteCheckedException;
3638
import org.apache.ignite.IgniteException;
@@ -41,6 +43,7 @@
4143
import org.apache.ignite.cdc.CdcConsumer;
4244
import org.apache.ignite.cdc.CdcEvent;
4345
import org.apache.ignite.cdc.TypeMapping;
46+
import org.apache.ignite.configuration.CacheConfiguration;
4447
import org.apache.ignite.configuration.DataRegionConfiguration;
4548
import org.apache.ignite.configuration.DataStorageConfiguration;
4649
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -339,7 +342,17 @@ public void runX() throws Exception {
339342
committedSegmentOffset.value(walState.get1().fileOffset());
340343
}
341344

342-
consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer")), ft.walCdc().toPath());
345+
List<String> cacheNames = GridLocalConfigManager
346+
.readCachesData(
347+
ft,
348+
kctx.marshallerContext().jdkMarshaller(),
349+
igniteCfg)
350+
.values().stream()
351+
.map(data -> data.configuration().getName())
352+
.collect(Collectors.toList());
353+
354+
consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer")), ft.walCdc().toPath(),
355+
cacheNames);
343356

344357
started = true;
345358

modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.nio.file.Path;
2121
import java.util.EnumSet;
2222
import java.util.Iterator;
23+
import java.util.List;
2324
import java.util.NoSuchElementException;
2425
import org.apache.ignite.IgniteCheckedException;
2526
import org.apache.ignite.IgniteException;
@@ -188,11 +189,12 @@ public void onCacheDestroyEvents(Iterator<Integer> caches) {
188189
* @param cdcReg CDC metric registry.
189190
* @param cdcConsumerReg CDC consumer metric registry.
190191
* @param cdcDir Path to Change Data Capture Directory.
192+
* @param cacheNames List of cache names.
191193
* @throws IgniteCheckedException If failed.
192194
*/
193-
public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg, Path cdcDir) throws IgniteCheckedException {
195+
public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg, Path cdcDir, List<String> cacheNames) throws IgniteCheckedException {
194196
if (consumer instanceof CdcConsumerEx)
195-
((CdcConsumerEx) consumer).start(cdcConsumerReg, cdcDir);
197+
((CdcConsumerEx) consumer).start(cdcConsumerReg, cdcDir, cacheNames);
196198
else
197199
consumer.start(cdcConsumerReg);
198200

@@ -205,7 +207,7 @@ public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg,
205207

206208
/**
207209
* Stops the consumer.
208-
* This methods can be invoked only after {@link #start(MetricRegistryImpl, MetricRegistryImpl, Path)}.
210+
* This methods can be invoked only after {@link #start(MetricRegistryImpl, MetricRegistryImpl, Path, List)}.
209211
*/
210212
public void stop() {
211213
consumer.stop();

0 commit comments

Comments
 (0)