Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,21 @@
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.GenericManifestFile;
import org.apache.iceberg.GenericPartitionFieldSummary;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.InternalData;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.RewriteTablePathUtil;
import org.apache.iceberg.RewriteTablePathUtil.RewriteResult;
Expand All @@ -37,6 +48,7 @@
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.actions.ImmutableRewriteTablePath;
import org.apache.iceberg.actions.RewriteTablePath;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.util.Pair;

/**
Expand All @@ -57,11 +69,15 @@ public class RewriteTablePathOzoneAction implements RewriteTablePath {
private String stagingDir;
private int parallelism;

private ExecutorService executorService;
private static final int MAX_INFLIGHT_MULTIPLIER = 4;
private static final int DEFAULT_THREAD_COUNT = 10;

private final Table table;

public RewriteTablePathOzoneAction(Table table) {
this.table = table;
this.parallelism = Runtime.getRuntime().availableProcessors();
this.parallelism = DEFAULT_THREAD_COUNT;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread count can be passed via command, can be done in subsequent PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes , we can pass the thread count via command during which
public RewriteTablePathOzoneAction(Table table, int parallelism) will be used , we can add this when the CLI command is added for the rewrite.

}

public RewriteTablePathOzoneAction(Table table, int parallelism) {
Expand Down Expand Up @@ -102,8 +118,7 @@ public RewriteTablePath stagingLocation(String stagingLocation) {
@Override
public Result execute() {
validateInputs();
// TODO: should use for parallel manifest and position delete file rewriting.
ExecutorService executorService = Executors.newFixedThreadPool(parallelism);
executorService = Executors.newFixedThreadPool(parallelism);
try {
return doExecute();
} finally {
Expand Down Expand Up @@ -197,6 +212,9 @@ private boolean versionInFilePath(String path, String version) {

private String rebuildMetadata() {
//TODO need to implement rewrite of manifest list , manifest files and position delete files.
TableMetadata startMetadata = startVersionName != null
? new StaticTableOperations(startVersionName, table.io()).current()
: null;
TableMetadata endMetadata = new StaticTableOperations(endVersionName, table.io()).current();

List<PartitionStatisticsFile> partitionStats = endMetadata.partitionStatisticsFiles();
Expand All @@ -205,6 +223,9 @@ private String rebuildMetadata() {
}

RewriteResult<Snapshot> rewriteVersionResult = rewriteVersionFiles(endMetadata);
Set<Snapshot> deltaSnapshots = deltaSnapshots(startMetadata, rewriteVersionResult.toRewrite());
//TODO: manifestsToRewrite will be used while re-write of manifest-list files.
Set<String> manifestsToRewrite = manifestsToRewrite(deltaSnapshots, startMetadata, endMetadata);

Set<Pair<String, String>> copyPlan = new HashSet<>();
copyPlan.addAll(rewriteVersionResult.copyPlan());
Expand Down Expand Up @@ -251,4 +272,109 @@ private Set<Pair<String, String>> rewriteVersionFile(TableMetadata metadata, Str

return result;
}

private Set<String> manifestsToRewrite(Set<Snapshot> deltaSnapshots, TableMetadata startMetadata,
TableMetadata endMetadata) {

final Set<Long> deltaSnapshotIds;
if (startMetadata != null) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The entire startMetadata object is passed but only .equals(null) is checked on it. The actual snapshot data is already captured in deltaSnapshots.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When startMetadata is not provided, deltaSnapshots will be equal to the full set of snapshots collected across all version files during the version file rewrite phase. When startMetadata is provided, deltaSnapshots will contain only those snapshots that are not tracked by the start version i.e. snapshots that appeared in intermediate version files between start and end, minus the snapshots already present in the start version's metadata.
Because deltaSnapshots is built by reading each intermediate version file's JSON as it was written at that point in time, it can include snapshots that were subsequently expired.

So we don't use deltaSnapshots for iterating and instead iterate through the snapshots collected from the endVersion metadata file because we won't be able to read the manifest list associated with the expired snapshots.
In manifestsToRewrite we use deltaSnapshots only to avoid including manifest files that were already rewritten in a previous incremental run. The snapshot_id field on each manifest entry identifies the snapshot that originally created it. By filtering to only those whose snapshot_id falls within deltaSnapshotIds, we select only manifests that are new since the start version and exclude those that were inherited from before it.

deltaSnapshotIds = deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
} else {
deltaSnapshotIds = null;
}

Set<String> manifestPaths = ConcurrentHashMap.newKeySet();
int maxInFlight = parallelism * MAX_INFLIGHT_MULTIPLIER;
Semaphore semaphore = new Semaphore(maxInFlight);

ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);

int submittedTasks = 0;
int completedTasks = 0;

try {
for (Snapshot snapshot : endMetadata.snapshots()) {
semaphore.acquire(); // blocks when maxInFlight tasks are already in-flight

final long snapshotId = snapshot.snapshotId();
final String manifestListLocation = snapshot.manifestListLocation();

boolean taskSubmitted = false;
try {
completionService.submit(() -> {
try (CloseableIterable<ManifestFile> manifests =
InternalData.read(
FileFormat.AVRO,
table.io().newInputFile(manifestListLocation))
.setRootType(GenericManifestFile.class)
.setCustomType(
ManifestFile.PARTITION_SUMMARIES_ELEMENT_ID,
GenericPartitionFieldSummary.class)
.project(ManifestFile.schema())
.build()) {

for (ManifestFile manifest : manifests) {
if (deltaSnapshotIds == null) {
manifestPaths.add(manifest.path());
} else if (manifest.snapshotId() != null
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Legacy or old Iceberg manifests can have snapshotId() == null ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For any manifest list file that was correctly written, snapshotId() will never be null.
The check added here is a defensive check.

&& deltaSnapshotIds.contains(manifest.snapshotId())) {
manifestPaths.add(manifest.path());
}
}

} catch (Exception e) {
throw new RuntimeException(
"Failed to read manifests for snapshot " + snapshotId, e);
} finally {
semaphore.release();
}
return null;
});
taskSubmitted = true;
submittedTasks++;
} finally {
if (!taskSubmitted) {
semaphore.release();
}
}
Future<Void> done;
while ((done = completionService.poll()) != null) {
done.get();
completedTasks++;
}
}

while (completedTasks < submittedTasks) {
completionService.take().get();
completedTasks++;
}

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
executorService.shutdownNow();
throw new RuntimeException("Interrupted while processing manifests", e);

} catch (ExecutionException e) {
executorService.shutdownNow();
throw new RuntimeException(
"Failed to collect manifests to rewrite. "
+ "The end version may contain invalid snapshots. "
+ "Please choose an earlier version.",
e.getCause());
}

return manifestPaths;
}

private Set<Snapshot> deltaSnapshots(TableMetadata startMetadata, Set<Snapshot> allSnapshots) {
if (startMetadata == null) {
return allSnapshots;
} else {
Set<Long> startSnapshotIds =
startMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
return allSnapshots.stream()
.filter(s -> !startSnapshotIds.contains(s.snapshotId()))
.collect(Collectors.toSet());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.RewriteTablePathUtil;
import org.apache.iceberg.StaticTableOperations;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.RuntimeIOException;
Expand Down Expand Up @@ -113,4 +115,9 @@ static void writeAsCsv(Set<Pair<String, String>> rows, OutputFile outputFile) {
throw new RuntimeIOException(e);
}
}

static Table newStaticTable(String metadataFileLocation, FileIO io) {
StaticTableOperations ops = new StaticTableOperations(metadataFileLocation, io);
return new BaseTable(ops, metadataFileLocation);
}
}