-
Notifications
You must be signed in to change notification settings - Fork 599
HDDS-14942. Implement manifest selection logic for rewrite based on snapshot delta #10145
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,10 +22,21 @@ | |
| import java.util.Objects; | ||
| import java.util.Set; | ||
| import java.util.UUID; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.ExecutorCompletionService; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.Semaphore; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.stream.Collectors; | ||
| import org.apache.iceberg.FileFormat; | ||
| import org.apache.iceberg.GenericManifestFile; | ||
| import org.apache.iceberg.GenericPartitionFieldSummary; | ||
| import org.apache.iceberg.HasTableOperations; | ||
| import org.apache.iceberg.InternalData; | ||
| import org.apache.iceberg.ManifestFile; | ||
| import org.apache.iceberg.PartitionStatisticsFile; | ||
| import org.apache.iceberg.RewriteTablePathUtil; | ||
| import org.apache.iceberg.RewriteTablePathUtil.RewriteResult; | ||
|
|
@@ -37,6 +48,7 @@ | |
| import org.apache.iceberg.TableMetadataParser; | ||
| import org.apache.iceberg.actions.ImmutableRewriteTablePath; | ||
| import org.apache.iceberg.actions.RewriteTablePath; | ||
| import org.apache.iceberg.io.CloseableIterable; | ||
| import org.apache.iceberg.util.Pair; | ||
|
|
||
| /** | ||
|
|
@@ -57,11 +69,15 @@ public class RewriteTablePathOzoneAction implements RewriteTablePath { | |
| private String stagingDir; | ||
| private int parallelism; | ||
|
|
||
| private ExecutorService executorService; | ||
| private static final int MAX_INFLIGHT_MULTIPLIER = 4; | ||
| private static final int DEFAULT_THREAD_COUNT = 10; | ||
|
|
||
| private final Table table; | ||
|
|
||
/**
 * Creates a rewrite action for the given table using the default worker-thread count.
 *
 * @param table the table this action operates on
 */
public RewriteTablePathOzoneAction(Table table) {
  this.table = table;
  // Single effective assignment: a preceding availableProcessors() store was always
  // overwritten by this one, so only the fixed default is kept.
  this.parallelism = DEFAULT_THREAD_COUNT;
}
|
|
||
| public RewriteTablePathOzoneAction(Table table, int parallelism) { | ||
|
|
@@ -102,8 +118,7 @@ public RewriteTablePath stagingLocation(String stagingLocation) { | |
| @Override | ||
| public Result execute() { | ||
| validateInputs(); | ||
| // TODO: should use for parallel manifest and position delete file rewriting. | ||
| ExecutorService executorService = Executors.newFixedThreadPool(parallelism); | ||
| executorService = Executors.newFixedThreadPool(parallelism); | ||
| try { | ||
| return doExecute(); | ||
| } finally { | ||
|
|
@@ -197,6 +212,9 @@ private boolean versionInFilePath(String path, String version) { | |
|
|
||
| private String rebuildMetadata() { | ||
| //TODO need to implement rewrite of manifest list , manifest files and position delete files. | ||
| TableMetadata startMetadata = startVersionName != null | ||
| ? new StaticTableOperations(startVersionName, table.io()).current() | ||
| : null; | ||
| TableMetadata endMetadata = new StaticTableOperations(endVersionName, table.io()).current(); | ||
|
|
||
| List<PartitionStatisticsFile> partitionStats = endMetadata.partitionStatisticsFiles(); | ||
|
|
@@ -205,6 +223,9 @@ private String rebuildMetadata() { | |
| } | ||
|
|
||
| RewriteResult<Snapshot> rewriteVersionResult = rewriteVersionFiles(endMetadata); | ||
| Set<Snapshot> deltaSnapshots = deltaSnapshots(startMetadata, rewriteVersionResult.toRewrite()); | ||
| //TODO: manifestsToRewrite will be used while re-write of manifest-list files. | ||
| Set<String> manifestsToRewrite = manifestsToRewrite(deltaSnapshots, startMetadata, endMetadata); | ||
|
|
||
| Set<Pair<String, String>> copyPlan = new HashSet<>(); | ||
| copyPlan.addAll(rewriteVersionResult.copyPlan()); | ||
|
|
@@ -251,4 +272,109 @@ private Set<Pair<String, String>> rewriteVersionFile(TableMetadata metadata, Str | |
|
|
||
| return result; | ||
| } | ||
|
|
||
| private Set<String> manifestsToRewrite(Set<Snapshot> deltaSnapshots, TableMetadata startMetadata, | ||
| TableMetadata endMetadata) { | ||
|
|
||
| final Set<Long> deltaSnapshotIds; | ||
| if (startMetadata != null) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The entire
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When startMetadata is not provided, deltaSnapshots will be equal to the full set of snapshots collected across all version files during the version file rewrite phase. When startMetadata is provided, deltaSnapshots will contain only those snapshots that are not tracked by the start version i.e. snapshots that appeared in intermediate version files between start and end, minus the snapshots already present in the start version's metadata. So we don't use deltaSnapshots for iterating and instead iterate through the snapshots collected from the endVersion metadata file because we won't be able to read the manifest list associated with the expired snapshots. |
||
| deltaSnapshotIds = deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); | ||
| } else { | ||
| deltaSnapshotIds = null; | ||
| } | ||
|
|
||
| Set<String> manifestPaths = ConcurrentHashMap.newKeySet(); | ||
| int maxInFlight = parallelism * MAX_INFLIGHT_MULTIPLIER; | ||
| Semaphore semaphore = new Semaphore(maxInFlight); | ||
|
|
||
| ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<>(executorService); | ||
|
|
||
| int submittedTasks = 0; | ||
| int completedTasks = 0; | ||
|
|
||
| try { | ||
| for (Snapshot snapshot : endMetadata.snapshots()) { | ||
| semaphore.acquire(); // blocks when maxInFlight tasks are already in-flight | ||
|
|
||
| final long snapshotId = snapshot.snapshotId(); | ||
| final String manifestListLocation = snapshot.manifestListLocation(); | ||
|
|
||
| boolean taskSubmitted = false; | ||
| try { | ||
| completionService.submit(() -> { | ||
| try (CloseableIterable<ManifestFile> manifests = | ||
| InternalData.read( | ||
| FileFormat.AVRO, | ||
| table.io().newInputFile(manifestListLocation)) | ||
| .setRootType(GenericManifestFile.class) | ||
| .setCustomType( | ||
| ManifestFile.PARTITION_SUMMARIES_ELEMENT_ID, | ||
| GenericPartitionFieldSummary.class) | ||
| .project(ManifestFile.schema()) | ||
| .build()) { | ||
|
|
||
| for (ManifestFile manifest : manifests) { | ||
| if (deltaSnapshotIds == null) { | ||
| manifestPaths.add(manifest.path()); | ||
| } else if (manifest.snapshotId() != null | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Legacy or old Iceberg manifests can have snapshotId() == null ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For any manifest list file that was correctly written, snapshotId() will never be null. |
||
| && deltaSnapshotIds.contains(manifest.snapshotId())) { | ||
| manifestPaths.add(manifest.path()); | ||
| } | ||
| } | ||
|
|
||
| } catch (Exception e) { | ||
| throw new RuntimeException( | ||
| "Failed to read manifests for snapshot " + snapshotId, e); | ||
| } finally { | ||
| semaphore.release(); | ||
| } | ||
| return null; | ||
| }); | ||
| taskSubmitted = true; | ||
| submittedTasks++; | ||
| } finally { | ||
| if (!taskSubmitted) { | ||
| semaphore.release(); | ||
| } | ||
| } | ||
| Future<Void> done; | ||
| while ((done = completionService.poll()) != null) { | ||
| done.get(); | ||
| completedTasks++; | ||
| } | ||
| } | ||
|
|
||
| while (completedTasks < submittedTasks) { | ||
| completionService.take().get(); | ||
| completedTasks++; | ||
| } | ||
|
|
||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| executorService.shutdownNow(); | ||
| throw new RuntimeException("Interrupted while processing manifests", e); | ||
|
|
||
| } catch (ExecutionException e) { | ||
| executorService.shutdownNow(); | ||
| throw new RuntimeException( | ||
| "Failed to collect manifests to rewrite. " | ||
| + "The end version may contain invalid snapshots. " | ||
| + "Please choose an earlier version.", | ||
| e.getCause()); | ||
| } | ||
|
|
||
| return manifestPaths; | ||
| } | ||
|
|
||
| private Set<Snapshot> deltaSnapshots(TableMetadata startMetadata, Set<Snapshot> allSnapshots) { | ||
| if (startMetadata == null) { | ||
| return allSnapshots; | ||
| } else { | ||
| Set<Long> startSnapshotIds = | ||
| startMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); | ||
| return allSnapshots.stream() | ||
| .filter(s -> !startSnapshotIds.contains(s.snapshotId())) | ||
| .collect(Collectors.toSet()); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The thread count could be passed in via the command; this can be done in a subsequent PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the thread count can be passed via the command, in which case the
`public RewriteTablePathOzoneAction(Table table, int parallelism)` constructor will be used. We can add this when the CLI command for the rewrite is added.