diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 68bee89bf322..e106b192b030 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -92,6 +92,7 @@ import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.exception.QueryErrorCode; import org.apache.pinot.spi.exception.QueryException; +import org.apache.pinot.spi.metrics.PinotMeter; import org.apache.pinot.spi.query.QueryExecutionContext; import org.apache.pinot.spi.query.QueryThreadContext; import org.apache.pinot.spi.trace.QueryFingerprint; @@ -136,6 +137,11 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { protected final long _extraPassiveTimeoutMs; protected final boolean _enableQueryFingerprinting; + protected final PinotMeter _stagesStartedMeter = BrokerMeter.MSE_STAGES_STARTED.getGlobalMeter(); + protected final PinotMeter _stagesFinishedMeter = BrokerMeter.MSE_STAGES_COMPLETED.getGlobalMeter(); + protected final PinotMeter _opchainsStartedMeter = BrokerMeter.MSE_OPCHAINS_STARTED.getGlobalMeter(); + protected final PinotMeter _opchainsCompletedMeter = BrokerMeter.MSE_OPCHAINS_COMPLETED.getGlobalMeter(); + public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRequestIdGenerator requestIdGenerator, RoutingManager routingManager, AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, @@ -581,10 +587,19 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI return new BrokerResponseNative(QueryErrorCode.EXECUTION_TIMEOUT); } + int stageCount = dispatchableSubPlan.getQueryStageMap().size(); + int opChainCount = dispatchableSubPlan.getQueryStageMap().values().stream() + .mapToInt(stage -> stage.getWorkerMetadataList().size()) + .sum(); + try { String clientRequestId = extractClientRequestId(query.getSqlNodeAndOptions()); onQueryStart(requestId, clientRequestId, query.getTextQuery()); long executionStartTimeNs = System.nanoTime(); + + _stagesStartedMeter.mark(stageCount); + _opchainsStartedMeter.mark(opChainCount); + QueryDispatcher.QueryResult queryResults; try { queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, timer.getRemainingTimeMs(), @@ -598,6 +613,8 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI requestContext.setErrorCode(queryErrorCode); return new BrokerResponseNative(queryErrorCode, consolidatedMessage); } finally { + _stagesFinishedMeter.mark(stageCount); + _opchainsCompletedMeter.mark(opChainCount); onQueryFinish(requestId); } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java index b64b281d5f3b..9754a0a4dffe 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.Collections; import java.util.EnumMap; +import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -54,6 +55,8 @@ public class StatMap & StatMap.Key> { private final Map _map; private static final ConcurrentHashMap, Object[]> KEYS_BY_CLASS = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap, Map> KEYS_BY_STRING_BY_CLASS + = new ConcurrentHashMap<>(); public StatMap(Class keyClass) { _keyClass = keyClass; @@ -166,6 +169,25 @@ public Object getAny(K key) { } } + /** + * Returns the value associated with the key name. + * + * In general, it is better to use the type-specific getters with the enum key directly, but sometimes it is + * impossible or requires complex to read code (like complex unsafe casts). + * + * @param keyName The name of the key. + * @param defaultValue The default value to return if the key is not found. + * @throws ClassCastException if the value cannot be cast to the same static type as the default value. + */ + public E getUnsafe(String keyName, E defaultValue) + throws ClassCastException { + K key = getKey(keyName); + if (key == null) { + return defaultValue; + } + return (E) getAny(key); + } + /** * Modifies this object to merge the values of the other object. * @@ -201,11 +223,28 @@ public StatMap merge(StatMap other) { return this; } + private K[] keys() { + return (K[]) KEYS_BY_CLASS.computeIfAbsent(_keyClass, k -> k.getEnumConstants()); + } + + @Nullable + private K getKey(String name) { + Map cachedMap = KEYS_BY_STRING_BY_CLASS.computeIfAbsent(_keyClass, k -> { + K[] keys = (K[]) k.getEnumConstants(); + Map mapValue = new HashMap<>(); + for (K key : keys) { + mapValue.put(key.name(), key); + } + return mapValue; + }); + return (K) cachedMap.get(name); + } + public StatMap merge(DataInput input) throws IOException { byte serializedKeys = input.readByte(); - K[] keys = (K[]) KEYS_BY_CLASS.computeIfAbsent(_keyClass, k -> k.getEnumConstants()); + K[] keys = keys(); for (byte i = 0; i < serializedKeys; i++) { int ordinal = input.readByte(); K key = keys[ordinal]; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java index 93c4270eb7c4..95ab32152149 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java @@ -24,9 +24,11 @@ import java.util.Map; import org.apache.pinot.common.Utils; import org.apache.pinot.spi.exception.QueryErrorCode; +import org.apache.pinot.spi.metrics.PinotMeter; + /** - * Class containing all the metrics exposed by the Pinot broker. + * Class containing all the meters exposed by the Pinot broker. * This is implemented as a class rather than an enum to allow for dynamic addition of metrics for all QueryErrorCodes. */ public class BrokerMeter implements AbstractMetrics.Meter { @@ -233,6 +235,11 @@ public class BrokerMeter implements AbstractMetrics.Meter { */ public static final BrokerMeter WINDOW_COUNT = create("WINDOW_COUNT", "queries", true); + public static final BrokerMeter MSE_STAGES_STARTED = create("MSE_STAGES_STARTED", "stages", true); + public static final BrokerMeter MSE_STAGES_COMPLETED = create("MSE_STAGES_COMPLETED", "stages", true); + public static final BrokerMeter MSE_OPCHAINS_STARTED = create("MSE_OPCHAINS_STARTED", "opchains", true); + public static final BrokerMeter MSE_OPCHAINS_COMPLETED = create("MSE_OPCHAINS_COMPLETED", "opchains", true); + /** * How many MSE queries have encountered segments with invalid partitions. *

@@ -331,4 +338,8 @@ public static BrokerMeter[] values() { public static BrokerMeter getQueryErrorMeter(QueryErrorCode queryErrorCode) { return QUERY_ERROR_CODE_METER_MAP.get(queryErrorCode); } + + public PinotMeter getGlobalMeter() { + return BrokerMetrics.get().getMeteredValue(this); + } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index f0b304feb699..41ee737381f5 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -19,6 +19,7 @@ package org.apache.pinot.common.metrics; import org.apache.pinot.common.Utils; +import org.apache.pinot.spi.metrics.PinotMeter; /** @@ -246,7 +247,24 @@ public enum ServerMeter implements AbstractMetrics.Meter { TRANSFORMATION_ERROR_COUNT("rows", false), DROPPED_RECORD_COUNT("rows", false), - CORRUPTED_RECORD_COUNT("rows", false); + CORRUPTED_RECORD_COUNT("rows", false), + + /// Number of multi-stage execution opchains started. + /// This is equal to the number of stages times the average parallelism + MSE_OPCHAINS_STARTED("opchains", true), + /// Number of multi-stage execution opchains completed. + /// This is equal to the number of stages times the average parallelism + MSE_OPCHAINS_COMPLETED("opchains", true), + + /// Total execution time spent in multi-stage execution on CPU in milliseconds. + /// This is equal to the sum of the executionTimeMs reported by the root of all the opchains executed in the server. + MSE_CPU_EXECUTION_TIME_MS("milliseconds", true), + /// Total memory allocated in bytes for multi-stage execution. + /// This is equal to the sum of the allocatedMemoryBytes reported by all the opchains executed in the server. + MSE_MEMORY_ALLOCATED_BYTES("bytes", true), + /// Total number of rows emitted by multi-stage execution. + /// This is equal to the sum of the emittedRows reported by the root of all the opchains executed in the server. + MSE_EMITTED_ROWS("rows", true); private final String _meterName; private final String _unit; @@ -288,4 +306,8 @@ public boolean isGlobal() { public String getDescription() { return _description; } + + public PinotMeter getGlobalMeter() { + return ServerMetrics.get().getMeteredValue(this); + } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java index f258cd4163d1..599d052cf24e 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java @@ -34,6 +34,8 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.common.datatable.StatMap; +import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.core.util.trace.TraceRunnable; import org.apache.pinot.query.runtime.blocks.ErrorMseBlock; import org.apache.pinot.query.runtime.blocks.MseBlock; @@ -46,6 +48,7 @@ import org.apache.pinot.spi.exception.QueryErrorCode; import org.apache.pinot.spi.exception.QueryException; import org.apache.pinot.spi.exception.TerminationException; +import org.apache.pinot.spi.metrics.PinotMeter; import org.apache.pinot.spi.query.QueryExecutionContext; import org.apache.pinot.spi.query.QueryThreadContext; import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner; @@ -65,6 +68,7 @@ public class OpChainSchedulerService { private final Cache> _opChainCache; private final ReadWriteLock[] _queryLocks; private final Cache _cancelledQueryCache; + private final Metrics _metrics = new Metrics(); public OpChainSchedulerService(String instanceId, ExecutorService executorService, PinotConfiguration config) { this(instanceId, executorService, config.getProperty(MultiStageQueryRunner.KEY_OF_OP_STATS_CACHE_SIZE, @@ -153,6 +157,7 @@ private void registerInternal(OpChain operatorChain, QueryExecutionContext execu ListenableFutureTask listenableFutureTask = ListenableFutureTask.create(new TraceRunnable() { @Override public void runJob() { + _metrics.onOpChainStarted(); LOGGER.trace("({}): Executing", operatorChain); MseBlock result = rootOperator.nextBlock(); while (result.isData()) { @@ -174,12 +179,14 @@ public void runJob() { Futures.addCallback(listenableFutureTask, new FutureCallback<>() { @Override public void onSuccess(Void result) { + _metrics.onOpChainFinished(rootOperator); operatorChain.close(); } @Override public void onFailure(Throwable t) { String logMsg = "Failed to execute operator chain: " + t.getMessage(); + _metrics.onOpChainFinished(rootOperator); if (t instanceof QueryException) { switch (((QueryException) t).getErrorCode()) { case UNKNOWN: @@ -266,4 +273,28 @@ private int countOperators(MultiStageOperator root) { private ReadWriteLock getQueryLock(long requestId) { return _queryLocks[(int) (requestId & QUERY_LOCK_MASK)]; } + + private static class Metrics { + private final PinotMeter _startedOpchains = ServerMeter.MSE_OPCHAINS_STARTED.getGlobalMeter(); + private final PinotMeter _competedOpchains = ServerMeter.MSE_OPCHAINS_COMPLETED.getGlobalMeter(); + private final PinotMeter _emittedRows = ServerMeter.MSE_EMITTED_ROWS.getGlobalMeter(); + private final PinotMeter _cpuExecutionTimeMs = ServerMeter.MSE_CPU_EXECUTION_TIME_MS.getGlobalMeter(); + private final PinotMeter _memoryAllocatedBytes = ServerMeter.MSE_MEMORY_ALLOCATED_BYTES.getGlobalMeter(); + + private static final String EMITTED_ROWS = "EMITTED_ROWS"; + private static final String EXECUTION_TIME_MS = "EXECUTION_TIME_MS"; + private static final String ALLOCATED_MEMORY_BYTES = "ALLOCATED_MEMORY_BYTES"; + + public void onOpChainStarted() { + _startedOpchains.mark(); + } + + public void onOpChainFinished(MultiStageOperator rootOperator) { + _competedOpchains.mark(); + StatMap operatorStats = rootOperator.copyStatMaps(); + _emittedRows.mark(operatorStats.getUnsafe(EMITTED_ROWS, 0L)); + _cpuExecutionTimeMs.mark(operatorStats.getUnsafe(EXECUTION_TIME_MS, 0L)); + _memoryAllocatedBytes.mark(operatorStats.getUnsafe(ALLOCATED_MEMORY_BYTES, 0L)); + } + } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java index dfc1cc3600ab..0a792c411200 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java @@ -256,7 +256,7 @@ private MseBlock produceAggregatedBlock() { } @Override - protected StatMap copyStatMaps() { + public StatMap copyStatMaps() { return new StatMap<>(_statMap); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java index 3b459b51bcaf..17c5c6b8787d 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java @@ -330,7 +330,7 @@ protected void earlyTerminateLeftInput() { } @Override - protected StatMap copyStatMaps() { + public StatMap copyStatMaps() { return new StatMap<>(_statMap); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java index 250395042a0e..54cce0d78cfa 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java @@ -121,7 +121,7 @@ public MultiStageQueryStats calculateUpstreamStats() { } @Override - protected StatMap copyStatMaps() { + public StatMap copyStatMaps() { return new StatMap<>(_statMap); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/ErrorOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/ErrorOperator.java index 926fec585edb..e8ec717fbc48 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/ErrorOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/ErrorOperator.java @@ -98,7 +98,7 @@ public List getChildOperators() { return _childOperators; } - protected StatMap copyStatMaps() { + public StatMap copyStatMaps() { return new StatMap<>(_statMap); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java index e98b9e2b245f..bc4b49d2f7bd 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java @@ -119,7 +119,7 @@ protected MseBlock getNextBlock() { } @Override - protected StatMap copyStatMaps() { + public StatMap copyStatMaps() { return new StatMap<>(_statMap); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java index 4fd8fb8dc508..92d64f55bcc4 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java @@ -262,7 +262,7 @@ private void checkTerminateException() { } @Override - protected StatMap copyStatMaps() { + public StatMap copyStatMaps() { return new StatMap<>(_statMap); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java index a2b008277490..23ae0889a169 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java @@ -100,7 +100,7 @@ private RowHeapDataBlock constructBlock() { } @Override - protected StatMap copyStatMaps() { + public StatMap copyStatMaps() { return new StatMap<>(_statMap); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LookupJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LookupJoinOperator.java index 0e34f6573c8b..02024b8efa8a 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LookupJoinOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LookupJoinOperator.java @@ -145,7 +145,7 @@ protected MseBlock getNextBlock() { } @Override - protected StatMap copyStatMaps() { + public StatMap copyStatMaps() { return new StatMap<>(_statMap); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java index e442eab1ca9d..05bfe47dda65 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java @@ -275,7 +275,7 @@ private void sendEos(MseBlock.Eos eosBlockWithoutStats) { } @Override - protected StatMap copyStatMaps() { + public StatMap copyStatMaps() { return new StatMap<>(_statMap); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java index 656420e75819..1665fe85b7b0 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java @@ -177,7 +177,7 @@ protected MultiStageQueryStats calculateUpstreamStats() { .orElse(MultiStageQueryStats.emptyStats(_context.getStageId())); } - protected abstract StatMap copyStatMaps(); + public abstract StatMap copyStatMaps(); // TODO: Ideally close() call should finish within request deadline. // TODO: Consider passing deadline as part of the API. diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java index cb76204fb542..894e6f8a808b 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java @@ -133,7 +133,7 @@ protected MseBlock getNextBlock() { } @Override - protected StatMap copyStatMaps() { + public StatMap copyStatMaps() { return new StatMap<>(_statMap); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java index 4d919ffe7d00..cdb5f803d170 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java @@ -115,7 +115,7 @@ protected MseBlock getNextBlock() { } @Override - protected StatMap copyStatMaps() { + public StatMap copyStatMaps() { return new StatMap<>(_statMap); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnnestOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnnestOperator.java index 1aa84636a909..0594e82a72c5 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnnestOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnnestOperator.java @@ -266,7 +266,7 @@ private Object[] appendElements(Object[] inputRow, List elements, int or } @Override - protected StatMap copyStatMaps() { + public StatMap copyStatMaps() { return new StatMap<>(_statMap); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java index be7736e91e76..40e077751dbc 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java @@ -283,7 +283,7 @@ private MseBlock computeBlocks() { } @Override - protected StatMap copyStatMaps() { + public StatMap copyStatMaps() { return new StatMap<>(_statMap); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/set/SetOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/set/SetOperator.java index 6e870835ef27..070a550e2c51 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/set/SetOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/set/SetOperator.java @@ -56,7 +56,7 @@ public List getChildOperators() { } @Override - protected StatMap copyStatMaps() { + public StatMap copyStatMaps() { return new StatMap<>(_statMap); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java index 4ed8dfe3d1ca..5732ff7c8d7c 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java @@ -148,7 +148,7 @@ protected MultiStageQueryStats calculateUpstreamStats() { } @Override - protected StatMap copyStatMaps() { + public StatMap copyStatMaps() { return new StatMap<>(_statMap); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java index a0e4772ea1c1..abda6ceecc4a 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java @@ -24,12 +24,14 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.pinot.common.datatable.StatMap; import org.apache.pinot.common.utils.NamedThreadFactory; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.routing.StageMetadata; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.blocks.ErrorMseBlock; import org.apache.pinot.query.runtime.blocks.SuccessMseBlock; +import org.apache.pinot.query.runtime.operator.MailboxSendOperator; import org.apache.pinot.query.runtime.operator.MultiStageOperator; import org.apache.pinot.query.runtime.operator.OpChain; import org.apache.pinot.query.runtime.plan.MultiStageQueryStats; @@ -74,6 +76,7 @@ public void afterClass() public void beforeMethod() { _operatorA = Mockito.mock(MultiStageOperator.class); clearInvocations(_operatorA); + Mockito.when(_operatorA.copyStatMaps()).thenAnswer(inv -> new StatMap<>(MailboxSendOperator.StatKey.class)); } private OpChain getChain(MultiStageOperator operator) { diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/BlockListMultiStageOperator.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/BlockListMultiStageOperator.java index 6cef4969bb66..42422a5220c1 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/BlockListMultiStageOperator.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/BlockListMultiStageOperator.java @@ -84,7 +84,7 @@ public List getChildOperators() { } @Override - protected StatMap copyStatMaps() { + public StatMap copyStatMaps() { return new StatMap<>(_statMap); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java index 7d5a89baed07..c9529eca2109 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java @@ -186,7 +186,7 @@ protected MseBlock getNextBlock() { } @Override - protected StatMap copyStatMaps() { + public StatMap copyStatMaps() { return new StatMap<>(_statMap); }