@@ -92,6 +92,7 @@
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.spi.metrics.PinotMeter;
import org.apache.pinot.spi.query.QueryExecutionContext;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.trace.QueryFingerprint;
@@ -136,6 +137,11 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
protected final long _extraPassiveTimeoutMs;
protected final boolean _enableQueryFingerprinting;

protected final PinotMeter _stagesStartedMeter = BrokerMeter.MSE_STAGES_STARTED.getGlobalMeter();
protected final PinotMeter _stagesFinishedMeter = BrokerMeter.MSE_STAGES_COMPLETED.getGlobalMeter();
protected final PinotMeter _opchainsStartedMeter = BrokerMeter.MSE_OPCHAINS_STARTED.getGlobalMeter();
protected final PinotMeter _opchainsCompletedMeter = BrokerMeter.MSE_OPCHAINS_COMPLETED.getGlobalMeter();
Comment on lines +140 to +143

Contributor:
The BaseBrokerRequestHandler class already has an instance of BrokerMetrics. We should use that here instead of the global singleton, for better testability IMO.

Contributor (Author):
The attribute in the parent class is actually final and initialized to the same value used here (the one returned by BrokerMetrics.get()).

In general, the idea is to use BrokerMetrics.get() whenever possible. We should also keep the meters as attributes, as done here, to speed up calls, since we don't need to look up the meter on every invocation. BaseBrokerRequestHandler still uses the old pattern, where we keep an attribute for BrokerMetrics but look up the metric each time we need to modify it.
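
For context, a minimal sketch of the two patterns under discussion (the handler class names and the onStagesStarted method are illustrative, not taken from this PR; the calls assume the existing BrokerMetrics/PinotMeter APIs):

import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.spi.metrics.PinotMeter;

// Old pattern (as in BaseBrokerRequestHandler): keep a BrokerMetrics reference
// and resolve the meter on every call.
class OldStyleHandler {
  private final BrokerMetrics _brokerMetrics = BrokerMetrics.get();

  void onStagesStarted(int stageCount) {
    // The meter is looked up inside addMeteredGlobalValue on each invocation.
    _brokerMetrics.addMeteredGlobalValue(BrokerMeter.MSE_STAGES_STARTED, stageCount);
  }
}

// New pattern (this PR): resolve the meter once, keep it as a field,
// and only call mark() on the hot path.
class NewStyleHandler {
  private final PinotMeter _stagesStartedMeter = BrokerMeter.MSE_STAGES_STARTED.getGlobalMeter();

  void onStagesStarted(int stageCount) {
    _stagesStartedMeter.mark(stageCount);
  }
}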


public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId,
BrokerRequestIdGenerator requestIdGenerator, RoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
@@ -581,10 +587,19 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI
return new BrokerResponseNative(QueryErrorCode.EXECUTION_TIMEOUT);
}

int stageCount = dispatchableSubPlan.getQueryStageMap().size();
int opChainCount = dispatchableSubPlan.getQueryStageMap().values().stream()
Comment on lines +590 to +591

Copilot AI (Dec 23, 2025):
The dispatchableSubPlan.getQueryStageMap() is called twice. Consider storing the result in a variable to avoid redundant calls.

Suggested change:
-    int stageCount = dispatchableSubPlan.getQueryStageMap().size();
-    int opChainCount = dispatchableSubPlan.getQueryStageMap().values().stream()
+    Map<Integer, DispatchablePlanFragment> queryStageMap = dispatchableSubPlan.getQueryStageMap();
+    int stageCount = queryStageMap.size();
+    int opChainCount = queryStageMap.values().stream()
.mapToInt(stage -> stage.getWorkerMetadataList().size())
.sum();

try {
String clientRequestId = extractClientRequestId(query.getSqlNodeAndOptions());
onQueryStart(requestId, clientRequestId, query.getTextQuery());
long executionStartTimeNs = System.nanoTime();

_stagesStartedMeter.mark(stageCount);
_opchainsStartedMeter.mark(opChainCount);

QueryDispatcher.QueryResult queryResults;
try {
queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, timer.getRemainingTimeMs(),
@@ -598,6 +613,8 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI
requestContext.setErrorCode(queryErrorCode);
return new BrokerResponseNative(queryErrorCode, consolidatedMessage);
} finally {
_stagesFinishedMeter.mark(stageCount);
_opchainsCompletedMeter.mark(opChainCount);
onQueryFinish(requestId);
}

@@ -25,6 +25,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -54,6 +55,8 @@ public class StatMap<K extends Enum<K> & StatMap.Key> {
private final Map<K, Object> _map;

private static final ConcurrentHashMap<Class<?>, Object[]> KEYS_BY_CLASS = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<Class<?>, Map<String, Object>> KEYS_BY_STRING_BY_CLASS
= new ConcurrentHashMap<>();

public StatMap(Class<K> keyClass) {
_keyClass = keyClass;
@@ -166,6 +169,25 @@ public Object getAny(K key) {
}
}

/**
* Returns the value associated with the key name.
*
* In general, it is better to use the type-specific getters with the enum key directly, but sometimes that is
* impossible or would require hard-to-read code (like complex unsafe casts).
*
* @param keyName The name of the key.
* @param defaultValue The default value to return if the key is not found.
* @throws ClassCastException if the value cannot be cast to the same static type as the default value.
*/
public <E> E getUnsafe(String keyName, E defaultValue)
throws ClassCastException {
K key = getKey(keyName);
if (key == null) {
return defaultValue;
}
return (E) getAny(key);
}

/**
* Modifies this object to merge the values of the other object.
*
@@ -201,11 +223,28 @@ public StatMap<K> merge(StatMap<K> other) {
return this;
}

private K[] keys() {
return (K[]) KEYS_BY_CLASS.computeIfAbsent(_keyClass, k -> k.getEnumConstants());
}

@Nullable
private K getKey(String name) {
Map<String, Object> cachedMap = KEYS_BY_STRING_BY_CLASS.computeIfAbsent(_keyClass, k -> {
K[] keys = (K[]) k.getEnumConstants();
Map<String, Object> mapValue = new HashMap<>();
for (K key : keys) {
mapValue.put(key.name(), key);
}
return mapValue;
});
return (K) cachedMap.get(name);
}

public StatMap<K> merge(DataInput input)
throws IOException {
byte serializedKeys = input.readByte();

K[] keys = (K[]) KEYS_BY_CLASS.computeIfAbsent(_keyClass, k -> k.getEnumConstants());
K[] keys = keys();
for (byte i = 0; i < serializedKeys; i++) {
int ordinal = input.readByte();
K key = keys[ordinal];

@@ -24,9 +24,11 @@
import java.util.Map;
import org.apache.pinot.common.Utils;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.metrics.PinotMeter;


/**
* Class containing all the metrics exposed by the Pinot broker.
* Class containing all the meters exposed by the Pinot broker.
* This is implemented as a class rather than an enum to allow for dynamic addition of metrics for all QueryErrorCodes.
*/
public class BrokerMeter implements AbstractMetrics.Meter {
@@ -233,6 +235,11 @@ public class BrokerMeter implements AbstractMetrics.Meter {
*/
public static final BrokerMeter WINDOW_COUNT = create("WINDOW_COUNT", "queries", true);

public static final BrokerMeter MSE_STAGES_STARTED = create("MSE_STAGES_STARTED", "stages", true);
public static final BrokerMeter MSE_STAGES_COMPLETED = create("MSE_STAGES_COMPLETED", "stages", true);
public static final BrokerMeter MSE_OPCHAINS_STARTED = create("MSE_OPCHAINS_STARTED", "opchains", true);
public static final BrokerMeter MSE_OPCHAINS_COMPLETED = create("MSE_OPCHAINS_COMPLETED", "opchains", true);

/**
* How many MSE queries have encountered segments with invalid partitions.
* <p>
@@ -331,4 +338,8 @@ public static BrokerMeter[] values() {
public static BrokerMeter getQueryErrorMeter(QueryErrorCode queryErrorCode) {
return QUERY_ERROR_CODE_METER_MAP.get(queryErrorCode);
}

public PinotMeter getGlobalMeter() {
return BrokerMetrics.get().getMeteredValue(this);
}
}

@@ -19,6 +19,7 @@
package org.apache.pinot.common.metrics;

import org.apache.pinot.common.Utils;
import org.apache.pinot.spi.metrics.PinotMeter;


/**
@@ -246,7 +247,24 @@ public enum ServerMeter implements AbstractMetrics.Meter {

TRANSFORMATION_ERROR_COUNT("rows", false),
DROPPED_RECORD_COUNT("rows", false),
CORRUPTED_RECORD_COUNT("rows", false);
CORRUPTED_RECORD_COUNT("rows", false),

/// Number of multi-stage execution opchains started.
/// This is equal to the number of stages times the average parallelism.
MSE_OPCHAINS_STARTED("opchains", true),
/// Number of multi-stage execution opchains completed.
/// This is equal to the number of stages times the average parallelism.
MSE_OPCHAINS_COMPLETED("opchains", true),

/// Total execution time spent in multi-stage execution on CPU in milliseconds.
/// This is equal to the sum of the executionTimeMs reported by the root of all the opchains executed in the server.
MSE_CPU_EXECUTION_TIME_MS("milliseconds", true),
/// Total memory allocated in bytes for multi-stage execution.
/// This is equal to the sum of the allocatedMemoryBytes reported by all the opchains executed in the server.
MSE_MEMORY_ALLOCATED_BYTES("bytes", true),
/// Total number of rows emitted by multi-stage execution.
/// This is equal to the sum of the emittedRows reported by the root of all the opchains executed in the server.
MSE_EMITTED_ROWS("rows", true);

private final String _meterName;
private final String _unit;
@@ -288,4 +306,8 @@ public boolean isGlobal() {
public String getDescription() {
return _description;
}

public PinotMeter getGlobalMeter() {
return ServerMetrics.get().getMeteredValue(this);
}
}

@@ -34,6 +34,8 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.datatable.StatMap;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.core.util.trace.TraceRunnable;
import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
import org.apache.pinot.query.runtime.blocks.MseBlock;
Expand All @@ -46,6 +48,7 @@
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.spi.exception.TerminationException;
import org.apache.pinot.spi.metrics.PinotMeter;
import org.apache.pinot.spi.query.QueryExecutionContext;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner;
@@ -65,6 +68,7 @@ public class OpChainSchedulerService {
private final Cache<OpChainId, Pair<MultiStageOperator, QueryExecutionContext>> _opChainCache;
private final ReadWriteLock[] _queryLocks;
private final Cache<Long, Boolean> _cancelledQueryCache;
private final Metrics _metrics = new Metrics();

public OpChainSchedulerService(String instanceId, ExecutorService executorService, PinotConfiguration config) {
this(instanceId, executorService, config.getProperty(MultiStageQueryRunner.KEY_OF_OP_STATS_CACHE_SIZE,
@@ -153,6 +157,7 @@ private void registerInternal(OpChain operatorChain, QueryExecutionContext execu
ListenableFutureTask<Void> listenableFutureTask = ListenableFutureTask.create(new TraceRunnable() {
@Override
public void runJob() {
_metrics.onOpChainStarted();
LOGGER.trace("({}): Executing", operatorChain);
MseBlock result = rootOperator.nextBlock();
while (result.isData()) {
@@ -174,12 +179,14 @@
Futures.addCallback(listenableFutureTask, new FutureCallback<>() {
@Override
public void onSuccess(Void result) {
_metrics.onOpChainFinished(rootOperator);
operatorChain.close();
}

@Override
public void onFailure(Throwable t) {
String logMsg = "Failed to execute operator chain: " + t.getMessage();
_metrics.onOpChainFinished(rootOperator);
if (t instanceof QueryException) {
switch (((QueryException) t).getErrorCode()) {
case UNKNOWN:
@@ -266,4 +273,28 @@ private int countOperators(MultiStageOperator root) {
private ReadWriteLock getQueryLock(long requestId) {
return _queryLocks[(int) (requestId & QUERY_LOCK_MASK)];
}

private static class Metrics {
private final PinotMeter _startedOpchains = ServerMeter.MSE_OPCHAINS_STARTED.getGlobalMeter();
private final PinotMeter _completedOpchains = ServerMeter.MSE_OPCHAINS_COMPLETED.getGlobalMeter();
private final PinotMeter _emittedRows = ServerMeter.MSE_EMITTED_ROWS.getGlobalMeter();
private final PinotMeter _cpuExecutionTimeMs = ServerMeter.MSE_CPU_EXECUTION_TIME_MS.getGlobalMeter();
private final PinotMeter _memoryAllocatedBytes = ServerMeter.MSE_MEMORY_ALLOCATED_BYTES.getGlobalMeter();

private static final String EMITTED_ROWS = "EMITTED_ROWS";
private static final String EXECUTION_TIME_MS = "EXECUTION_TIME_MS";
private static final String ALLOCATED_MEMORY_BYTES = "ALLOCATED_MEMORY_BYTES";

public void onOpChainStarted() {
_startedOpchains.mark();
}

public void onOpChainFinished(MultiStageOperator rootOperator) {
_completedOpchains.mark();
StatMap<?> operatorStats = rootOperator.copyStatMaps();
_emittedRows.mark(operatorStats.getUnsafe(EMITTED_ROWS, 0L));
_cpuExecutionTimeMs.mark(operatorStats.getUnsafe(EXECUTION_TIME_MS, 0L));
_memoryAllocatedBytes.mark(operatorStats.getUnsafe(ALLOCATED_MEMORY_BYTES, 0L));
}
}
}

@@ -256,7 +256,7 @@ private MseBlock produceAggregatedBlock() {
}

@Override
protected StatMap<?> copyStatMaps() {
public StatMap<StatKey> copyStatMaps() {
return new StatMap<>(_statMap);
}

@@ -330,7 +330,7 @@ protected void earlyTerminateLeftInput() {
}

@Override
protected StatMap<?> copyStatMaps() {
public StatMap<StatKey> copyStatMaps() {
return new StatMap<>(_statMap);
}

@@ -121,7 +121,7 @@ public MultiStageQueryStats calculateUpstreamStats() {
}

@Override
protected StatMap<?> copyStatMaps() {
public StatMap<StatKey> copyStatMaps() {
return new StatMap<>(_statMap);
}

@@ -98,7 +98,7 @@ public List<MultiStageOperator> getChildOperators() {
return _childOperators;
}

protected StatMap<?> copyStatMaps() {
public StatMap<LiteralValueOperator.StatKey> copyStatMaps() {
return new StatMap<>(_statMap);
}
}

@@ -119,7 +119,7 @@ protected MseBlock getNextBlock() {
}

@Override
protected StatMap<?> copyStatMaps() {
public StatMap<StatKey> copyStatMaps() {
return new StatMap<>(_statMap);
}

@@ -262,7 +262,7 @@ private void checkTerminateException() {
}

@Override
protected StatMap<?> copyStatMaps() {
public StatMap<StatKey> copyStatMaps() {
return new StatMap<>(_statMap);
}

@@ -100,7 +100,7 @@ private RowHeapDataBlock constructBlock() {
}

@Override
protected StatMap<?> copyStatMaps() {
public StatMap<StatKey> copyStatMaps() {
return new StatMap<>(_statMap);
}

@@ -145,7 +145,7 @@ protected MseBlock getNextBlock() {
}

@Override
protected StatMap<?> copyStatMaps() {
public StatMap<StatKey> copyStatMaps() {
return new StatMap<>(_statMap);
}

@@ -275,7 +275,7 @@ private void sendEos(MseBlock.Eos eosBlockWithoutStats) {
}

@Override
protected StatMap<?> copyStatMaps() {
public StatMap<StatKey> copyStatMaps() {
return new StatMap<>(_statMap);
}

@@ -177,7 +177,7 @@ protected MultiStageQueryStats calculateUpstreamStats() {
.orElse(MultiStageQueryStats.emptyStats(_context.getStageId()));
}

protected abstract StatMap<?> copyStatMaps();
public abstract StatMap<?> copyStatMaps();

// TODO: Ideally close() call should finish within request deadline.
// TODO: Consider passing deadline as part of the API.
@@ -133,7 +133,7 @@ protected MseBlock getNextBlock() {
}

@Override
protected StatMap<?> copyStatMaps() {
public StatMap<StatKey> copyStatMaps() {
return new StatMap<>(_statMap);
}

@@ -115,7 +115,7 @@ protected MseBlock getNextBlock() {
}

@Override
protected StatMap<?> copyStatMaps() {
public StatMap<StatKey> copyStatMaps() {
return new StatMap<>(_statMap);
}

@@ -266,7 +266,7 @@ private Object[] appendElements(Object[] inputRow, List<Object> elements, int or
}

@Override
protected StatMap<?> copyStatMaps() {
public StatMap<StatKey> copyStatMaps() {
return new StatMap<>(_statMap);
}
