diff --git a/CHANGELOG.md b/CHANGELOG.md index a21e092dd850d..22de4342ce365 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix `cluster.remote..server_name` setting no populating SNI ([#20321](https://github.com/opensearch-project/OpenSearch/pull/20321)) - Fix X-Opaque-Id header propagation (along with other response headers) for streaming Reactor Netty 4 transport ([#20371](https://github.com/opensearch-project/OpenSearch/pull/20371)) - Fix indexing regression and bug fixes for grouping criteria. ([20145](https://github.com/opensearch-project/OpenSearch/pull/20145)) +- Fix CriteriaBasedCodec to work with delegate codec. ([20442](https://github.com/opensearch-project/OpenSearch/pull/20442)) ### Dependencies - Bump `com.google.auth:google-auth-library-oauth2-http` from 1.38.0 to 1.41.0 ([#20183](https://github.com/opensearch-project/OpenSearch/pull/20183)) diff --git a/server/src/main/java/org/opensearch/index/codec/CriteriaBasedCodec.java b/server/src/main/java/org/opensearch/index/codec/CriteriaBasedCodec.java index 3c9911d03f987..b58b5fee7049d 100644 --- a/server/src/main/java/org/opensearch/index/codec/CriteriaBasedCodec.java +++ b/server/src/main/java/org/opensearch/index/codec/CriteriaBasedCodec.java @@ -9,15 +9,9 @@ package org.opensearch.index.codec; import org.apache.lucene.codecs.Codec; -import org.apache.lucene.codecs.DocValuesFormat; import org.apache.lucene.codecs.FilterCodec; -import org.apache.lucene.codecs.SegmentInfoFormat; -import org.apache.lucene.codecs.lucene103.Lucene103Codec; -import org.apache.lucene.index.SegmentInfo; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.IOContext; - -import java.io.IOException; +import org.apache.lucene.codecs.PostingsFormat; +import org.apache.lucene.codecs.perfield.PerFieldPostingsFormat; /** * Filter codec used to attach bucket attributes to segments of child writer. @@ -27,46 +21,29 @@ public class CriteriaBasedCodec extends FilterCodec { private final String bucket; public static final String BUCKET_NAME = "bucket"; - private static final String PLACEHOLDER_BUCKET_FOR_PARENT_WRITER = "-2"; - - public CriteriaBasedCodec() { - super("CriteriaBasedCodec", new Lucene103Codec()); - bucket = null; - } + public static final String ATTRIBUTE_BINDING_TARGET_FIELD = "_id"; public CriteriaBasedCodec(Codec delegate, String bucket) { - super("CriteriaBasedCodec", delegate); + super(delegate.getName(), delegate); this.bucket = bucket; } @Override - public SegmentInfoFormat segmentInfoFormat() { - return new SegmentInfoFormat() { - @Override - public SegmentInfo read(Directory directory, String segmentName, byte[] segmentID, IOContext context) throws IOException { - return delegate.segmentInfoFormat().read(directory, segmentName, segmentID, context); - } - - @Override - public void write(Directory directory, SegmentInfo info, IOContext ioContext) throws IOException { - if (bucket != null) { - // We will set BUCKET_NAME attribute only for child writer where bucket will set. - info.putAttribute(BUCKET_NAME, bucket); - } else if (info.getAttribute(BUCKET_NAME) == null) { - // For segment belonging to parent writer, attributes will be set. In case write went to parent - // writer (like for no ops writes or for temporary tombstone entry which is added for deletes/updates - // to sync version across child and parent writers), segments corresponding to those writer does not - // have - info.putAttribute(BUCKET_NAME, PLACEHOLDER_BUCKET_FOR_PARENT_WRITER); + public PostingsFormat postingsFormat() { + PostingsFormat format = super.postingsFormat(); + if (format instanceof PerFieldPostingsFormat) { + return new PerFieldPostingsFormat() { + + @Override + public PostingsFormat getPostingsFormatForField(String field) { + if (field.equals(ATTRIBUTE_BINDING_TARGET_FIELD)) { + return new CriteriaBasedPostingsFormat(((PerFieldPostingsFormat) format).getPostingsFormatForField(field), bucket); + } else { + return ((PerFieldPostingsFormat) format).getPostingsFormatForField(field); + } } - - delegate.segmentInfoFormat().write(directory, info, ioContext); - } - }; - } - - @Override - public DocValuesFormat docValuesFormat() { - return new CriteriaBasedDocValueFormat(bucket); + }; + } + return format; } } diff --git a/server/src/main/java/org/opensearch/index/codec/CriteriaBasedPostingsFormat.java b/server/src/main/java/org/opensearch/index/codec/CriteriaBasedPostingsFormat.java new file mode 100644 index 0000000000000..91c2da71c48f0 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/CriteriaBasedPostingsFormat.java @@ -0,0 +1,145 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec; + +import org.apache.lucene.codecs.FieldsConsumer; +import org.apache.lucene.codecs.FieldsProducer; +import org.apache.lucene.codecs.NormsProducer; +import org.apache.lucene.codecs.PostingsFormat; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.Fields; +import org.apache.lucene.index.MergeState; +import org.apache.lucene.index.SegmentReadState; +import org.apache.lucene.index.SegmentWriteState; + +import java.io.IOException; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.opensearch.index.codec.CriteriaBasedCodec.ATTRIBUTE_BINDING_TARGET_FIELD; + +/** + * Postings format to attach segment info attribute corresponding to grouping criteria associated with segments. + * + */ +public class CriteriaBasedPostingsFormat extends PostingsFormat { + + public static final String CRITERIA_BASED_CODEC_NAME = "CriteriaBasedCodec99"; + private final PostingsFormat delegatePostingsFormat; + private final String bucket; + /** Extension of CAS index to store delegate information. */ + public static final String CAS_FILE_EXTENSION = "cas"; + public static final int VERSION_START = 0; + public static final int VERSION_CURRENT = VERSION_START; + public static final String BUCKET_NAME = "bucket"; + private static final String PLACEHOLDER_BUCKET_FOR_PARENT_WRITER = "-2"; + private static final String DELEGATE_CODEC_KEY = "delegate_codec_key"; + + /** + * Creates a new postings format. + * + *

The provided name will be written into the index segment in some configurations (such as + * when using ): in such configurations, for the segment to be read + * this class should be registered with Java's SPI mechanism (registered in META-INF/ of your jar + * file, etc). + * + */ + protected CriteriaBasedPostingsFormat(PostingsFormat delegatePostingsFormat, String bucket) { + super(CRITERIA_BASED_CODEC_NAME); + this.delegatePostingsFormat = delegatePostingsFormat; + this.bucket = bucket; + } + + // Needed for SPI + public CriteriaBasedPostingsFormat() { + this(null, null); + } + + @Override + public FieldsConsumer fieldsConsumer(SegmentWriteState state) throws IOException { + if (delegatePostingsFormat == null) { + throw new UnsupportedOperationException( + "Error - " + getClass().getName() + " has been constructed without a choice of PostingsFormat" + ); + } + + FieldsConsumer fieldsConsumer = delegatePostingsFormat.fieldsConsumer(state); + return new CriteriaBasedFieldsConsumer(fieldsConsumer, state); + } + + @Override + public FieldsProducer fieldsProducer(SegmentReadState state) throws IOException { + assert state.segmentInfo.getAttribute(DELEGATE_CODEC_KEY) != null; + return PostingsFormat.forName(state.segmentInfo.getAttribute(DELEGATE_CODEC_KEY)).fieldsProducer(state); + } + + class CriteriaBasedFieldsConsumer extends FieldsConsumer { + private final FieldsConsumer delegateFieldsConsumer; + private SegmentWriteState state; + private boolean closed; + + public CriteriaBasedFieldsConsumer(FieldsConsumer delegateFieldsConsumer, SegmentWriteState state) { + this.delegateFieldsConsumer = delegateFieldsConsumer; + this.state = state; + } + + @Override + public void write(Fields fields, NormsProducer norms) throws IOException { + delegateFieldsConsumer.write(fields, norms); + FieldInfo idFieldInfo = state.fieldInfos.fieldInfo(ATTRIBUTE_BINDING_TARGET_FIELD); + if (bucket != null) { + state.segmentInfo.putAttribute(BUCKET_NAME, bucket); + if (idFieldInfo != null) { + idFieldInfo.putAttribute(BUCKET_NAME, bucket); + } + } else if (state.segmentInfo.getAttribute(BUCKET_NAME) == null) { + // For segment belonging to parent writer, attributes will be set. In case write went to parent + // writer (like for no ops writes or for temporary tombstone entry which is added for deletes/updates + // to sync version across child and parent writers), segments corresponding to those writer does not + // have + state.segmentInfo.putAttribute(BUCKET_NAME, PLACEHOLDER_BUCKET_FOR_PARENT_WRITER); + if (idFieldInfo != null) { + idFieldInfo.putAttribute(BUCKET_NAME, PLACEHOLDER_BUCKET_FOR_PARENT_WRITER); + } + } else if (idFieldInfo != null) { + idFieldInfo.putAttribute(BUCKET_NAME, state.segmentInfo.getAttribute(BUCKET_NAME)); + } + + state.segmentInfo.putAttribute(DELEGATE_CODEC_KEY, delegatePostingsFormat.getName()); + } + + @Override + public void merge(MergeState mergeState, NormsProducer norms) throws IOException { + delegateFieldsConsumer.merge(mergeState, norms); + Set mergeFieldNames = StreamSupport.stream(mergeState.mergeFieldInfos.spliterator(), false) + .map(FieldInfo::getName) + .collect(Collectors.toSet()); + if (mergeFieldNames.contains(ATTRIBUTE_BINDING_TARGET_FIELD) && mergeState.fieldInfos.length > 0) { + String attribute = mergeState.fieldInfos[0].fieldInfo(ATTRIBUTE_BINDING_TARGET_FIELD).getAttribute(BUCKET_NAME); + assert attribute != null : "Attribute should not be null during merging segment"; + mergeState.segmentInfo.putAttribute(BUCKET_NAME, attribute); + + mergeState.mergeFieldInfos.fieldInfo(ATTRIBUTE_BINDING_TARGET_FIELD).putAttribute(BUCKET_NAME, attribute); + } + + mergeState.segmentInfo.putAttribute(DELEGATE_CODEC_KEY, delegatePostingsFormat.getName()); + } + + @Override + public void close() throws IOException { + if (closed) { + return; + } + + closed = true; + delegateFieldsConsumer.close(); + } + } +} diff --git a/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec b/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec index b148e5415e168..90f4d20ad07a5 100644 --- a/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec +++ b/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec @@ -1,4 +1,3 @@ org.opensearch.index.codec.composite.composite912.Composite912Codec org.opensearch.index.codec.composite.composite103.Composite103Codec org.opensearch.index.codec.composite.backward_codecs.composite101.Composite101Codec -org.opensearch.index.codec.CriteriaBasedCodec diff --git a/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.PostingsFormat b/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.PostingsFormat index 80b1d25064885..14312177bd9b9 100644 --- a/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.PostingsFormat +++ b/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.PostingsFormat @@ -1,2 +1,3 @@ org.apache.lucene.search.suggest.document.Completion50PostingsFormat org.opensearch.index.codec.fuzzy.FuzzyFilterPostingsFormat +org.opensearch.index.codec.CriteriaBasedPostingsFormat diff --git a/server/src/test/java/org/opensearch/index/codec/CriteriaBasedPostingsFormatTests.java b/server/src/test/java/org/opensearch/index/codec/CriteriaBasedPostingsFormatTests.java new file mode 100644 index 0000000000000..9eefead247e34 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/codec/CriteriaBasedPostingsFormatTests.java @@ -0,0 +1,134 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec; + +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StringField; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.MultiTerms; +import org.apache.lucene.index.SegmentCommitInfo; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.store.ByteBuffersDirectory; +import org.apache.lucene.store.Directory; +import org.apache.lucene.tests.index.BasePostingsFormatTestCase; +import org.apache.lucene.tests.util.TestUtil; +import org.apache.lucene.util.BytesRef; + +import java.io.IOException; + +public class CriteriaBasedPostingsFormatTests extends BasePostingsFormatTestCase { + + private static final String TEST_BUCKET = "test_bucket"; + private Codec criteriaBasedPostingFormat = TestUtil.alwaysPostingsFormat( + new CriteriaBasedPostingsFormat(TestUtil.getDefaultPostingsFormat(), TEST_BUCKET) + ); + + public void testBasicFunctionality() throws IOException { + Directory dir = new ByteBuffersDirectory(); + IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer()); + iwc.setCodec(criteriaBasedPostingFormat); + + try (IndexWriter writer = new IndexWriter(dir, iwc)) { + for (int i = 0; i < 100; i++) { + Document doc = new Document(); + doc.add(new StringField("_id", "doc" + i, Field.Store.YES)); + doc.add(new TextField("field", "value" + i, Field.Store.YES)); + writer.addDocument(doc); + } + } + + try (IndexReader reader = DirectoryReader.open(dir)) { + assertEquals(100, reader.numDocs()); + + Terms terms = MultiTerms.getTerms(reader, "field"); + assertNotNull(terms); + TermsEnum termsEnum = terms.iterator(); + + int count = 0; + BytesRef term; + while ((term = termsEnum.next()) != null) { + assertTrue(term.utf8ToString().startsWith("value")); + count++; + } + assertEquals(100, count); + } + } + + public void testBucketAttributeIsSetOnSegment() throws IOException { + Directory dir = new ByteBuffersDirectory(); + IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer()); + iwc.setCodec(criteriaBasedPostingFormat); + + try (IndexWriter writer = new IndexWriter(dir, iwc)) { + Document doc = new Document(); + doc.add(new StringField("_id", "doc1", Field.Store.YES)); + doc.add(new TextField("content", "test content", Field.Store.YES)); + writer.addDocument(doc); + } + + SegmentInfos segmentInfos = SegmentInfos.readLatestCommit(dir); + assertFalse("Should have at least one segment", segmentInfos.asList().isEmpty()); + + for (SegmentCommitInfo segmentCommitInfo : segmentInfos) { + String bucketValue = segmentCommitInfo.info.getAttribute(CriteriaBasedPostingsFormat.BUCKET_NAME); + assertEquals("Bucket attribute should be set", TEST_BUCKET, bucketValue); + } + } + + public void testNullBucketSetsPlaceholder() throws IOException { + Directory dir = new ByteBuffersDirectory(); + Codec nullBucketCodec = TestUtil.alwaysPostingsFormat(new CriteriaBasedPostingsFormat(TestUtil.getDefaultPostingsFormat(), null)); + + IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer()); + iwc.setCodec(nullBucketCodec); + + try (IndexWriter writer = new IndexWriter(dir, iwc)) { + Document doc = new Document(); + doc.add(new StringField("_id", "doc1", Field.Store.YES)); + doc.add(new TextField("content", "test content", Field.Store.YES)); + writer.addDocument(doc); + } + + SegmentInfos segmentInfos = SegmentInfos.readLatestCommit(dir); + for (SegmentCommitInfo segmentCommitInfo : segmentInfos) { + String bucketValue = segmentCommitInfo.info.getAttribute(CriteriaBasedPostingsFormat.BUCKET_NAME); + assertEquals("Placeholder bucket should be set for null bucket", "-2", bucketValue); + } + } + + public void testEmptyIndex() throws IOException { + Directory dir = new ByteBuffersDirectory(); + IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer()); + iwc.setCodec(criteriaBasedPostingFormat); + + try (IndexWriter writer = new IndexWriter(dir, iwc)) { + // Don't add any documents + } + + try (IndexReader reader = DirectoryReader.open(dir)) { + assertEquals(0, reader.numDocs()); + Terms terms = MultiTerms.getTerms(reader, "_id"); + assertNull("Terms should be null for empty index", terms); + } + } + + @Override + protected Codec getCodec() { + return criteriaBasedPostingFormat; + } +} diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index 7e1a92a925df9..7406e48512869 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -127,6 +127,7 @@ import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.index.CriteriaBasedMergePolicy; import org.opensearch.index.IndexSettings; import org.opensearch.index.VersionType; import org.opensearch.index.codec.CodecService; @@ -210,6 +211,7 @@ import static java.util.Collections.shuffle; import static org.opensearch.common.util.FeatureFlags.CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_FLAG; +import static org.opensearch.index.codec.CriteriaBasedCodec.BUCKET_NAME; import static org.opensearch.index.engine.Engine.Operation.Origin.LOCAL_RESET; import static org.opensearch.index.engine.Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY; import static org.opensearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY; @@ -3811,6 +3813,49 @@ public void onFailedEngine(String reason, Exception e) { } } + @LockFeatureFlag(CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_FLAG) + public void testCriteriaBasedGrouping() throws Exception { + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( + "test", + Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_CONTEXT_AWARE_ENABLED_SETTING.getKey(), true) + .build() + ); + try ( + Store store = createStore(); + Engine engine = createEngine(indexSettings, store, createTempDir(), newCriteriaBasedMergePolicy(), null, null, null, null, null) + ) { + List segments = engine.segments(true); + assertThat(segments.isEmpty(), equalTo(true)); + for (int i = 1; i <= 100; i++) { + String groupingCriteria; + if (i % 3 == 0) { + groupingCriteria = "grouping_criteria1"; + } else if (i % 3 == 1) { + groupingCriteria = "grouping_criteria2"; + } else { + groupingCriteria = "grouping_criteria3"; + } + + final ParsedDocument doc = testParsedDocument("1", null, testContextSpecificDocument(groupingCriteria), B_1, null); + engine.index(indexForDoc(doc)); + if (i % 5 == 0) { + engine.refresh("test"); + } + } + + engine.refresh("test"); + segments = engine.segments(true); + Set attributes = segments.stream().map(segment -> segment.getAttributes().get(BUCKET_NAME)).collect(Collectors.toSet()); + assertThat(attributes, Matchers.containsInAnyOrder("grouping_criteria1", "grouping_criteria2", "grouping_criteria3")); + } + } + + private CriteriaBasedMergePolicy newCriteriaBasedMergePolicy() { + return new CriteriaBasedMergePolicy(newMergePolicy()); + } + public void testSettings() { CodecService codecService = new CodecService(null, engine.config().getIndexSettings(), logger); LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig(); @@ -8693,9 +8738,9 @@ public void testShardFailsForCompositeIndexWriterInCaseAddIndexesThrewExceptionW MockDirectoryWrapper wrapper = newMockDirectory(); final Path translogPath = createTempDir("testFailEngineOnRandomIO"); try (Store store = createStore(wrapper)) { - final ParsedDocument doc1 = testParsedDocument("1", null, testContextSpecificDocument(), B_1, null); - final ParsedDocument doc2 = testParsedDocument("2", null, testContextSpecificDocument(), B_1, null); - final ParsedDocument doc3 = testParsedDocument("3", null, testContextSpecificDocument(), B_1, null); + final ParsedDocument doc1 = testParsedDocument("1", null, testContextSpecificDocument("grouping_criteria"), B_1, null); + final ParsedDocument doc2 = testParsedDocument("2", null, testContextSpecificDocument("grouping_criteria"), B_1, null); + final ParsedDocument doc3 = testParsedDocument("3", null, testContextSpecificDocument("grouping_criteria"), B_1, null); AtomicReference throwingIndexWriter = new AtomicReference<>(); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( @@ -8738,9 +8783,9 @@ public void testShardFailsForCompositeIndexWriterInCaseAddIndexesThrewExceptionW MockDirectoryWrapper wrapper = newMockDirectory(); final Path translogPath = createTempDir("testFailEngineOnRandomIO"); try (Store store = createStore(wrapper)) { - final ParsedDocument doc1 = testParsedDocument("1", null, testContextSpecificDocument(), B_1, null); - final ParsedDocument doc2 = testParsedDocument("2", null, testContextSpecificDocument(), B_1, null); - final ParsedDocument doc3 = testParsedDocument("1", null, testContextSpecificDocument(), B_1, null); + final ParsedDocument doc1 = testParsedDocument("1", null, testContextSpecificDocument("grouping_criteria"), B_1, null); + final ParsedDocument doc2 = testParsedDocument("2", null, testContextSpecificDocument("grouping_criteria"), B_1, null); + final ParsedDocument doc3 = testParsedDocument("1", null, testContextSpecificDocument("grouping_criteria"), B_1, null); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( "test", Settings.builder() diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java index e4e56cddd21d2..34e74207a75a1 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java @@ -366,9 +366,9 @@ protected void assertEngineCleanedUp(Engine engine, TranslogDeletionPolicy trans } } - protected static ParseContext.Document testContextSpecificDocument() { + protected static ParseContext.Document testContextSpecificDocument(String groupingCriteria) { ParseContext.Document doc = testDocumentWithTextField("criteria"); - doc.setGroupingCriteria("grouping_criteria"); + doc.setGroupingCriteria(groupingCriteria); return doc; }