Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix indexing regression and bug fixes for grouping criteria. ([20145](https://github.com/opensearch-project/OpenSearch/pull/20145))
- LeafReader should not remove SubReaderWrappers incase IndexWriter encounters a non aborting Exception ([#20193](https://github.com/opensearch-project/OpenSearch/pull/20193))
- Fix Netty deprecation warnings in transport-reactor-netty4 module ([20429](https://github.com/opensearch-project/OpenSearch/pull/20429))
- Remove ClusterState reference in ConcreteIndices ([20454](https://github.com/opensearch-project/OpenSearch/pull/20454))
- Fix stats aggregation returning zero results with `size:0`. ([20427](https://github.com/opensearch-project/OpenSearch/pull/20427))
- Remove child level directory on refresh for CompositeIndexWriter ([#20326](https://github.com/opensearch-project/OpenSearch/pull/20326))
- Fixes and refactoring in stream transport to make it more robust ([#20359](https://github.com/opensearch-project/OpenSearch/pull/20359))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ protected void doRun() {
if (handleBlockExceptions(clusterState)) {
return;
}
final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
final ConcreteIndices concreteIndices = new ConcreteIndices(indexNameExpressionResolver);
Metadata metadata = clusterState.metadata();
// go over all the requests and create a ShardId -> Operations mapping
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
Expand All @@ -610,14 +610,14 @@ protected void doRun() {
if (addFailureIfRequiresAliasAndAliasIsMissing(docWriteRequest, i, metadata)) {
continue;
}
if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices, metadata)) {
if (addFailureIfIndexIsUnavailable(clusterState, docWriteRequest, i, concreteIndices, metadata)) {
continue;
}
if (addFailureIfAppendOnlyIndexAndOpsDeleteOrUpdate(docWriteRequest, i, concreteIndices, metadata)) {
if (addFailureIfAppendOnlyIndexAndOpsDeleteOrUpdate(clusterState, docWriteRequest, i, concreteIndices, metadata)) {
continue;
}

Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest);
Index concreteIndex = concreteIndices.resolveIfAbsent(clusterState, docWriteRequest);
try {
// The ConcreteIndices#resolveIfAbsent(...) method validates via IndexNameExpressionResolver whether
// an operation is allowed in index into a data stream, but this isn't done when resolve call is cached, so
Expand Down Expand Up @@ -821,12 +821,13 @@ public void onTimeout(TimeValue timeout) {
}

private boolean addFailureIfAppendOnlyIndexAndOpsDeleteOrUpdate(
ClusterState clusterstate,
DocWriteRequest<?> request,
int idx,
final ConcreteIndices concreteIndices,
Metadata metadata
) {
Index concreteIndex = concreteIndices.resolveIfAbsent(request);
Index concreteIndex = concreteIndices.resolveIfAbsent(clusterstate, request);
final IndexMetadata indexMetadata = metadata.index(concreteIndex);
if (indexMetadata.isAppendOnlyIndex()) {
if ((request.opType() == DocWriteRequest.OpType.UPDATE || request.opType() == DocWriteRequest.OpType.DELETE)) {
Expand Down Expand Up @@ -874,6 +875,7 @@ private boolean addFailureIfRequiresAliasAndAliasIsMissing(DocWriteRequest<?> re
}

private boolean addFailureIfIndexIsUnavailable(
ClusterState clusterstate,
DocWriteRequest<?> request,
int idx,
final ConcreteIndices concreteIndices,
Expand All @@ -887,7 +889,7 @@ private boolean addFailureIfIndexIsUnavailable(
Index concreteIndex = concreteIndices.getConcreteIndex(request.index());
if (concreteIndex == null) {
try {
concreteIndex = concreteIndices.resolveIfAbsent(request);
concreteIndex = concreteIndices.resolveIfAbsent(clusterstate, request);
} catch (IndexClosedException | IndexNotFoundException | IllegalArgumentException ex) {
addFailure(request, idx, ex);
return true;
Expand Down Expand Up @@ -926,25 +928,24 @@ void executeBulk(
}

/**
* Concrete indices
*
* @opensearch.internal
*/
static class ConcreteIndices {
private final ClusterState state;
* Concrete indices
*
* @opensearch.internal
*/
static final class ConcreteIndices {

private final IndexNameExpressionResolver indexNameExpressionResolver;
private final Map<String, Index> indices = new HashMap<>();

ConcreteIndices(ClusterState state, IndexNameExpressionResolver indexNameExpressionResolver) {
this.state = state;
private ConcreteIndices(IndexNameExpressionResolver indexNameExpressionResolver) {
this.indexNameExpressionResolver = indexNameExpressionResolver;
}

Index getConcreteIndex(String indexOrAlias) {
return indices.get(indexOrAlias);
}

Index resolveIfAbsent(DocWriteRequest<?> request) {
Index resolveIfAbsent(ClusterState state, DocWriteRequest<?> request) {
Index concreteIndex = indices.get(request.index());
if (concreteIndex == null) {
boolean includeDataStreams = request.opType() == DocWriteRequest.OpType.CREATE;
Expand All @@ -957,11 +958,10 @@ Index resolveIfAbsent(DocWriteRequest<?> request) {
includeDataStreams
);
} catch (IndexNotFoundException e) {
if (includeDataStreams == false && e.getMetadataKeys().contains(EXCLUDED_DATA_STREAMS_KEY)) {
if (!includeDataStreams && e.getMetadataKeys().contains(EXCLUDED_DATA_STREAMS_KEY)) {
throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams");
} else {
throw e;
}
throw e;
}
indices.put(request.index(), concreteIndex);
}
Expand Down
Loading