Merged
README.md: 127 changes (109 additions, 18 deletions)
Kstreamplify adds extra features to Kafka Streams, simplifying development so you can focus on your business implementation.
* [By Key](#by-key)
* [By Key and Value](#by-key-and-value)
* [By Predicate](#by-predicate)
* [By Headers](#by-headers)
* [OpenTelemetry](#opentelemetry)
* [Custom Tags for Metrics](#custom-tags-for-metrics)
* [Swagger](#swagger)
For more details about prefixes, see the [Prefix](#prefix) section.

### Deduplication

`DeduplicationUtils` deduplicates streams based on various criteria within a specified time window.

- Methods with `withErrors` return a `KStream<String, ProcessingResult<V, V2>>`, allowing you to handle errors and route them to `TopologyErrorHandler#catchErrors()`.
- Methods without `withErrors` return a plain `KStream<String, V>` and can be used directly.

**Note**: Only streams with `String` keys and Avro values are supported.
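
Conceptually, every variant keeps the identifiers it has already seen and drops any record whose identifier reappears within the retention window. The following plain-Java sketch illustrates only that windowing idea, with an in-memory map standing in for the Kafka Streams window store the library actually uses:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal in-memory sketch of windowed deduplication: once an identifier has
 * been seen, later records carrying the same identifier within the retention
 * window are dropped. Illustration only; the real implementation is backed by
 * a Kafka Streams window store.
 */
public class WindowedDedupSketch {
    private final Map<String, Long> lastSeen = new HashMap<>();
    private final long windowMs;

    public WindowedDedupSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    /** Returns true when the record should be forwarded, false when it is a duplicate. */
    public boolean accept(String identifier, long timestampMs) {
        Long previous = lastSeen.get(identifier);
        if (previous != null && Math.abs(timestampMs - previous) <= windowMs) {
            return false; // same identifier already seen within the window
        }
        lastSeen.put(identifier, timestampMs);
        return true;
    }
}
```

Depending on the variant, the identifier is the record key, the key and value, a value derived from a predicate, or a combination of headers.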

#### By Key

```java
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
    @Override
    public void topology(StreamsBuilder streamsBuilder) {
        KStream<String, KafkaUser> myStream = streamsBuilder
            .stream("input_topic");

        DeduplicationUtils
            .deduplicateByKey(streamsBuilder, myStream, Duration.ofDays(60))
            .to("output_topic");
    }
}
```

Or, using the `ProcessingResult` API:

```java
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
    @Override
    public void topology(StreamsBuilder streamsBuilder) {
        KStream<String, KafkaUser> myStream = streamsBuilder
            .stream("input_topic");

        TopologyErrorHandler
            .catchErrors(
                DeduplicationUtils
                    .deduplicateByKeyWithErrors(streamsBuilder, myStream, Duration.ofDays(60))
            )
            .to("output_topic", Produced.with(Serdes.String(), SerdesUtils.getValueSerdes()));
    }
}
```

#### By Key and Value

```java
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
    @Override
    public void topology(StreamsBuilder streamsBuilder) {
        KStream<String, KafkaUser> myStream = streamsBuilder
            .stream("input_topic");

        DeduplicationUtils
            .deduplicateByKeyValue(streamsBuilder, myStream, Duration.ofDays(60))
            .to("output_topic");
    }
}
```

Or, using the `ProcessingResult` API:

```java
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
    @Override
    public void topology(StreamsBuilder streamsBuilder) {
        KStream<String, KafkaUser> myStream = streamsBuilder
            .stream("input_topic");

        TopologyErrorHandler
            .catchErrors(
                DeduplicationUtils
                    .deduplicateByKeyValueWithErrors(streamsBuilder, myStream, Duration.ofDays(60))
            )
            .to("output_topic", Produced.with(Serdes.String(), SerdesUtils.getValueSerdes()));
    }
}
```

#### By Predicate

The predicate is used as the key in the underlying window store that tracks seen records.
The stream is deduplicated based on the values derived from the predicate.

```java
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
    @Override
    public void topology(StreamsBuilder streamsBuilder) {
        KStream<String, KafkaUser> myStream = streamsBuilder
            .stream("input_topic");

        DeduplicationUtils
            .deduplicateByPredicate(streamsBuilder, myStream, Duration.ofDays(60),
                value -> value.getFirstName() + "#" + value.getLastName())
            .to("output_topic");
    }
}
```

Or, using the `ProcessingResult` API:

```java
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
    @Override
    public void topology(StreamsBuilder streamsBuilder) {
        KStream<String, KafkaUser> myStream = streamsBuilder
            .stream("input_topic");

        TopologyErrorHandler
            .catchErrors(
                DeduplicationUtils
                    .deduplicateByPredicateWithErrors(
                        streamsBuilder,
                        myStream,
                        Duration.ofDays(60),
                        value -> value.getFirstName() + "#" + value.getLastName()
                    )
            )
            .to("output_topic", Produced.with(Serdes.String(), SerdesUtils.getValueSerdes()));
    }
}
```
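
To make the keying concrete, here is a plain-Java sketch of how such a predicate collapses values to deduplication identifiers. The `User` record is a hypothetical stand-in for the Avro `KafkaUser` used in the examples, not part of the library:

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Sketch of how the predicate maps values to deduplication identifiers. */
public class PredicateKeySketch {
    /** Hypothetical stand-in for the Avro KafkaUser value used in the examples. */
    public record User(String firstName, String lastName) {}

    /** Values sharing the same derived identifier are deduplicated to one record. */
    public static List<String> identifiers(List<User> values, Function<User, String> predicate) {
        return values.stream().map(predicate).distinct().collect(Collectors.toList());
    }
}
```

Two records for "John Doe" map to the single identifier `John#Doe`, so only the first one is forwarded.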

#### By Headers

The list of headers is used to build a composite deduplication key.
The stream is deduplicated based on the extracted header values.

```java
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
    @Override
    public void topology(StreamsBuilder streamsBuilder) {
        KStream<String, KafkaUser> myStream = streamsBuilder
            .stream("input_topic");

        DeduplicationUtils
            .deduplicateByHeaders(streamsBuilder, myStream, Duration.ofDays(60),
                List.of("header1", "header2"))
            .to("output_topic");
    }
}
```

Or, using the `ProcessingResult` API:

```java
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
    @Override
    public void topology(StreamsBuilder streamsBuilder) {
        KStream<String, KafkaUser> myStream = streamsBuilder
            .stream("input_topic");

        TopologyErrorHandler
            .catchErrors(
                DeduplicationUtils
                    .deduplicateByHeadersWithErrors(
                        streamsBuilder,
                        myStream,
                        Duration.ofDays(60),
                        List.of("header1", "header2")
                    )
            )
            .to("output_topic", Produced.with(Serdes.String(), SerdesUtils.getValueSerdes()));
    }
}
```

## OpenTelemetry

DedupHeadersProcessor.java

package com.michelin.kstreamplify.deduplication;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.specific.SpecificRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

/**
 * Processor class for the deduplication mechanism on headers of a given topic.
 *
 * @param <V> The type of the value
 */
public class DedupHeadersProcessor<V extends SpecificRecord> implements Processor<String, V, String, V> {
    private final String windowStoreName;
    private final Duration retentionWindowDuration;
    private final List<String> deduplicationHeaders;
    private ProcessorContext<String, V> processorContext;
    private WindowStore<String, String> dedupWindowStore;

    /**
     * Constructor.
     *
     * @param windowStoreName Name of the deduplication state store
     * @param retentionWindowDuration Retention window duration
     * @param deduplicationHeaders Deduplication headers list
     */
    public DedupHeadersProcessor(
            String windowStoreName, Duration retentionWindowDuration, List<String> deduplicationHeaders) {
        this.windowStoreName = windowStoreName;
        this.retentionWindowDuration = retentionWindowDuration;
        this.deduplicationHeaders = deduplicationHeaders;
    }

    /**
     * Get the header value for a given key.
     *
     * @param headers The headers of the record
     * @param key The key to look for in the headers
     * @return The header value for the given key, or an empty string if the header is not present or has no value
     */
    private static String getHeader(Headers headers, String key) {
        Header header = headers.lastHeader(key);
        if (header == null || header.value() == null) {
            return StringUtils.EMPTY;
        }
        String value = new String(header.value(), StandardCharsets.UTF_8);
        return StringUtils.defaultString(value);
    }

    /**
     * Initialize the processor.
     *
     * @param context The processor context
     */
    @Override
    public void init(ProcessorContext<String, V> context) {
        processorContext = context;
        dedupWindowStore = processorContext.getStateStore(windowStoreName);
    }

    /**
     * Process a record.
     *
     * @param message The record to process
     */
    @Override
    public void process(Record<String, V> message) {
        Instant currentInstant = Instant.ofEpochMilli(message.timestamp());
        String identifier = buildIdentifier(message.headers());

        // Look for the identifier in the state store and return early if it is found (signaling a duplicate)
        try (WindowStoreIterator<String> resultIterator = dedupWindowStore.backwardFetch(
                identifier,
                currentInstant.minus(retentionWindowDuration),
                currentInstant.plus(retentionWindowDuration))) {
            while (resultIterator != null && resultIterator.hasNext()) {
                KeyValue<Long, String> currentKeyValue = resultIterator.next();
                if (identifier.equals(currentKeyValue.value)) {
                    return;
                }
            }
        }

        // First time this record is seen: store an entry in the window store and forward the record downstream
        dedupWindowStore.put(identifier, identifier, message.timestamp());
        processorContext.forward(message);
    }

    /**
     * Build the deduplication identifier of a record from its headers.
     *
     * @param headers The headers of the record
     * @return The built identifier
     */
    private String buildIdentifier(Headers headers) {
        return deduplicationHeaders.stream().map(key -> getHeader(headers, key)).collect(Collectors.joining("#"));
    }
}
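
The composite-identifier logic of `buildIdentifier` and `getHeader` can be exercised in isolation. This sketch uses a plain `Map` as a stand-in for Kafka's `Headers` interface; as in the processor, a missing header contributes an empty string to the `#`-joined key:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Sketch of the composite deduplication identifier built from record headers. */
public class HeaderKeySketch {
    /** Joins the configured header values with '#'; a missing header yields an empty string. */
    public static String identifier(Map<String, String> headers, List<String> dedupHeaders) {
        return dedupHeaders.stream()
                .map(name -> headers.getOrDefault(name, ""))
                .collect(Collectors.joining("#"));
    }
}
```

Note that two records missing the same headers collapse to the same identifier, which is why choosing headers that are always populated matters.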