Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,12 @@
<td>Boolean</td>
<td>Whether to enable global index for scan.</td>
</tr>
<tr>
<td><h5>global-index.external-path</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Global index root directory, if not set, the global index files will be stored under the &lt;table-root-directory&gt;/index.</td>
</tr>
<tr>
<td><h5>global-index.row-count-per-shard</h5></td>
<td style="word-wrap: break-word;">100000</td>
Expand Down
20 changes: 20 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -2013,6 +2013,13 @@ public InlineElement getDescription() {
.defaultValue(false)
.withDescription("Whether index file in data file directory.");

public static final ConfigOption<String> GLOBAL_INDEX_EXTERNAL_PATH =
key("global-index.external-path")
.stringType()
.noDefaultValue()
.withDescription(
"Global index root directory, if not set, the global index files will be stored under the <table-root-directory>/index.");

public static final ConfigOption<MemorySize> LOOKUP_MERGE_BUFFER_SIZE =
key("lookup.merge-buffer-size")
.memoryType()
Expand Down Expand Up @@ -2703,6 +2710,19 @@ public boolean indexFileInDataFileDir() {
return options.get(INDEX_FILE_IN_DATA_FILE_DIR);
}

public Path globalIndexExternalPath() {
String pathString = options.get(GLOBAL_INDEX_EXTERNAL_PATH);
if (pathString == null || pathString.isEmpty()) {
return null;
}
Path path = new Path(pathString);
String scheme = path.toUri().getScheme();
if (scheme == null) {
throw new IllegalArgumentException("scheme should not be null: " + path);
}
return path;
}

public LookupStrategy lookupStrategy() {
return LookupStrategy.from(
mergeEngine().equals(MergeEngine.FIRST_ROW),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,26 @@

package org.apache.paimon.globalindex;

import org.apache.paimon.fs.Path;

import java.util.Arrays;
import java.util.Objects;

/** Index meta for global index. */
public class GlobalIndexIOMeta {

private final String fileName;
private final Path filePath;
private final long fileSize;
private final byte[] metadata;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The GlobalIndexIOMeta is intended to be a meta interface for plugin authors. However, including an external path in it might be confusing for users, as it adds complexity and is not intuitive in this context.

Would it make more sense to store only the file_path in GlobalIndexIOMeta, while leaving the external path to be handled internally by the Paimon framework? This way, the external path could be extracted and managed within the IndexFileMeta, simplifying the API for plugin authors and ensuring a clearer separation of concerns.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accepted

public GlobalIndexIOMeta(String fileName, long fileSize, byte[] metadata) {
this.fileName = fileName;
public GlobalIndexIOMeta(Path filePath, long fileSize, byte[] metadata) {
this.filePath = filePath;
this.fileSize = fileSize;
this.metadata = metadata;
}

public String fileName() {
return fileName;
public Path filePath() {
return filePath;
}

public long fileSize() {
Expand All @@ -55,14 +57,14 @@ public boolean equals(Object o) {
return false;
}
GlobalIndexIOMeta that = (GlobalIndexIOMeta) o;
return Objects.equals(fileName, that.fileName)
return Objects.equals(filePath, that.filePath)
&& fileSize == that.fileSize
&& Arrays.equals(metadata, that.metadata);
}

@Override
public int hashCode() {
int result = Objects.hash(fileName, fileSize);
int result = Objects.hash(filePath, fileSize);
result = 31 * result + Arrays.hashCode(metadata);
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public GlobalIndexReader createReader(
GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files) throws IOException {
checkArgument(files.size() == 1);
GlobalIndexIOMeta indexMeta = files.get(0);
SeekableInputStream input = fileReader.getInputStream(indexMeta.fileName());
SeekableInputStream input = fileReader.getInputStream(indexMeta);
FileIndexReader reader = index.createReader(input, 0, (int) indexMeta.fileSize());
return new FileIndexReaderWrapper(reader, this::toGlobalResult, input);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ public BTreeIndexReader(
this.minKey = null;
this.maxKey = null;
}
this.input = fileReader.getInputStream(globalIndexIOMeta.fileName());
this.input = fileReader.getInputStream(globalIndexIOMeta);

// prepare file footer
long fileSize = globalIndexIOMeta.fileSize();
Path filePath = fileReader.filePath(globalIndexIOMeta.fileName());
Path filePath = globalIndexIOMeta.filePath();
BlockCache blockCache = new BlockCache(filePath, input, cacheManager);
BTreeFileFooter footer = readFooter(blockCache, fileSize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.globalindex.btree;

import org.apache.paimon.fs.Path;
import org.apache.paimon.globalindex.GlobalIndexIOMeta;
import org.apache.paimon.globalindex.GlobalIndexReader;
import org.apache.paimon.globalindex.GlobalIndexResult;
Expand All @@ -42,7 +43,7 @@ public class LazyFilteredBTreeReader implements GlobalIndexReader {

private final BTreeFileMetaSelector fileSelector;
private final List<GlobalIndexIOMeta> files;
private final Map<String, GlobalIndexReader> readerCache;
private final Map<Path, GlobalIndexReader> readerCache;
private final KeySerializer keySerializer;
private final CacheManager cacheManager;
private final GlobalIndexFileReader fileReader;
Expand Down Expand Up @@ -259,7 +260,7 @@ private UnionGlobalIndexReader createUnionReader(List<GlobalIndexIOMeta> files)
for (GlobalIndexIOMeta meta : files) {
readers.add(
readerCache.computeIfAbsent(
meta.fileName(),
meta.filePath(),
name -> {
try {
return new BTreeIndexReader(
Expand All @@ -276,7 +277,7 @@ private UnionGlobalIndexReader createUnionReader(List<GlobalIndexIOMeta> files)
@Override
public void close() throws IOException {
IOException exception = null;
for (Map.Entry<String, GlobalIndexReader> entry : this.readerCache.entrySet()) {
for (Map.Entry<Path, GlobalIndexReader> entry : this.readerCache.entrySet()) {
try {
entry.getValue().close();
} catch (IOException ioe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@

package org.apache.paimon.globalindex.io;

import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.globalindex.GlobalIndexIOMeta;

import java.io.IOException;

/** File reader for global index. */
public interface GlobalIndexFileReader {

SeekableInputStream getInputStream(String fileName) throws IOException;

Path filePath(String fileName);
SeekableInputStream getInputStream(GlobalIndexIOMeta meta) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.globalindex.GlobalIndexIOMeta;
import org.apache.paimon.globalindex.GlobalIndexReader;
Expand Down Expand Up @@ -243,19 +242,9 @@ public PositionOutputStream newOutputStream(String fileName)
long fileSize = fileIO.getFileSize(path);

GlobalIndexFileReader fileReader =
new GlobalIndexFileReader() {
@Override
public SeekableInputStream getInputStream(String fileName) throws IOException {
return fileIO.newInputStream(new Path(tempDir.toString(), fileName));
}

@Override
public Path filePath(String fileName) {
return new Path(tempDir.toString(), fileName);
}
};
meta -> fileIO.newInputStream(new Path(tempDir.toString(), meta.filePath()));

GlobalIndexIOMeta globalIndexMeta = new GlobalIndexIOMeta(fileName, fileSize, null);
GlobalIndexIOMeta globalIndexMeta = new GlobalIndexIOMeta(path, fileSize, null);

return bitmapGlobalIndex.createReader(
fileReader, Collections.singletonList(globalIndexMeta));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.globalindex.GlobalIndexIOMeta;
import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
Expand Down Expand Up @@ -109,18 +108,9 @@ public PositionOutputStream newOutputStream(String fileName)
}
};
fileReader =
new GlobalIndexFileReader() {
@Override
public SeekableInputStream getInputStream(String fileName) throws IOException {
return fileIO.newInputStream(
new Path(new Path(tempPath.toUri()), fileName));
}

@Override
public Path filePath(String fileName) {
return new Path(new Path(tempPath.toUri()), fileName);
}
};
meta ->
fileIO.newInputStream(
new Path(new Path(tempPath.toUri()), meta.filePath()));
options = new Options();
options.set(BTreeIndexOptions.BTREE_INDEX_CACHE_SIZE, MemorySize.ofMebiBytes(8));
globalIndexer = new BTreeGlobalIndexer(new DataField(1, "testField", dataType), options);
Expand All @@ -147,7 +137,7 @@ protected GlobalIndexIOMeta writeData(List<Pair<Object, Long>> data) throws IOEx
ResultEntry resultEntry = results.get(0);
String fileName = resultEntry.fileName();
return new GlobalIndexIOMeta(
fileName,
new Path(new Path(tempPath.toUri()), fileName),
fileIO.getFileSize(new Path(new Path(tempPath.toUri()), fileName)),
resultEntry.meta());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.globalindex.btree;

import org.apache.paimon.fs.Path;
import org.apache.paimon.globalindex.GlobalIndexIOMeta;
import org.apache.paimon.memory.MemorySliceOutput;
import org.apache.paimon.predicate.FieldRef;
Expand Down Expand Up @@ -54,11 +55,11 @@ public void setUp() {

files =
Arrays.asList(
new GlobalIndexIOMeta("file1", 1, meta1.serialize()),
new GlobalIndexIOMeta("file2", 1, meta2.serialize()),
new GlobalIndexIOMeta("file3", 1, meta3.serialize()),
new GlobalIndexIOMeta("file4", 1, meta4.serialize()),
new GlobalIndexIOMeta("file5", 1, meta5.serialize()));
new GlobalIndexIOMeta(new Path("file1"), 1, meta1.serialize()),
new GlobalIndexIOMeta(new Path("file2"), 1, meta2.serialize()),
new GlobalIndexIOMeta(new Path("file3"), 1, meta3.serialize()),
new GlobalIndexIOMeta(new Path("file4"), 1, meta4.serialize()),
new GlobalIndexIOMeta(new Path("file5"), 1, meta5.serialize()));
}

@Test
Expand Down Expand Up @@ -147,7 +148,8 @@ public void testMetaSelector() {
private void assertFiles(List<GlobalIndexIOMeta> files, List<String> expected) {
Assertions.assertThat(
files.stream()
.map(GlobalIndexIOMeta::fileName)
.map(GlobalIndexIOMeta::filePath)
.map(Path::toString)
.collect(Collectors.toList()))
.containsExactlyInAnyOrderElementsOf(expected);
}
Expand Down
4 changes: 4 additions & 0 deletions paimon-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ under the License.
<version>${hadoop.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ protected FileStorePathFactory pathFactory(CoreOptions options, String format) {
options.dataFilePathDirectory(),
createExternalPaths(),
options.externalPathStrategy(),
options.indexFileInDataFileDir());
options.indexFileInDataFileDir(),
options.globalIndexExternalPath());
}

private List<Path> createExternalPaths() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.paimon.globalindex;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
Expand All @@ -44,21 +43,15 @@ public String newFileName(String prefix) {
return prefix + "-" + "global-index-" + UUID.randomUUID() + ".index";
}

@Override
public Path filePath(String fileName) {
return indexPathFactory.toPath(fileName);
}

public long fileSize(String fileName) throws IOException {
return fileIO.getFileSize(filePath(fileName));
return fileIO.getFileSize(indexPathFactory.toPath(fileName));
}

public PositionOutputStream newOutputStream(String fileName) throws IOException {
return fileIO.newOutputStream(indexPathFactory.toPath(fileName), true);
}

public SeekableInputStream getInputStream(String fileName) throws IOException {
Path path = indexPathFactory.toPath(fileName);
return fileIO.newInputStream(path);
public SeekableInputStream getInputStream(GlobalIndexIOMeta meta) throws IOException {
return fileIO.newInputStream(meta.filePath());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.paimon.globalindex;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
import org.apache.paimon.index.GlobalIndexMeta;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.index.IndexPathFactory;
Expand Down Expand Up @@ -54,6 +56,7 @@ public class RowRangeGlobalIndexScanner implements Closeable {

private final Options options;
private final GlobalIndexEvaluator globalIndexEvaluator;
private final IndexPathFactory indexPathFactory;

public RowRangeGlobalIndexScanner(
Options options,
Expand All @@ -76,8 +79,9 @@ public RowRangeGlobalIndexScanner(
+ ")");
}

GlobalIndexFileReadWrite indexFileReadWrite =
new GlobalIndexFileReadWrite(fileIO, indexPathFactory);
this.indexPathFactory = indexPathFactory;

GlobalIndexFileReader indexFileReader = meta -> fileIO.newInputStream(meta.filePath());

Map<Integer, Map<String, Map<Range, List<IndexFileMeta>>>> indexMetas = new HashMap<>();
for (IndexManifestEntry entry : entries) {
Expand All @@ -97,7 +101,7 @@ public RowRangeGlobalIndexScanner(
IntFunction<Collection<GlobalIndexReader>> readersFunction =
fieldId ->
createReaders(
indexFileReadWrite,
indexFileReader,
indexMetas.get(fieldId),
rowType.getField(fieldId));
this.globalIndexEvaluator = new GlobalIndexEvaluator(rowType, readersFunction);
Expand All @@ -109,7 +113,7 @@ public Optional<GlobalIndexResult> scan(
}

private Collection<GlobalIndexReader> createReaders(
GlobalIndexFileReadWrite indexFileReadWrite,
GlobalIndexFileReader indexFileReadWrite,
Map<String, Map<Range, List<IndexFileMeta>>> indexMetas,
DataField dataField) {
if (indexMetas == null) {
Expand Down Expand Up @@ -154,7 +158,8 @@ private Collection<GlobalIndexReader> createReaders(
private GlobalIndexIOMeta toGlobalMeta(IndexFileMeta meta) {
GlobalIndexMeta globalIndex = meta.globalIndexMeta();
checkNotNull(globalIndex);
return new GlobalIndexIOMeta(meta.fileName(), meta.fileSize(), globalIndex.indexMeta());
Path filePath = indexPathFactory.toPath(meta);
return new GlobalIndexIOMeta(filePath, meta.fileSize(), globalIndex.indexMeta());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public IndexFileMeta write(IntIterator input) throws IOException {
fileSize(path),
count,
null,
isExternalPath() ? path.toString() : null);
isExternalPath() ? path.toString() : null,
null);
}

public IndexFileMeta write(int[] ints) throws IOException {
Expand Down
Loading