Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,14 @@ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
long getProximaIOWriteFinalizeTimeoutMs();

void setProximaIOWriteFinalizeTimeoutMs(long timeout);

@Default.Integer(1000)
int getProximaIOMaxPendingWrites();

void setProximaIOMaxPendingWrites(int value);

@Default.Integer(50)
int getProximaIOTransactionWriteWeight();

void setProximaIOTransactionWriteWeight(int value);
}
58 changes: 44 additions & 14 deletions beam/core/src/main/java/cz/o2/proxima/beam/io/ProximaIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import cz.o2.proxima.beam.core.ProximaPipelineOptions;
import cz.o2.proxima.core.annotations.Experimental;
import cz.o2.proxima.core.repository.RepositoryFactory;
import cz.o2.proxima.core.repository.TransactionMode;
import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.core.util.ExceptionUtils;
import cz.o2.proxima.core.util.Pair;
Expand Down Expand Up @@ -75,13 +76,15 @@ private Write(RepositoryFactory repositoryFactory) {

@Override
public PDone expand(PCollection<StreamElement> input) {
long bundleFinalizeTimeoutMs =
input
.getPipeline()
.getOptions()
.as(ProximaPipelineOptions.class)
.getProximaIOWriteFinalizeTimeoutMs();
input.apply("Write", ParDo.of(new WriteFn(bundleFinalizeTimeoutMs, repositoryFactory)));
ProximaPipelineOptions proximaOpts =
input.getPipeline().getOptions().as(ProximaPipelineOptions.class);
long bundleFinalizeTimeoutMs = proximaOpts.getProximaIOWriteFinalizeTimeoutMs();
int maxPendingWrites = proximaOpts.getProximaIOMaxPendingWrites();
int weight = proximaOpts.getProximaIOTransactionWriteWeight();
input.apply(
"Write",
ParDo.of(
new WriteFn(bundleFinalizeTimeoutMs, maxPendingWrites, weight, repositoryFactory)));
return PDone.in(input.getPipeline());
}
}
Expand All @@ -90,15 +93,25 @@ static class WriteFn extends DoFn<StreamElement, Void> {

private final RepositoryFactory repositoryFactory;
private final long bundleFinalizeTimeoutMs;
private final int maxPendingWrites;
private final int transactionalWriteWeight;

private transient DirectDataOperator direct;

private transient Set<CompletableFuture<Pair<Boolean, Throwable>>> pendingWrites;
private transient AtomicInteger missingResponses;
private transient AtomicInteger inflightWriteWeights;

WriteFn(
long bundleFinalizeTimeoutMs,
int maxPendingWrites,
int transactionalWriteWeight,
RepositoryFactory repositoryFactory) {

WriteFn(long bundleFinalizeTimeoutMs, RepositoryFactory repositoryFactory) {
this.bundleFinalizeTimeoutMs = bundleFinalizeTimeoutMs;
this.repositoryFactory = repositoryFactory;
this.maxPendingWrites = maxPendingWrites;
this.transactionalWriteWeight = transactionalWriteWeight;
}

@VisibleForTesting
Expand All @@ -116,6 +129,7 @@ public void startBundle() {
// we access the collection asynchronously on completion of writes
pendingWrites = Collections.synchronizedSet(new HashSet<>());
missingResponses = new AtomicInteger();
inflightWriteWeights = new AtomicInteger();
}

@FinishBundle
Expand Down Expand Up @@ -160,22 +174,38 @@ public void processElement(@Element StreamElement element) {
AtomicReference<Runnable> writeRunnableRef = new AtomicReference<>();
// increment missing responses outside the retry runnable
missingResponses.incrementAndGet();
boolean isTransactional =
element.getAttributeDescriptor().getTransactionMode() != TransactionMode.NONE;
int weight = isTransactional ? transactionalWriteWeight : 1;
synchronized (pendingWrites) {
while (inflightWriteWeights.get() >= maxPendingWrites) {
ExceptionUtils.unchecked(() -> pendingWrites.wait(100));
}
inflightWriteWeights.addAndGet(weight);
}
writeRunnableRef.set(
() -> {
CompletableFuture<Pair<Boolean, Throwable>> writeResult = new CompletableFuture<>();
writeResult.thenAccept(
r -> {
if (Boolean.TRUE.equals(r.getFirst())) {
// remove successfully completed write
missingResponses.decrementAndGet();
pendingWrites.remove(writeResult);
} else if (r.getSecond() instanceof TransactionRejectedException) {
if (r.getSecond() instanceof TransactionRejectedException) {
// restart the writing transaction
writeRunnableRef.get().run();
// transaction rejected, restart transaction
pendingWrites.remove(writeResult);
} else {
// this is no longer inflight
inflightWriteWeights.addAndGet(-weight);
synchronized (pendingWrites) {
pendingWrites.notify();
if (Boolean.TRUE.equals(r.getFirst())) {
// remove successfully completed write
missingResponses.decrementAndGet();
pendingWrites.remove(writeResult);
}
}
// else keep the failed future until finish bundle
}
// else keep the failed future until finish bundle
});
pendingWrites.add(writeResult);
writer.write(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class ProximaIOWriteFnTest {
Repository.ofTest(ConfigFactory.load("test-reference.conf").resolve());
private final EntityDescriptor gateway = repository.getEntity("gateway");
private final AttributeDescriptor<byte[]> status = gateway.getAttribute("status");
private final WriteFn writeFn = new WriteFn(1000L, repository.asFactory());
private final WriteFn writeFn = new WriteFn(1000L, 100, 50, repository.asFactory());
private RandomAccessReader reader;

@Before
Expand Down Expand Up @@ -88,7 +88,7 @@ public void testTransactionRejection() {
AtomicInteger written = new AtomicInteger();
OnlineAttributeWriter mockWriter = createSerializableWriter(fails, written);
WriteFn modifiedWriteFn =
new WriteFn(30000L, repository.asFactory()) {
new WriteFn(30000L, 100, 50, repository.asFactory()) {
@Override
OnlineAttributeWriter getWriterForElement(StreamElement element) {
return mockWriter;
Expand Down
Loading