diff --git a/app/vtinsert/insertutil/index_helper.go b/app/vtinsert/insertutil/index_helper.go index 1834ca9d1..c45bf19f6 100644 --- a/app/vtinsert/insertutil/index_helper.go +++ b/app/vtinsert/insertutil/index_helper.go @@ -22,6 +22,7 @@ type indexEntry struct { tenantID logstorage.TenantID startTimeNano int64 endTimeNano int64 + hasRootSpan bool } type indexWorker struct { @@ -48,13 +49,13 @@ var ( // pushIndexToQueue organize index data (from LogMessageProcessor interface or InsertRowProcessor interface) // and push it to the queue. -func pushIndexToQueue(tenantID logstorage.TenantID, traceID string, startTime, endTime int64) bool { +func pushIndexToQueue(tenantID logstorage.TenantID, traceID string, startTime, endTime int64, isRootSpan bool) bool { select { case <-stopCh: // during stop, no data should be pushed to the queue anymore. return false default: - mustPushIndex(tenantID, traceID, startTime, endTime) + mustPushIndex(tenantID, traceID, startTime, endTime, isRootSpan) } return true } @@ -62,7 +63,7 @@ func pushIndexToQueue(tenantID logstorage.TenantID, traceID string, startTime, e // mustPushIndex compose an (or update an existing) indexEntry with tenantID, startTime and endTime for a trace, // and put it to the traceIDIndex map. // The indexEntry should indicate the real min(startTime) and max(endTime) of a trace, and be flushed to disk later. 
-func mustPushIndex(tenantID logstorage.TenantID, traceID string, startTime, endTime int64) { +func mustPushIndex(tenantID logstorage.TenantID, traceID string, startTime, endTime int64, isRootSpan bool) { tb := [32]byte{} copy(tb[:], traceID) @@ -78,6 +79,7 @@ func mustPushIndex(tenantID logstorage.TenantID, traceID string, startTime, endT if ok { idxEntry.startTimeNano = min(startTime, idxEntry.startTimeNano) idxEntry.endTimeNano = max(endTime, idxEntry.endTimeNano) + idxEntry.hasRootSpan = idxEntry.hasRootSpan || isRootSpan worker.traceIDIndexMapCur[tb] = idxEntry return } @@ -86,6 +88,7 @@ func mustPushIndex(tenantID logstorage.TenantID, traceID string, startTime, endT if ok { idxEntry.startTimeNano = min(startTime, idxEntry.startTimeNano) idxEntry.endTimeNano = max(endTime, idxEntry.endTimeNano) + idxEntry.hasRootSpan = idxEntry.hasRootSpan || isRootSpan worker.traceIDIndexMapPrev[tb] = idxEntry return } @@ -95,6 +98,7 @@ func mustPushIndex(tenantID logstorage.TenantID, traceID string, startTime, endT idxEntry.tenantID = tenantID idxEntry.startTimeNano = startTime idxEntry.endTimeNano = endTime + idxEntry.hasRootSpan = isRootSpan worker.traceIDIndexMapCur[tb] = idxEntry } @@ -179,6 +183,10 @@ func (w *indexWorker) flushIndexInMap(tb [32]byte, idxEntry indexEntry) bool { startTimestamp := idxEntry.startTimeNano endTimestamp := idxEntry.endTimeNano + hasRootSpan := "0" + if idxEntry.hasRootSpan { + hasRootSpan = "1" + } lmp.AddRow(startTimestamp, // fields []logstorage.Field{ @@ -188,6 +196,7 @@ func (w *indexWorker) flushIndexInMap(tb [32]byte, idxEntry indexEntry) bool { {Name: otelpb.TraceIDIndexStartTimeFieldName, Value: strconv.FormatInt(startTimestamp, 10)}, {Name: otelpb.TraceIDIndexEndTimeFieldName, Value: strconv.FormatInt(endTimestamp, 10)}, {Name: otelpb.TraceIDIndexDuration, Value: strconv.FormatInt(endTimestamp-startTimestamp, 10)}, + {Name: otelpb.TraceIDIndexHasRootSpan, Value: hasRootSpan}, }, 1, ) diff --git 
a/app/vtinsert/insertutil/trace_processor.go b/app/vtinsert/insertutil/trace_processor.go index f01184796..ea71a3355 100644 --- a/app/vtinsert/insertutil/trace_processor.go +++ b/app/vtinsert/insertutil/trace_processor.go @@ -76,9 +76,11 @@ func (tsp *traceSpanProcessor) MustClose() { // Each item in the queue will be popped after certain interval, and carries the min(startTimeNano), max(endTimeNano) of this trace ID. func (tsp *traceSpanProcessor) pushTraceToIndexQueue(tenant logstorage.TenantID, fields []logstorage.Field) bool { var ( - traceID string - startTime, endTime int64 - err error + traceID string + parentSpanID string + startTime int64 + endTime int64 + err error ) i := len(fields) - 1 @@ -94,26 +96,25 @@ func (tsp *traceSpanProcessor) pushTraceToIndexQueue(tenant logstorage.TenantID, return false } - // find endTimeNano of the span in reverse order, it should be right before trace ID field. + // find endTimeNano, startTimeNano, and parentSpanID of the span in reverse order. for i = i - 1; i >= 0; i-- { - if fields[i].Name == otelpb.EndTimeUnixNanoField { + switch fields[i].Name { + case otelpb.EndTimeUnixNanoField: endTime, err = strconv.ParseInt(fields[i].Value, 10, 64) if err != nil { logger.Errorf("cannot parse endTime %s for traceID %q: %v", fields[i].Value, traceID, err) return false } - break - } - } - - // find startTimeNano of the span in reverse order, it should be right before endTimeNano field. 
- for i = i - 1; i >= 0; i-- { - if fields[i].Name == otelpb.StartTimeUnixNanoField { + case otelpb.StartTimeUnixNanoField: startTime, err = strconv.ParseInt(fields[i].Value, 10, 64) if err != nil { logger.Errorf("cannot parse startTime %s for traceID %q: %v", fields[i].Value, traceID, err) return false } + case otelpb.ParentSpanIDField: + parentSpanID = fields[i].Value + } + if startTime != 0 && endTime != 0 { break } } @@ -126,7 +127,7 @@ func (tsp *traceSpanProcessor) pushTraceToIndexQueue(tenant logstorage.TenantID, endTime = time.Now().UnixNano() } - return pushIndexToQueue(tenant, traceID, startTime, endTime) + return pushIndexToQueue(tenant, traceID, startTime, endTime, parentSpanID == "") } // The following methods are for native data ingestion protocol. They must be called from `internalinsert`. @@ -147,9 +148,11 @@ func (tsp *traceSpanProcessor) AddInsertRow(r *logstorage.InsertRow) { // Each item in the queue will be popped after certain interval, and carries the min(startTimeNano), max(endTimeNano) of this trace ID. func (tsp *traceSpanProcessor) pushNativeRowToIndexQueue(r *logstorage.InsertRow) bool { var ( - traceID string - startTime, endTime int64 - err error + traceID string + parentSpanID string + startTime int64 + endTime int64 + err error ) i := len(r.Fields) - 1 @@ -166,26 +169,25 @@ func (tsp *traceSpanProcessor) pushNativeRowToIndexQueue(r *logstorage.InsertRow return false } - // find endTimeNano of the span in reverse order, it should be right before trace ID field. + // find endTimeNano, startTimeNano, and parentSpanID in reverse order. 
for i = i - 1; i >= 0; i-- { - if r.Fields[i].Name == otelpb.EndTimeUnixNanoField { + switch r.Fields[i].Name { + case otelpb.EndTimeUnixNanoField: endTime, err = strconv.ParseInt(r.Fields[i].Value, 10, 64) if err != nil { logger.Errorf("cannot parse endTime %s for traceID %q: %v", r.Fields[i].Value, traceID, err) return false } - break - } - } - - // find startTimeNano of the span in reverse order, it should be right before endTimeNano field. - for i = i - 1; i >= 0; i-- { - if r.Fields[i].Name == otelpb.StartTimeUnixNanoField { + case otelpb.StartTimeUnixNanoField: startTime, err = strconv.ParseInt(r.Fields[i].Value, 10, 64) if err != nil { logger.Errorf("cannot parse startTime %s for traceID %q: %v", r.Fields[i].Value, traceID, err) return false } + case otelpb.ParentSpanIDField: + parentSpanID = r.Fields[i].Value + } + if startTime != 0 && endTime != 0 { break } } @@ -198,5 +200,5 @@ func (tsp *traceSpanProcessor) pushNativeRowToIndexQueue(r *logstorage.InsertRow endTime = time.Now().UnixNano() } - return pushIndexToQueue(r.TenantID, traceID, startTime, endTime) + return pushIndexToQueue(r.TenantID, traceID, startTime, endTime, parentSpanID == "") } diff --git a/app/vtselect/traces/tempo/metrics_handler.go b/app/vtselect/traces/tempo/metrics_handler.go new file mode 100644 index 000000000..50a639d27 --- /dev/null +++ b/app/vtselect/traces/tempo/metrics_handler.go @@ -0,0 +1,920 @@ +package tempo + +import ( + "context" + "encoding/json" + "fmt" + "math" + "net/http" + "slices" + "sort" + "strconv" + "strings" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" + + "github.com/VictoriaMetrics/VictoriaTraces/app/vtselect/traces/tracecommon" + "github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage" + otelpb "github.com/VictoriaMetrics/VictoriaTraces/lib/protoparser/opentelemetry/pb" + 
"github.com/VictoriaMetrics/VictoriaTraces/lib/traceql" +) + +type metricsQueryRangeParam struct { + q string + start time.Time + end time.Time + step int64 // nanoseconds +} + +// processMetricsQueryRangeRequest handles the Tempo /api/metrics/query_range API request. +func processMetricsQueryRangeRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) { + cp, err := tracecommon.GetCommonParams(r) + if err != nil { + httpserver.Errorf(w, r, "incorrect query params: %s", err) + return + } + + params, err := parseMetricsQueryRangeParams(r) + if err != nil { + httpserver.Errorf(w, r, "incorrect query params: %s", err) + return + } + + translation, err := translateMetricsQuery(params.q, params.end.UnixNano()) + if err != nil { + httpserver.Errorf(w, r, "cannot translate metrics query: %s", err) + return + } + + var allSeries []tempoMetricsSeries + + if translation.isCompare { + allSeries, err = executeCompareQuery(ctx, cp, translation, params) + if err != nil { + httpserver.Errorf(w, r, "cannot execute compare query: %s", err) + return + } + } else { + allSeries, err = executeStatsQuery(ctx, cp, translation.baseQuery, translation.byFields, params) + if err != nil { + httpserver.Errorf(w, r, "cannot execute query: %s", err) + return + } + + // Collect exemplars — sample trace IDs for clickable links in Grafana. + exemplars, exemplarErr := collectExemplars(ctx, cp, translation.baseFilter, params.start.UnixNano(), params.end.UnixNano(), params.step, defaultMaxExemplars) + if exemplarErr == nil && len(exemplars) > 0 { + attachExemplarsToSeries(allSeries, exemplars) + } + } + + w.Header().Set("Content-Type", "application/json") + WriteMetricsQueryRangeResponse(w, allSeries) +} + +// Fields to exclude from compare attribute discovery. 
+var compareExcludedFields = map[string]bool{ + otelpb.TraceIDField: true, + otelpb.SpanIDField: true, + otelpb.ParentSpanIDField: true, + otelpb.StartTimeUnixNanoField: true, + otelpb.EndTimeUnixNanoField: true, + otelpb.FlagsField: true, + otelpb.TraceStateField: true, + otelpb.DroppedAttributesCountField: true, + otelpb.DroppedEventsCountField: true, + otelpb.DroppedLinksCountField: true, + otelpb.DurationField: true, + // internal index fields + otelpb.TraceIDIndexStreamName: true, + otelpb.TraceIDIndexFieldName: true, + otelpb.TraceIDIndexStartTimeFieldName: true, + otelpb.TraceIDIndexEndTimeFieldName: true, + // service graph fields + otelpb.ServiceGraphStreamName: true, + otelpb.ServiceGraphParentFieldName: true, + otelpb.ServiceGraphChildFieldName: true, + otelpb.ServiceGraphCallCountFieldName: true, + // VictoriaLogs internals + "_msg": true, "_time": true, "_stream": true, "_stream_id": true, +} + +// compareAttrResult holds per-attribute counts for baseline and selection. +type compareAttrResult struct { + attrName string // VT field name + baseline map[string]map[int64]float64 // value → timestamp → count + selection map[string]map[int64]float64 +} + +// executeCompareQuery discovers attributes and runs per-attribute count queries for compare(). +func executeCompareQuery(ctx context.Context, cp *tracecommon.CommonParams, t *metricsQueryTranslation, params *metricsQueryRangeParam) ([]tempoMetricsSeries, error) { + // Step 1: Discover attributes via GetFieldNames (it handles discovery internally). 
+ q, err := logstorage.ParseQueryAtTimestamp(t.baseFilter, params.end.UnixNano()) + if err != nil { + return nil, fmt.Errorf("cannot parse field_names query: %w", err) + } + q.AddTimeFilter(params.start.UnixNano(), params.end.UnixNano()) + + cpDiscover := *cp + cpDiscover.Query = q + qctx := cpDiscover.NewQueryContext(ctx) + fieldNames, err := vtstorage.GetFieldNames(qctx) + cpDiscover.UpdatePerQueryStatsMetrics() + if err != nil { + return nil, fmt.Errorf("cannot discover field names: %w", err) + } + + // Step 2: Filter excluded fields. + var attrs []string + for _, fn := range fieldNames { + if compareExcludedFields[fn.Value] { + continue + } + // Skip event/link sub-fields. + if strings.HasPrefix(fn.Value, otelpb.EventPrefix) || strings.HasPrefix(fn.Value, otelpb.LinkPrefix) { + continue + } + attrs = append(attrs, fn.Value) + } + + if len(attrs) == 0 { + return nil, nil + } + + // Step 3: Per-attribute parallel queries. + results := make([]compareAttrResult, len(attrs)) + var wg sync.WaitGroup + sem := make(chan struct{}, 16) // concurrency limit + var queryErr error + var queryErrMu sync.Mutex + + // Build the selection filter (base AND compare filter). + selFilter := t.baseFilter + if t.compareFilter != "" && t.compareFilter != "*" { + if selFilter == "*" { + selFilter = t.compareFilter + } else { + selFilter = selFilter + " AND " + t.compareFilter + } + } + + // Determine selection time range. 
+ selStartNs := params.start.UnixNano() + selEndNs := params.end.UnixNano() + if t.selectionStartNs > 0 && t.selectionEndNs > 0 { + selStartNs = t.selectionStartNs + selEndNs = t.selectionEndNs + } + + for i, attr := range attrs { + wg.Add(1) + go func(i int, attr string) { + defer wg.Done() + sem <- struct{}{} + defer func() { <-sem }() + + quotedAttr := quoteLogsQLField(attr) + ar := compareAttrResult{ + attrName: attr, + baseline: make(map[string]map[int64]float64), + selection: make(map[string]map[int64]float64), + } + + // Baseline: count per value over full time range. + baseQ := t.baseFilter + " | stats by (" + quotedAttr + ") count() as value" + baseCounts, err := runCountQuery(ctx, cp, baseQ, attr, params.start.UnixNano(), params.end.UnixNano(), params.step) + if err != nil { + queryErrMu.Lock() + queryErr = err + queryErrMu.Unlock() + return + } + ar.baseline = baseCounts + + // Selection: count per value over selection window. + selQ := selFilter + " | stats by (" + quotedAttr + ") count() as value" + selCounts, err := runCountQuery(ctx, cp, selQ, attr, selStartNs, selEndNs, params.step) + if err != nil { + queryErrMu.Lock() + queryErr = err + queryErrMu.Unlock() + return + } + ar.selection = selCounts + + results[i] = ar + }(i, attr) + } + wg.Wait() + + if queryErr != nil { + return nil, queryErr + } + + // Step 4: Build series with topN. + return buildCompareSeries(results, t.topN), nil +} + +// runCountQuery runs a `stats by (attr) count()` query and returns value → timestamp → count. 
+func runCountQuery(ctx context.Context, cp *tracecommon.CommonParams, logsQLStr, attrName string, startNs, endNs, stepNs int64) (map[string]map[int64]float64, error) { + q, err := logstorage.ParseQueryAtTimestamp(logsQLStr, endNs) + if err != nil { + return nil, fmt.Errorf("cannot parse query [%s]: %w", logsQLStr, err) + } + q.AddTimeFilter(startNs, endNs) + + labelFields, err := q.GetStatsLabelsAddGroupingByTime(stepNs, 0) + if err != nil { + return nil, fmt.Errorf("cannot prepare stats query: %w", err) + } + + counts := make(map[string]map[int64]float64) + var mu sync.Mutex + + writeBlock := func(_ uint, db *logstorage.DataBlock) { + rowsCount := db.RowsCount() + columns := db.GetColumns(false) + + for i := range rowsCount { + ts := q.GetTimestamp() + var attrValue string + + for _, c := range columns { + if c.Name == "_time" { + nsec, ok := logstorage.TryParseTimestampRFC3339Nano(c.Values[i]) + if ok { + ts = nsec + } + continue + } + if slices.Contains(labelFields, c.Name) && c.Name == attrName { + attrValue = strings.Clone(c.Values[i]) + } + } + + for _, c := range columns { + if slices.Contains(labelFields, c.Name) || c.Name == "_time" { + continue + } + v, _ := strconv.ParseFloat(c.Values[i], 64) + mu.Lock() + if counts[attrValue] == nil { + counts[attrValue] = make(map[int64]float64) + } + counts[attrValue][ts] = v + mu.Unlock() + } + } + } + + cpCopy := *cp + cpCopy.Query = q + qctx := cpCopy.NewQueryContext(ctx) + defer cpCopy.UpdatePerQueryStatsMetrics() + + if err := vtstorage.RunQuery(qctx, writeBlock); err != nil { + return nil, err + } + return counts, nil +} + +// buildCompareSeries builds the Tempo compare response series from per-attribute results. +func buildCompareSeries(results []compareAttrResult, topN int) []tempoMetricsSeries { + if topN <= 0 { + topN = 10 + } + + // Compute divergence per attribute and sort by it descending. 
+ type attrDivergence struct { + idx int + divergence float64 + } + divergences := make([]attrDivergence, 0, len(results)) + for i, ar := range results { + if len(ar.baseline) == 0 && len(ar.selection) == 0 { + continue + } + divergences = append(divergences, attrDivergence{ + idx: i, + divergence: computeDivergence(ar), + }) + } + sort.Slice(divergences, func(i, j int) bool { + return divergences[i].divergence > divergences[j].divergence + }) + + var allSeries []tempoMetricsSeries + + for _, ad := range divergences { + ar := results[ad.idx] + + traceQLName := traceql.VTFieldToTraceQL(ar.attrName) + + // Reverse-map known numeric values to human-readable names. + if ar.attrName == otelpb.StatusCodeField { + ar.baseline = remapStatusValues(ar.baseline) + ar.selection = remapStatusValues(ar.selection) + } + + // Rank values by total count (baseline + selection combined). + type valueTotal struct { + value string + total float64 + } + totals := make(map[string]float64) + for v, tsCounts := range ar.baseline { + for _, c := range tsCounts { + totals[v] += c + } + } + for v, tsCounts := range ar.selection { + for _, c := range tsCounts { + totals[v] += c + } + } + + ranked := make([]valueTotal, 0, len(totals)) + for v, t := range totals { + ranked = append(ranked, valueTotal{v, t}) + } + sort.Slice(ranked, func(i, j int) bool { + return ranked[i].total > ranked[j].total + }) + if len(ranked) > topN { + ranked = ranked[:topN] + } + + // Collect all timestamps across baseline and selection. 
+ tsSet := make(map[int64]bool) + for _, tsCounts := range ar.baseline { + for ts := range tsCounts { + tsSet[ts] = true + } + } + for _, tsCounts := range ar.selection { + for ts := range tsCounts { + tsSet[ts] = true + } + } + timestamps := make([]int64, 0, len(tsSet)) + for ts := range tsSet { + timestamps = append(timestamps, ts) + } + sort.Slice(timestamps, func(i, j int) bool { return timestamps[i] < timestamps[j] }) + + // Compute totals per timestamp (across ALL values, not just topN). + baseTotalByTs := make(map[int64]float64) + selTotalByTs := make(map[int64]float64) + for _, tsCounts := range ar.baseline { + for ts, c := range tsCounts { + baseTotalByTs[ts] += c + } + } + for _, tsCounts := range ar.selection { + for ts, c := range tsCounts { + selTotalByTs[ts] += c + } + } + + // Emit per-value series for topN values. + for _, vt := range ranked { + allSeries = append(allSeries, + makeCompareSeries("baseline", traceQLName, vt.value, ar.baseline[vt.value], timestamps), + makeCompareSeries("selection", traceQLName, vt.value, ar.selection[vt.value], timestamps), + ) + } + + // Emit total series (one per attribute, not per value). + allSeries = append(allSeries, + makeCompareSeriesTotals("baseline_total", traceQLName, baseTotalByTs, timestamps), + makeCompareSeriesTotals("selection_total", traceQLName, selTotalByTs, timestamps), + ) + } + + return allSeries +} + +// computeDivergence computes the total variation distance between the baseline and selection +// distributions for a single attribute. Higher values mean the attribute's distribution changed +// more between baseline and selection — making it more interesting for root-cause analysis. +func computeDivergence(ar compareAttrResult) float64 { + // Sum total counts across all timestamps. 
+ baseTotal := 0.0 + selTotal := 0.0 + baseCounts := make(map[string]float64) + selCounts := make(map[string]float64) + + for v, tsCounts := range ar.baseline { + for _, c := range tsCounts { + baseCounts[v] += c + baseTotal += c + } + } + for v, tsCounts := range ar.selection { + for _, c := range tsCounts { + selCounts[v] += c + selTotal += c + } + } + + if baseTotal == 0 || selTotal == 0 { + return 0 + } + + // Total variation distance: sum of |p_sel(v) - p_base(v)| over all values. + allValues := make(map[string]bool) + for v := range baseCounts { + allValues[v] = true + } + for v := range selCounts { + allValues[v] = true + } + + divergence := 0.0 + for v := range allValues { + baseProp := baseCounts[v] / baseTotal + selProp := selCounts[v] / selTotal + divergence += math.Abs(selProp - baseProp) + } + return divergence +} + +func remapStatusValues(counts map[string]map[int64]float64) map[string]map[int64]float64 { + result := make(map[string]map[int64]float64, len(counts)) + for v, ts := range counts { + result[traceql.StatusCodeToName(v)] = ts + } + return result +} + +func makeCompareSeries(metaType, attrName, attrValue string, tsCounts map[int64]float64, timestamps []int64) tempoMetricsSeries { + samples := make([]tempoSample, len(timestamps)) + for i, ts := range timestamps { + samples[i] = tempoSample{ + TimestampMs: ts / 1e6, + Value: tsCounts[ts], // 0 if missing + } + } + return tempoMetricsSeries{ + Labels: []tempoLabel{ + {Key: "__meta_type", Value: metaType}, + {Key: attrName, Value: attrValue}, + }, + Samples: samples, + } +} + +func makeCompareSeriesTotals(metaType, attrName string, totalByTs map[int64]float64, timestamps []int64) tempoMetricsSeries { + samples := make([]tempoSample, len(timestamps)) + for i, ts := range timestamps { + samples[i] = tempoSample{ + TimestampMs: ts / 1e6, + Value: totalByTs[ts], + } + } + return tempoMetricsSeries{ + Labels: []tempoLabel{ + {Key: "__meta_type", Value: metaType}, + {Key: attrName, Value: ""}, + }, + 
Samples: samples, + } +} + +// executeStatsQuery runs a single LogsQL stats query and returns Tempo series. +func executeStatsQuery(ctx context.Context, cp *tracecommon.CommonParams, logsQLStr string, byFields []string, params *metricsQueryRangeParam) ([]tempoMetricsSeries, error) { + q, err := logstorage.ParseQueryAtTimestamp(logsQLStr, params.end.UnixNano()) + if err != nil { + return nil, fmt.Errorf("cannot parse query [%s]: %s", logsQLStr, err) + } + q.AddTimeFilter(params.start.UnixNano(), params.end.UnixNano()) + + labelFields, err := q.GetStatsLabelsAddGroupingByTime(params.step, 0) + if err != nil { + return nil, fmt.Errorf("cannot prepare stats query: %s", err) + } + + m := make(map[string]*metricsStatsSeries) + var mLock sync.Mutex + + addPoint := func(key string, labels []logstorage.Field, p metricsStatsPoint) { + mLock.Lock() + ss := m[key] + if ss == nil { + ss = &metricsStatsSeries{ + key: key, + Labels: labels, + } + m[key] = ss + } + ss.Points = append(ss.Points, p) + mLock.Unlock() + } + + writeBlock := func(_ uint, db *logstorage.DataBlock) { + rowsCount := db.RowsCount() + columns := db.GetColumns(false) + + clonedColumnNames := make([]string, len(columns)) + for i, c := range columns { + clonedColumnNames[i] = strings.Clone(c.Name) + } + + for i := range rowsCount { + ts := q.GetTimestamp() + labels := make([]logstorage.Field, 0, len(labelFields)) + + for j, c := range columns { + if c.Name == "_time" { + nsec, ok := logstorage.TryParseTimestampRFC3339Nano(c.Values[i]) + if ok { + ts = nsec + continue + } + } + if slices.Contains(labelFields, c.Name) { + labels = append(labels, logstorage.Field{ + Name: clonedColumnNames[j], + Value: strings.Clone(c.Values[i]), + }) + } + } + + for j, c := range columns { + if slices.Contains(labelFields, c.Name) || c.Name == "_time" { + continue + } + + v := strings.Clone(c.Values[i]) + + // Special case: histogram() returns JSON bucket arrays. 
+ if v == "[]" || strings.HasPrefix(v, `[{"vmrange":"`) { + var buckets []histogramBucket + if err := json.Unmarshal([]byte(v), &buckets); err == nil { + for _, bucket := range buckets { + bucketLabels := make([]logstorage.Field, 0, len(labels)+1) + bucketLabels = append(bucketLabels, filterByFields(labels, byFields)...) + bucketLabels = append(bucketLabels, logstorage.Field{ + Name: "duration", + Value: vmrangeToSeconds(bucket.VMRange), + }) + bp := metricsStatsPoint{ + Timestamp: ts, + Value: strconv.FormatUint(bucket.Hits, 10), + } + bucketKey := fmt.Sprintf("%d:%s:vmrange=%s", j, marshalLabels(labels), bucket.VMRange) + addPoint(bucketKey, bucketLabels, bp) + } + continue + } + } + + p := metricsStatsPoint{ + Timestamp: ts, + Value: v, + } + key := fmt.Sprintf("%d:%s", j, marshalLabels(labels)) + addPoint(key, filterByFields(labels, byFields), p) + } + } + } + + cpCopy := *cp + cpCopy.Query = q + qctx := cpCopy.NewQueryContext(ctx) + defer cpCopy.UpdatePerQueryStatsMetrics() + + if err := vtstorage.RunQuery(qctx, writeBlock); err != nil { + return nil, fmt.Errorf("cannot execute query [%s]: %s", logsQLStr, err) + } + + rows := make([]*metricsStatsSeries, 0, len(m)) + for _, ss := range m { + rows = append(rows, ss) + } + sort.Slice(rows, func(i, j int) bool { + return rows[i].key < rows[j].key + }) + + return transformToTempoSeries(rows), nil +} + +// parseMetricsQueryRangeParams parses query parameters for the metrics/query_range endpoint. 
+func parseMetricsQueryRangeParams(r *http.Request) (*metricsQueryRangeParam, error) { + qp := r.URL.Query() + + p := &metricsQueryRangeParam{ + end: time.Now(), + } + p.start = p.end.Add(-1 * time.Hour) + + p.q = qp.Get("q") + if p.q == "" { + p.q = qp.Get("query") + } + if p.q == "" { + return nil, fmt.Errorf("missing required parameter: q") + } + + since := qp.Get("since") + if since != "" { + d, err := time.ParseDuration(since) + if err != nil { + return nil, fmt.Errorf("cannot parse 'since': %s", err) + } + p.start = p.end.Add(-d) + } + + startStr := qp.Get("start") + if startStr != "" { + ts, ok := timeutil.TryParseUnixTimestamp(startStr) + if !ok { + return nil, fmt.Errorf("cannot parse 'start': %s", startStr) + } + p.start = time.Unix(ts/1e9, ts%1e9) + } + + endStr := qp.Get("end") + if endStr != "" { + ts, ok := timeutil.TryParseUnixTimestamp(endStr) + if !ok { + return nil, fmt.Errorf("cannot parse 'end': %s", endStr) + } + p.end = time.Unix(ts/1e9, ts%1e9) + } + + if p.start.After(p.end) { + p.start = p.end.Add(-1 * time.Hour) + } + + stepStr := qp.Get("step") + if stepStr != "" { + d, err := time.ParseDuration(stepStr) + if err != nil { + return nil, fmt.Errorf("cannot parse 'step': %s", err) + } + p.step = d.Nanoseconds() + } + + if p.step <= 0 { + rangeNs := p.end.Sub(p.start).Nanoseconds() + p.step = rangeNs / 100 + minStep := int64(time.Second) + if p.step < minStep { + p.step = minStep + } + } + + return p, nil +} + +type histogramBucket struct { + VMRange string `json:"vmrange"` + Hits uint64 `json:"hits"` +} + +// vmrangeToSeconds converts a vmrange (nanosecond boundaries) to its geometric mean in seconds. 
+// This matches Tempo's convention: Log2Bucketize(durationNanos) / time.Second +// e.g., "5.995e+08...6.813e+08" → "0.639" (seconds) +func vmrangeToSeconds(vmrange string) string { + parts := strings.SplitN(vmrange, "...", 2) + if len(parts) != 2 { + return vmrange + } + lo, errLo := strconv.ParseFloat(parts[0], 64) + hi, errHi := strconv.ParseFloat(parts[1], 64) + if errLo != nil || errHi != nil { + return vmrange + } + mid := math.Sqrt(lo * hi) + seconds := mid / 1e9 + return strconv.FormatFloat(seconds, 'g', -1, 64) +} + +// vmrangeToNanos converts a vmrange to its geometric mean as a nanosecond integer string. +// This matches what Grafana's Drilldown expects for constructing compare filters. +// e.g., "5.995e+08...6.813e+08" → "639000000" +func vmrangeToNanos(vmrange string) string { + parts := strings.SplitN(vmrange, "...", 2) + if len(parts) != 2 { + return vmrange + } + lo, errLo := strconv.ParseFloat(parts[0], 64) + hi, errHi := strconv.ParseFloat(parts[1], 64) + if errLo != nil || errHi != nil { + return vmrange + } + mid := math.Sqrt(lo * hi) + return strconv.FormatInt(int64(mid), 10) +} + +func humanizeVMRange(vmrange string) string { + parts := strings.SplitN(vmrange, "...", 2) + if len(parts) != 2 { + return vmrange + } + lo, errLo := strconv.ParseFloat(parts[0], 64) + hi, errHi := strconv.ParseFloat(parts[1], 64) + if errLo != nil || errHi != nil { + return vmrange + } + mid := math.Sqrt(lo * hi) + return formatDurationNs(mid) +} + +func formatDurationNs(ns float64) string { + switch { + case ns >= 60e9: + return fmt.Sprintf("%.3g mins", ns/60e9) + case ns >= 1e9: + return fmt.Sprintf("%.3g s", ns/1e9) + case ns >= 1e6: + return fmt.Sprintf("%.3g ms", ns/1e6) + case ns >= 1e3: + return fmt.Sprintf("%.3g µs", ns/1e3) + default: + return fmt.Sprintf("%.3g ns", ns) + } +} + +const defaultMaxExemplars = 100 + +// collectExemplars samples trace IDs from spans matching the filter for use as exemplars. 
+// It runs a lightweight query to get a spread of trace IDs across the time range. +func collectExemplars(ctx context.Context, cp *tracecommon.CommonParams, filterStr string, startNs, endNs, stepNs int64, maxExemplars int) ([]tempoExemplar, error) { + if maxExemplars <= 0 { + maxExemplars = defaultMaxExemplars + } + + // Query: sample spans spread across the time range using time-bucketed sampling. + // Use uniq_values to get one trace_id per time bucket. + bucketCount := maxExemplars + bucketSize := (endNs - startNs) / int64(bucketCount) + if bucketSize < 1e9 { + bucketSize = 1e9 // minimum 1 second buckets + } + bucketSizeStr := strconv.FormatFloat(float64(bucketSize)/1e9, 'f', -1, 64) + "s" + + qStr := fmt.Sprintf("%s | stats by (_time:%s) any(%s) as tid, any(%s) as sid, any(%s) as dur", + filterStr, bucketSizeStr, otelpb.TraceIDField, otelpb.SpanIDField, otelpb.DurationField) + q, err := logstorage.ParseQueryAtTimestamp(qStr, endNs) + if err != nil { + return nil, err + } + q.AddTimeFilter(startNs, endNs) + + type rawExemplar struct { + traceID string + spanID string + duration float64 + tsNs int64 + } + + var exemplars []rawExemplar + var mu sync.Mutex + seen := make(map[string]bool) // dedup by trace_id + + writeBlock := func(_ uint, db *logstorage.DataBlock) { + rowsCount := db.RowsCount() + columns := db.GetColumns(false) + + for i := range rowsCount { + var traceID, spanID string + var duration float64 + tsNs := q.GetTimestamp() + + for _, c := range columns { + switch c.Name { + case "tid": + traceID = strings.Clone(c.Values[i]) + case "sid": + spanID = strings.Clone(c.Values[i]) + case "dur": + duration, _ = strconv.ParseFloat(c.Values[i], 64) + case "_time": + if nsec, ok := logstorage.TryParseTimestampRFC3339Nano(c.Values[i]); ok { + tsNs = nsec + } + } + } + + if traceID == "" { + continue + } + + mu.Lock() + if !seen[traceID] && len(exemplars) < maxExemplars { + seen[traceID] = true + exemplars = append(exemplars, rawExemplar{ + traceID: traceID, + 
spanID: spanID, + duration: duration, + tsNs: tsNs, + }) + } + mu.Unlock() + } + } + + cpCopy := *cp + cpCopy.Query = q + qctx := cpCopy.NewQueryContext(ctx) + defer cpCopy.UpdatePerQueryStatsMetrics() + + if err := vtstorage.RunQuery(qctx, writeBlock); err != nil { + return nil, err + } + + result := make([]tempoExemplar, len(exemplars)) + for i, e := range exemplars { + result[i] = tempoExemplar{ + TraceID: e.traceID, + SpanID: e.spanID, + TimestampMs: e.tsNs / 1e6, + Value: e.duration / 1e9, // span duration in seconds + } + } + return result, nil +} + +// attachExemplarsToSeries distributes exemplars across series and sets each exemplar's +// value to the corresponding metric sample value so dots appear on the chart line. +func attachExemplarsToSeries(series []tempoMetricsSeries, exemplars []tempoExemplar) { + if len(series) == 0 || len(exemplars) == 0 { + return + } + + // For single series (no by-clause), attach all exemplars. + if len(series) == 1 { + snapExemplarValues(exemplars, series[0].Samples) + series[0].Exemplars = exemplars + return + } + + // For multiple series, distribute exemplars round-robin. + for i := range exemplars { + idx := i % len(series) + series[idx].Exemplars = append(series[idx].Exemplars, exemplars[i]) + } + // Snap values for each series. + for i := range series { + snapExemplarValues(series[i].Exemplars, series[i].Samples) + } +} + +// snapExemplarValues sets each exemplar's value to the nearest sample's value +// so exemplar dots appear on the chart line instead of at the bottom. 
+func snapExemplarValues(exemplars []tempoExemplar, samples []tempoSample) { + if len(samples) == 0 { + return + } + for i := range exemplars { + bestIdx := 0 + bestDist := abs64(exemplars[i].TimestampMs - samples[0].TimestampMs) + for j := 1; j < len(samples); j++ { + d := abs64(exemplars[i].TimestampMs - samples[j].TimestampMs) + if d < bestDist { + bestDist = d + bestIdx = j + } + } + exemplars[i].Value = samples[bestIdx].Value + } +} + +func abs64(x int64) int64 { + if x < 0 { + return -x + } + return x +} + +func marshalLabels(labels []logstorage.Field) string { + var sb strings.Builder + for i, l := range labels { + if i > 0 { + sb.WriteByte(',') + } + sb.WriteString(l.Name) + sb.WriteByte('=') + sb.WriteString(l.Value) + } + return sb.String() +} + +func filterByFields(labels []logstorage.Field, byFields []string) []logstorage.Field { + if len(byFields) == 0 { + return labels + } + result := make([]logstorage.Field, 0, len(byFields)) + for _, l := range labels { + if slices.Contains(byFields, l.Name) { + result = append(result, l) + } + } + return result +} diff --git a/app/vtselect/traces/tempo/metrics_query.go b/app/vtselect/traces/tempo/metrics_query.go new file mode 100644 index 000000000..f30cc51c6 --- /dev/null +++ b/app/vtselect/traces/tempo/metrics_query.go @@ -0,0 +1,128 @@ +package tempo + +import ( + "fmt" + "strings" + + "github.com/VictoriaMetrics/VictoriaTraces/lib/traceql" +) + +// metricsQueryTranslation holds the result of translating a TraceQL metrics query. 
+type metricsQueryTranslation struct { + baseQuery string // LogsQL stats query (for non-compare queries) + byFields []string // mapped VT field names + isCompare bool + + // compare-specific fields (only set when isCompare=true) + baseFilter string // LogsQL filter for base (outer filter) + compareFilter string // LogsQL filter for selection (inner filter) + topN int + selectionStartNs int64 + selectionEndNs int64 +} + +// translateMetricsQuery translates a TraceQL metrics query into LogsQL stats query string(s). +// +// For compare() queries, both baseQuery and selectionQuery are populated. +// For all other queries, only baseQuery is populated. +func translateMetricsQuery(traceQLStr string, timestamp int64) (*metricsQueryTranslation, error) { + q, err := traceql.ParseQueryAtTimestamp(traceQLStr, timestamp) + if err != nil { + return nil, fmt.Errorf("cannot parse TraceQL query: %w", err) + } + + funcName, fieldName, quantile, compareParams, traceQLByFields, err := q.MetricsComponents() + if err != nil { + return nil, err + } + + // Get the filter part in LogsQL format. + filterStr := q.Filter() + if filterStr == "*" || filterStr == "" { + filterStr = "*" + } + + // Map the by-fields from TraceQL to VT field names. + vtByFields := make([]string, len(traceQLByFields)) + for i, f := range traceQLByFields { + vtByFields[i] = traceql.TraceQLFieldToVTField(f) + } + + result := &metricsQueryTranslation{ + byFields: vtByFields, + baseFilter: filterStr, // used by exemplar collection and compare queries + } + + if funcName == "compare" && compareParams != nil { + result.isCompare = true + result.baseFilter = filterStr + result.compareFilter = compareParams.Filter + result.topN = compareParams.TopN + result.selectionStartNs = compareParams.StartNs + result.selectionEndNs = compareParams.EndNs + return result, nil + } + + // Standard metrics query. 
+ statsExpr, err := metricsToLogsQLStats(funcName, fieldName, quantile) + if err != nil { + return nil, err + } + result.baseQuery = buildStatsQuery(filterStr, vtByFields, statsExpr) + return result, nil +} + +// buildStatsQuery assembles a LogsQL stats query from filter, by-fields, and stats expression. +func buildStatsQuery(filterStr string, vtByFields []string, statsExpr string) string { + var sb strings.Builder + sb.WriteString(filterStr) + sb.WriteString(" | stats") + if len(vtByFields) > 0 { + sb.WriteString(" by (") + for i, f := range vtByFields { + if i > 0 { + sb.WriteString(", ") + } + sb.WriteString(quoteLogsQLField(f)) + } + sb.WriteString(")") + } + sb.WriteString(" ") + sb.WriteString(statsExpr) + return sb.String() +} + +// metricsToLogsQLStats converts a TraceQL metrics function to its LogsQL stats expression. +func metricsToLogsQLStats(funcName, fieldName, quantile string) (string, error) { + vtField := quoteLogsQLField(traceql.TraceQLFieldToVTField(fieldName)) + switch funcName { + case "rate": + return "rate() as value", nil + case "count_over_time": + return "count() as value", nil + case "min_over_time": + return "min(" + vtField + ") as value", nil + case "max_over_time": + return "max(" + vtField + ") as value", nil + case "avg_over_time": + return "avg(" + vtField + ") as value", nil + case "sum_over_time": + return "sum(" + vtField + ") as value", nil + case "histogram_over_time": + return "histogram(" + vtField + ") as value", nil + case "quantile_over_time": + return "quantile(" + quantile + ", " + vtField + ") as value", nil + default: + return "", fmt.Errorf("unsupported metrics function: %s", funcName) + } +} + +// quoteLogsQLField quotes a field name for LogsQL if it contains special characters. 
+func quoteLogsQLField(s string) string { + for _, c := range s { + if c == ':' || c == ' ' || c == '"' || c == '(' || c == ')' || c == '|' || c == ',' { + return `"` + strings.ReplaceAll(s, `"`, `\"`) + `"` + } + } + return s +} diff --git a/app/vtselect/traces/tempo/metrics_query_test.go b/app/vtselect/traces/tempo/metrics_query_test.go new file mode 100644 index 000000000..18e7ebb59 --- /dev/null +++ b/app/vtselect/traces/tempo/metrics_query_test.go @@ -0,0 +1,288 @@ +package tempo + +import ( + "net/http" + "strings" + "testing" + "time" + + "github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage" +) + +func TestTranslateMetricsQueryFull(t *testing.T) { + ts := time.Now().UnixNano() + + f := func(traceQL, expectedLogsQL string) { + t.Helper() + tr, err := translateMetricsQuery(traceQL, ts) + if err != nil { + t.Fatalf("translateMetricsQuery(%q): %s", traceQL, err) + } + if tr.baseQuery != expectedLogsQL { + t.Fatalf("translateMetricsQuery(%q):\n got: %q\n want: %q", traceQL, tr.baseQuery, expectedLogsQL) + } + // Verify the generated LogsQL actually parses. 
+ _, err = logstorage.ParseQueryAtTimestamp(tr.baseQuery, ts) + if err != nil { + t.Fatalf("generated LogsQL does not parse: %q: %s", tr.baseQuery, err) + } + } + + // Basic rate + f(`{} | rate()`, `* | stats rate() as value`) + + // Status value mapping + f(`{status = error} | rate()`, `status_code:=2 | stats rate() as value`) + + // Duration filter (100ms = 100000000ns) + f(`{duration > 100ms} | rate()`, `duration:>100000000 | stats rate() as value`) + + // Resource attribute filter + f(`{resource.service.name = "api"} | rate()`, + `"resource_attr:service.name":=api | stats rate() as value`) + + // Span attribute filter + f(`{span.http.status_code >= 400} | count_over_time()`, + `"span_attr:http.status_code":>=400 | stats count() as value`) + + // Name filter + f(`{name = "http_request"} | rate()`, `name:=http_request | stats rate() as value`) + + // nestedSetParent → root spans + f(`{nestedSetParent < 0} | rate()`, `parent_span_id:="" | stats rate() as value`) + + // Grafana sends {true && true} + f(`{true && true} | rate()`, `* and * | stats rate() as value`) + + // All aggregation functions + f(`{} | count_over_time()`, `* | stats count() as value`) + f(`{} | min_over_time(duration)`, `* | stats min(duration) as value`) + f(`{} | max_over_time(duration)`, `* | stats max(duration) as value`) + f(`{} | avg_over_time(duration)`, `* | stats avg(duration) as value`) + f(`{} | sum_over_time(duration)`, `* | stats sum(duration) as value`) + f(`{} | histogram_over_time(duration)`, `* | stats histogram(duration) as value`) + f(`{} | quantile_over_time(duration, 0.9)`, `* | stats quantile(0.9, duration) as value`) + + // Field name mapping in aggregation + f(`{} | sum_over_time(span.kafka.lag)`, `* | stats sum("span_attr:kafka.lag") as value`) + f(`{} | avg_over_time(span.http.response_content_length)`, + `* | stats avg("span_attr:http.response_content_length") as value`) + f(`{} | max_over_time(span.http.status_code)`, + `* | stats max("span_attr:http.status_code") as 
value`) +} + +func TestTranslateMetricsQueryByFields(t *testing.T) { + ts := time.Now().UnixNano() + + f := func(traceQL, expectedLogsQL string) { + t.Helper() + tr, err := translateMetricsQuery(traceQL, ts) + if err != nil { + t.Fatalf("translateMetricsQuery(%q): %s", traceQL, err) + } + if tr.baseQuery != expectedLogsQL { + t.Fatalf("translateMetricsQuery(%q):\n got: %q\n want: %q", traceQL, tr.baseQuery, expectedLogsQL) + } + _, err = logstorage.ParseQueryAtTimestamp(tr.baseQuery, ts) + if err != nil { + t.Fatalf("generated LogsQL does not parse: %q: %s", tr.baseQuery, err) + } + } + + // Tempo-style by() without | separator + f(`{} | rate() by(resource.service.name)`, + `* | stats by ("resource_attr:service.name") rate() as value`) + + // With explicit | separator + f(`{} | rate() | by(resource.service.name)`, + `* | stats by ("resource_attr:service.name") rate() as value`) + + // Intrinsic field + f(`{} | rate() | by(name)`, + `* | stats by (name) rate() as value`) + + // Status field mapping in by() + f(`{} | rate() | by(status)`, + `* | stats by (status_code) rate() as value`) + + // Multiple by fields + f(`{} | avg_over_time(duration) | by(resource.service.name, span.http.method)`, + `* | stats by ("resource_attr:service.name", "span_attr:http.method") avg(duration) as value`) + + // Quantile with by + f(`{} | quantile_over_time(duration, 0.5) | by(resource.service.name)`, + `* | stats by ("resource_attr:service.name") quantile(0.5, duration) as value`) + + // Complex: filter + aggregation + by + f(`{resource.service.name = "api"} | max_over_time(span.http.status_code) | by(resource.service.name)`, + `"resource_attr:service.name":=api | stats by ("resource_attr:service.name") max("span_attr:http.status_code") as value`) +} + +func TestTranslateMetricsQueryWithHints(t *testing.T) { + ts := time.Now().UnixNano() + + // with() hints should be silently ignored — output must match the hint-free version. 
+ baseRate, err := translateMetricsQuery(`{} | rate()`, ts) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + withSample, err := translateMetricsQuery(`{} | rate() with(sample=true)`, ts) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if withSample.baseQuery != baseRate.baseQuery { + t.Fatalf("with(sample=true) changed output:\n got: %q\n want: %q", withSample.baseQuery, baseRate.baseQuery) + } + + withExemplars, err := translateMetricsQuery(`{} | rate() with(exemplars=0)`, ts) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if withExemplars.baseQuery != baseRate.baseQuery { + t.Fatalf("with(exemplars=0) changed output:\n got: %q\n want: %q", withExemplars.baseQuery, baseRate.baseQuery) + } + + // with() + by() combined + baseBy, err := translateMetricsQuery(`{} | rate() | by(resource.service.name)`, ts) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + withBy, err := translateMetricsQuery(`{} | rate() by(resource.service.name) with(sample=0.5)`, ts) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if withBy.baseQuery != baseBy.baseQuery { + t.Fatalf("with(sample=0.5) + by() changed output:\n got: %q\n want: %q", withBy.baseQuery, baseBy.baseQuery) + } +} + +func TestTranslateMetricsQueryCompare(t *testing.T) { + ts := time.Now().UnixNano() + + // Basic compare with status filter + tr, err := translateMetricsQuery(`{} | compare({status = error}, 10)`, ts) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if !tr.isCompare { + t.Fatal("expected isCompare=true") + } + if tr.baseFilter != "*" { + t.Fatalf("expected baseFilter=*; got %q", tr.baseFilter) + } + if !strings.Contains(tr.compareFilter, "status_code") { + t.Fatalf("expected status_code in compareFilter; got %q", tr.compareFilter) + } + if tr.topN != 10 { + t.Fatalf("expected topN=10; got %d", tr.topN) + } + + // Compare with non-trivial base filter + tr, err = translateMetricsQuery(`{resource.service.name = "api"} | 
compare({duration >= 100ms}, 5)`, ts) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if !strings.Contains(tr.baseFilter, "resource_attr:service.name") { + t.Fatalf("expected resource_attr in baseFilter; got %q", tr.baseFilter) + } + if !strings.Contains(tr.compareFilter, "duration") { + t.Fatalf("expected duration in compareFilter; got %q", tr.compareFilter) + } + if tr.topN != 5 { + t.Fatalf("expected topN=5; got %d", tr.topN) + } + + // Full 4-arg form with timestamps + tr, err = translateMetricsQuery(`{true && true} | compare({duration >= 6s && duration <= 230s}, 10, 1775053673000000000, 1775054024000000000)`, ts) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if !tr.isCompare { + t.Fatal("expected isCompare=true") + } + if tr.topN != 10 { + t.Fatalf("expected topN=10; got %d", tr.topN) + } + if tr.selectionStartNs != 1775053673000000000 { + t.Fatalf("unexpected selectionStartNs; got %d", tr.selectionStartNs) + } + if tr.selectionEndNs != 1775054024000000000 { + t.Fatalf("unexpected selectionEndNs; got %d", tr.selectionEndNs) + } +} + +func TestTranslateMetricsQueryWithComparison(t *testing.T) { + ts := time.Now().UnixNano() + + // rate() > 5 should parse without error + tr, err := translateMetricsQuery(`{} | rate() > 5`, ts) + if err != nil { + t.Fatalf("unexpected error for rate() > 5: %s", err) + } + if tr.baseQuery == "" { + t.Fatal("expected non-empty baseQuery") + } + + // count_over_time() >= 100 + tr, err = translateMetricsQuery(`{} | count_over_time() >= 100`, ts) + if err != nil { + t.Fatalf("unexpected error for count_over_time() >= 100: %s", err) + } + if tr.baseQuery == "" { + t.Fatal("expected non-empty baseQuery") + } +} + +func TestTranslateMetricsQueryErrors(t *testing.T) { + ts := time.Now().UnixNano() + + // Non-metrics query should fail. 
+ _, err := translateMetricsQuery(`{resource.service.name = "frontend"}`, ts) + if err == nil { + t.Fatal("expected error for non-metrics query") + } + + // Invalid syntax should fail. + _, err = translateMetricsQuery(`{invalid`, ts) + if err == nil { + t.Fatal("expected error for invalid query") + } +} + +func TestParseMetricsQueryRangeParams(t *testing.T) { + // Test with minimal params. + r := newTestRequest("q=%7B%7D+%7C+rate()&step=60s") + p, err := parseMetricsQueryRangeParams(r) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if p.q != "{} | rate()" { + t.Fatalf("unexpected q; got %q", p.q) + } + if p.step != int64(60*time.Second) { + t.Fatalf("unexpected step; got %d; want %d", p.step, int64(60*time.Second)) + } + + // Test missing q. + r = newTestRequest("step=60s") + _, err = parseMetricsQueryRangeParams(r) + if err == nil { + t.Fatal("expected error for missing q parameter") + } + + // Test auto step calculation. + r = newTestRequest("q=%7B%7D+%7C+rate()&start=1700000000&end=1700003600") + p, err = parseMetricsQueryRangeParams(r) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if p.step <= 0 { + t.Fatalf("expected positive step; got %d", p.step) + } +} + +func newTestRequest(query string) *http.Request { + r, _ := http.NewRequest("GET", "/api/metrics/query_range?"+query, nil) + return r +} diff --git a/app/vtselect/traces/tempo/metrics_response.qtpl b/app/vtselect/traces/tempo/metrics_response.qtpl new file mode 100644 index 000000000..439fe72e2 --- /dev/null +++ b/app/vtselect/traces/tempo/metrics_response.qtpl @@ -0,0 +1,83 @@ +{% import ( + "strconv" +) %} + +{% stripspace %} + +{% func MetricsQueryRangeResponse(series []tempoMetricsSeries) %} +{ + "series":[ + {% if len(series) > 0 %} + {%= metricsSeriesJson(&series[0]) %} + {% for i := 1; i < len(series); i++ %} + ,{%= metricsSeriesJson(&series[i]) %} + {% endfor %} + {% endif %} + ], + "metrics":{ + "inspectedBytes":"0", + "inspectedTraces":0, + "totalJobs":0, + 
"completedJobs":0 + }, + "status":"COMPLETE" +} +{% endfunc %} + +{% func metricsSeriesJson(s *tempoMetricsSeries) %} +{ + "labels":[ + {% if len(s.Labels) > 0 %} + {%= metricsLabelJson(&s.Labels[0]) %} + {% for i := 1; i < len(s.Labels); i++ %} + ,{%= metricsLabelJson(&s.Labels[i]) %} + {% endfor %} + {% endif %} + ], + "samples":[ + {% if len(s.Samples) > 0 %} + {%= metricsSampleJson(&s.Samples[0]) %} + {% for i := 1; i < len(s.Samples); i++ %} + ,{%= metricsSampleJson(&s.Samples[i]) %} + {% endfor %} + {% endif %} + ], + "exemplars":[ + {% if len(s.Exemplars) > 0 %} + {%= metricsExemplarJson(&s.Exemplars[0]) %} + {% for i := 1; i < len(s.Exemplars); i++ %} + ,{%= metricsExemplarJson(&s.Exemplars[i]) %} + {% endfor %} + {% endif %} + ] +} +{% endfunc %} + +{% func metricsExemplarJson(e *tempoExemplar) %} +{ + "labels":[ + {"key":"trace:id","value":{"stringValue":{%q= e.TraceID %}}}, + {"key":"span:id","value":{"stringValue":{%q= e.SpanID %}}} + ], + "value":{%f= e.Value %}, + "timestampMs":"{%s= strconv.FormatInt(e.TimestampMs, 10) %}" +} +{% endfunc %} + +{% func metricsLabelJson(l *tempoLabel) %} +{ + "key":{%q= l.Key %}, + "value":{ + "stringValue":{%q= l.Value %} + } +} +{% endfunc %} + +{% func metricsSampleJson(s *tempoSample) %} +{ + "timestampMs":"{%s= strconv.FormatInt(s.TimestampMs, 10) %}", + "value":{%f= s.Value %} +} +{% endfunc %} + +{% endstripspace %} diff --git a/app/vtselect/traces/tempo/metrics_response.qtpl.go b/app/vtselect/traces/tempo/metrics_response.qtpl.go new file mode 100644 index 000000000..8949d2718 --- /dev/null +++ b/app/vtselect/traces/tempo/metrics_response.qtpl.go @@ -0,0 +1,284 @@ +// Code generated by qtc from "metrics_response.qtpl". DO NOT EDIT. +// See https://github.com/valyala/quicktemplate for details. 
+ +//line app/vtselect/traces/tempo/metrics_response.qtpl:1 +package tempo + +//line app/vtselect/traces/tempo/metrics_response.qtpl:1 +import ( + "strconv" +) + +//line app/vtselect/traces/tempo/metrics_response.qtpl:7 +import ( + qtio422016 "io" + + qt422016 "github.com/valyala/quicktemplate" +) + +//line app/vtselect/traces/tempo/metrics_response.qtpl:7 +var ( + _ = qtio422016.Copy + _ = qt422016.AcquireByteBuffer +) + +//line app/vtselect/traces/tempo/metrics_response.qtpl:7 +func StreamMetricsQueryRangeResponse(qw422016 *qt422016.Writer, series []tempoMetricsSeries) { +//line app/vtselect/traces/tempo/metrics_response.qtpl:7 + qw422016.N().S(`{"series":[`) +//line app/vtselect/traces/tempo/metrics_response.qtpl:10 + if len(series) > 0 { +//line app/vtselect/traces/tempo/metrics_response.qtpl:11 + streammetricsSeriesJson(qw422016, &series[0]) +//line app/vtselect/traces/tempo/metrics_response.qtpl:12 + for i := 1; i < len(series); i++ { +//line app/vtselect/traces/tempo/metrics_response.qtpl:12 + qw422016.N().S(`,`) +//line app/vtselect/traces/tempo/metrics_response.qtpl:13 + streammetricsSeriesJson(qw422016, &series[i]) +//line app/vtselect/traces/tempo/metrics_response.qtpl:14 + } +//line app/vtselect/traces/tempo/metrics_response.qtpl:15 + } +//line app/vtselect/traces/tempo/metrics_response.qtpl:15 + qw422016.N().S(`],"metrics":{"inspectedBytes":"0","inspectedTraces":0,"totalJobs":0,"completedJobs":0},"status":"COMPLETE"}`) +//line app/vtselect/traces/tempo/metrics_response.qtpl:25 +} + +//line app/vtselect/traces/tempo/metrics_response.qtpl:25 +func WriteMetricsQueryRangeResponse(qq422016 qtio422016.Writer, series []tempoMetricsSeries) { +//line app/vtselect/traces/tempo/metrics_response.qtpl:25 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vtselect/traces/tempo/metrics_response.qtpl:25 + StreamMetricsQueryRangeResponse(qw422016, series) +//line app/vtselect/traces/tempo/metrics_response.qtpl:25 + qt422016.ReleaseWriter(qw422016) +//line 
app/vtselect/traces/tempo/metrics_response.qtpl:25 +} + +//line app/vtselect/traces/tempo/metrics_response.qtpl:25 +func MetricsQueryRangeResponse(series []tempoMetricsSeries) string { +//line app/vtselect/traces/tempo/metrics_response.qtpl:25 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vtselect/traces/tempo/metrics_response.qtpl:25 + WriteMetricsQueryRangeResponse(qb422016, series) +//line app/vtselect/traces/tempo/metrics_response.qtpl:25 + qs422016 := string(qb422016.B) +//line app/vtselect/traces/tempo/metrics_response.qtpl:25 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vtselect/traces/tempo/metrics_response.qtpl:25 + return qs422016 +//line app/vtselect/traces/tempo/metrics_response.qtpl:25 +} + +//line app/vtselect/traces/tempo/metrics_response.qtpl:27 +func streammetricsSeriesJson(qw422016 *qt422016.Writer, s *tempoMetricsSeries) { +//line app/vtselect/traces/tempo/metrics_response.qtpl:27 + qw422016.N().S(`{"labels":[`) +//line app/vtselect/traces/tempo/metrics_response.qtpl:30 + if len(s.Labels) > 0 { +//line app/vtselect/traces/tempo/metrics_response.qtpl:31 + streammetricsLabelJson(qw422016, &s.Labels[0]) +//line app/vtselect/traces/tempo/metrics_response.qtpl:32 + for i := 1; i < len(s.Labels); i++ { +//line app/vtselect/traces/tempo/metrics_response.qtpl:32 + qw422016.N().S(`,`) +//line app/vtselect/traces/tempo/metrics_response.qtpl:33 + streammetricsLabelJson(qw422016, &s.Labels[i]) +//line app/vtselect/traces/tempo/metrics_response.qtpl:34 + } +//line app/vtselect/traces/tempo/metrics_response.qtpl:35 + } +//line app/vtselect/traces/tempo/metrics_response.qtpl:35 + qw422016.N().S(`],"samples":[`) +//line app/vtselect/traces/tempo/metrics_response.qtpl:38 + if len(s.Samples) > 0 { +//line app/vtselect/traces/tempo/metrics_response.qtpl:39 + streammetricsSampleJson(qw422016, &s.Samples[0]) +//line app/vtselect/traces/tempo/metrics_response.qtpl:40 + for i := 1; i < len(s.Samples); i++ { +//line 
app/vtselect/traces/tempo/metrics_response.qtpl:40 + qw422016.N().S(`,`) +//line app/vtselect/traces/tempo/metrics_response.qtpl:41 + streammetricsSampleJson(qw422016, &s.Samples[i]) +//line app/vtselect/traces/tempo/metrics_response.qtpl:42 + } +//line app/vtselect/traces/tempo/metrics_response.qtpl:43 + } +//line app/vtselect/traces/tempo/metrics_response.qtpl:43 + qw422016.N().S(`],"exemplars":[`) +//line app/vtselect/traces/tempo/metrics_response.qtpl:46 + if len(s.Exemplars) > 0 { +//line app/vtselect/traces/tempo/metrics_response.qtpl:47 + streammetricsExemplarJson(qw422016, &s.Exemplars[0]) +//line app/vtselect/traces/tempo/metrics_response.qtpl:48 + for i := 1; i < len(s.Exemplars); i++ { +//line app/vtselect/traces/tempo/metrics_response.qtpl:48 + qw422016.N().S(`,`) +//line app/vtselect/traces/tempo/metrics_response.qtpl:49 + streammetricsExemplarJson(qw422016, &s.Exemplars[i]) +//line app/vtselect/traces/tempo/metrics_response.qtpl:50 + } +//line app/vtselect/traces/tempo/metrics_response.qtpl:51 + } +//line app/vtselect/traces/tempo/metrics_response.qtpl:51 + qw422016.N().S(`]}`) +//line app/vtselect/traces/tempo/metrics_response.qtpl:54 +} + +//line app/vtselect/traces/tempo/metrics_response.qtpl:54 +func writemetricsSeriesJson(qq422016 qtio422016.Writer, s *tempoMetricsSeries) { +//line app/vtselect/traces/tempo/metrics_response.qtpl:54 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vtselect/traces/tempo/metrics_response.qtpl:54 + streammetricsSeriesJson(qw422016, s) +//line app/vtselect/traces/tempo/metrics_response.qtpl:54 + qt422016.ReleaseWriter(qw422016) +//line app/vtselect/traces/tempo/metrics_response.qtpl:54 +} + +//line app/vtselect/traces/tempo/metrics_response.qtpl:54 +func metricsSeriesJson(s *tempoMetricsSeries) string { +//line app/vtselect/traces/tempo/metrics_response.qtpl:54 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vtselect/traces/tempo/metrics_response.qtpl:54 + writemetricsSeriesJson(qb422016, s) +//line 
app/vtselect/traces/tempo/metrics_response.qtpl:54 + qs422016 := string(qb422016.B) +//line app/vtselect/traces/tempo/metrics_response.qtpl:54 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vtselect/traces/tempo/metrics_response.qtpl:54 + return qs422016 +//line app/vtselect/traces/tempo/metrics_response.qtpl:54 +} + +//line app/vtselect/traces/tempo/metrics_response.qtpl:56 +func streammetricsExemplarJson(qw422016 *qt422016.Writer, e *tempoExemplar) { +//line app/vtselect/traces/tempo/metrics_response.qtpl:56 + qw422016.N().S(`{"labels":[{"key":"trace:id","value":{"stringValue":`) +//line app/vtselect/traces/tempo/metrics_response.qtpl:59 + qw422016.N().Q(e.TraceID) +//line app/vtselect/traces/tempo/metrics_response.qtpl:59 + qw422016.N().S(`}},{"key":"span:id","value":{"stringValue":`) +//line app/vtselect/traces/tempo/metrics_response.qtpl:60 + qw422016.N().Q(e.SpanID) +//line app/vtselect/traces/tempo/metrics_response.qtpl:60 + qw422016.N().S(`}}],"value":`) +//line app/vtselect/traces/tempo/metrics_response.qtpl:62 + qw422016.N().F(e.Value) +//line app/vtselect/traces/tempo/metrics_response.qtpl:62 + qw422016.N().S(`,"timestampMs":"`) +//line app/vtselect/traces/tempo/metrics_response.qtpl:63 + qw422016.N().S(strconv.FormatInt(e.TimestampMs, 10)) +//line app/vtselect/traces/tempo/metrics_response.qtpl:63 + qw422016.N().S(`"}`) +//line app/vtselect/traces/tempo/metrics_response.qtpl:65 +} + +//line app/vtselect/traces/tempo/metrics_response.qtpl:65 +func writemetricsExemplarJson(qq422016 qtio422016.Writer, e *tempoExemplar) { +//line app/vtselect/traces/tempo/metrics_response.qtpl:65 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vtselect/traces/tempo/metrics_response.qtpl:65 + streammetricsExemplarJson(qw422016, e) +//line app/vtselect/traces/tempo/metrics_response.qtpl:65 + qt422016.ReleaseWriter(qw422016) +//line app/vtselect/traces/tempo/metrics_response.qtpl:65 +} + +//line app/vtselect/traces/tempo/metrics_response.qtpl:65 +func 
metricsExemplarJson(e *tempoExemplar) string { +//line app/vtselect/traces/tempo/metrics_response.qtpl:65 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vtselect/traces/tempo/metrics_response.qtpl:65 + writemetricsExemplarJson(qb422016, e) +//line app/vtselect/traces/tempo/metrics_response.qtpl:65 + qs422016 := string(qb422016.B) +//line app/vtselect/traces/tempo/metrics_response.qtpl:65 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vtselect/traces/tempo/metrics_response.qtpl:65 + return qs422016 +//line app/vtselect/traces/tempo/metrics_response.qtpl:65 +} + +//line app/vtselect/traces/tempo/metrics_response.qtpl:67 +func streammetricsLabelJson(qw422016 *qt422016.Writer, l *tempoLabel) { +//line app/vtselect/traces/tempo/metrics_response.qtpl:67 + qw422016.N().S(`{"key":`) +//line app/vtselect/traces/tempo/metrics_response.qtpl:69 + qw422016.N().Q(l.Key) +//line app/vtselect/traces/tempo/metrics_response.qtpl:69 + qw422016.N().S(`,"value":{"stringValue":`) +//line app/vtselect/traces/tempo/metrics_response.qtpl:71 + qw422016.N().Q(l.Value) +//line app/vtselect/traces/tempo/metrics_response.qtpl:71 + qw422016.N().S(`}}`) +//line app/vtselect/traces/tempo/metrics_response.qtpl:74 +} + +//line app/vtselect/traces/tempo/metrics_response.qtpl:74 +func writemetricsLabelJson(qq422016 qtio422016.Writer, l *tempoLabel) { +//line app/vtselect/traces/tempo/metrics_response.qtpl:74 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vtselect/traces/tempo/metrics_response.qtpl:74 + streammetricsLabelJson(qw422016, l) +//line app/vtselect/traces/tempo/metrics_response.qtpl:74 + qt422016.ReleaseWriter(qw422016) +//line app/vtselect/traces/tempo/metrics_response.qtpl:74 +} + +//line app/vtselect/traces/tempo/metrics_response.qtpl:74 +func metricsLabelJson(l *tempoLabel) string { +//line app/vtselect/traces/tempo/metrics_response.qtpl:74 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vtselect/traces/tempo/metrics_response.qtpl:74 + 
writemetricsLabelJson(qb422016, l) +//line app/vtselect/traces/tempo/metrics_response.qtpl:74 + qs422016 := string(qb422016.B) +//line app/vtselect/traces/tempo/metrics_response.qtpl:74 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vtselect/traces/tempo/metrics_response.qtpl:74 + return qs422016 +//line app/vtselect/traces/tempo/metrics_response.qtpl:74 +} + +//line app/vtselect/traces/tempo/metrics_response.qtpl:76 +func streammetricsSampleJson(qw422016 *qt422016.Writer, s *tempoSample) { +//line app/vtselect/traces/tempo/metrics_response.qtpl:76 + qw422016.N().S(`{"timestampMs":"`) +//line app/vtselect/traces/tempo/metrics_response.qtpl:78 + qw422016.N().S(strconv.FormatInt(s.TimestampMs, 10)) +//line app/vtselect/traces/tempo/metrics_response.qtpl:78 + qw422016.N().S(`","value":`) +//line app/vtselect/traces/tempo/metrics_response.qtpl:79 + qw422016.N().F(s.Value) +//line app/vtselect/traces/tempo/metrics_response.qtpl:79 + qw422016.N().S(`}`) +//line app/vtselect/traces/tempo/metrics_response.qtpl:81 +} + +//line app/vtselect/traces/tempo/metrics_response.qtpl:81 +func writemetricsSampleJson(qq422016 qtio422016.Writer, s *tempoSample) { +//line app/vtselect/traces/tempo/metrics_response.qtpl:81 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vtselect/traces/tempo/metrics_response.qtpl:81 + streammetricsSampleJson(qw422016, s) +//line app/vtselect/traces/tempo/metrics_response.qtpl:81 + qt422016.ReleaseWriter(qw422016) +//line app/vtselect/traces/tempo/metrics_response.qtpl:81 +} + +//line app/vtselect/traces/tempo/metrics_response.qtpl:81 +func metricsSampleJson(s *tempoSample) string { +//line app/vtselect/traces/tempo/metrics_response.qtpl:81 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vtselect/traces/tempo/metrics_response.qtpl:81 + writemetricsSampleJson(qb422016, s) +//line app/vtselect/traces/tempo/metrics_response.qtpl:81 + qs422016 := string(qb422016.B) +//line app/vtselect/traces/tempo/metrics_response.qtpl:81 + 
qt422016.ReleaseByteBuffer(qb422016) +//line app/vtselect/traces/tempo/metrics_response.qtpl:81 + return qs422016 +//line app/vtselect/traces/tempo/metrics_response.qtpl:81 +} diff --git a/app/vtselect/traces/tempo/metrics_transform.go b/app/vtselect/traces/tempo/metrics_transform.go new file mode 100644 index 000000000..4ef867034 --- /dev/null +++ b/app/vtselect/traces/tempo/metrics_transform.go @@ -0,0 +1,85 @@ +package tempo + +import ( + "sort" + "strconv" + + "github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage" + "github.com/VictoriaMetrics/VictoriaTraces/lib/traceql" +) + +// tempoMetricsSeries represents a single time series in the Tempo QueryRangeResponse format. +type tempoMetricsSeries struct { + Labels []tempoLabel + Samples []tempoSample + Exemplars []tempoExemplar +} + +// tempoExemplar represents an exemplar linking a metric data point to a specific trace. +type tempoExemplar struct { + TraceID string + SpanID string + TimestampMs int64 + Value float64 // span duration in seconds, or NaN +} + +// tempoLabel represents a label in the Tempo OTEL KeyValue format. +type tempoLabel struct { + Key string + Value string +} + +// tempoSample represents a single data point in a time series. +type tempoSample struct { + TimestampMs int64 + Value float64 +} + +// metricsStatsSeries mirrors the statsSeries from logsql but is local to avoid cross-package dependency. +type metricsStatsSeries struct { + key string + Labels []logstorage.Field + Points []metricsStatsPoint +} + +// metricsStatsPoint represents a single data point collected from VictoriaLogs stats query. +type metricsStatsPoint struct { + Timestamp int64 // nanoseconds + Value string // string representation of the value +} + +// transformToTempoSeries converts collected stats results to Tempo series format. 
+func transformToTempoSeries(rows []*metricsStatsSeries) []tempoMetricsSeries { + result := make([]tempoMetricsSeries, 0, len(rows)) + for _, ss := range rows { + ts := tempoMetricsSeries{ + Labels: make([]tempoLabel, 0, len(ss.Labels)), + } + + // Convert label field names back from VT internal names to TraceQL names. + for _, label := range ss.Labels { + ts.Labels = append(ts.Labels, tempoLabel{ + Key: traceql.VTFieldToTraceQL(label.Name), + Value: label.Value, + }) + } + + // Convert data points. + ts.Samples = make([]tempoSample, 0, len(ss.Points)) + for _, p := range ss.Points { + value, _ := strconv.ParseFloat(p.Value, 64) + ts.Samples = append(ts.Samples, tempoSample{ + TimestampMs: p.Timestamp / 1e6, // nanoseconds -> milliseconds + Value: value, + }) + } + + // Sort samples by timestamp. + sort.Slice(ts.Samples, func(i, j int) bool { + return ts.Samples[i].TimestampMs < ts.Samples[j].TimestampMs + }) + + result = append(result, ts) + } + return result +} diff --git a/app/vtselect/traces/tempo/tempo.go b/app/vtselect/traces/tempo/tempo.go index 5f0a517c9..a8cef77ea 100644 --- a/app/vtselect/traces/tempo/tempo.go +++ b/app/vtselect/traces/tempo/tempo.go @@ -36,6 +36,9 @@ var ( tempoQueryV2Requests = metrics.NewCounter(`vt_http_requests_total{path="/select/tempo/api/v2/traces/*"}`) tempoQueryV2Duration = metrics.NewSummary(`vt_http_request_duration_seconds{path="/select/tempo/api/v2/traces/*"}`) + + tempoMetricsQueryRangeRequests = metrics.NewCounter(`vt_http_requests_total{path="/select/tempo/api/metrics/query_range"}`) + tempoMetricsQueryRangeDuration = metrics.NewSummary(`vt_http_request_duration_seconds{path="/select/tempo/api/metrics/query_range"}`) ) var ( @@ -65,6 +68,11 @@ func RequestHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) processSearchRequest(ctx, w, r) tempoSearchDuration.UpdateDuration(startTime) return true + } else if path == "/select/tempo/api/metrics/query_range" { + tempoMetricsQueryRangeRequests.Inc() + 
processMetricsQueryRangeRequest(ctx, w, r) + tempoMetricsQueryRangeDuration.UpdateDuration(startTime) + return true } else if strings.HasPrefix(path, "/select/tempo/api/v2/traces/") && len(path) > len("/select/tempo/api/v2/traces/") { tempoQueryV2Requests.Inc() processQueryV2Request(ctx, w, r) @@ -251,10 +259,11 @@ func searchTags(ctx context.Context, cp *tracecommon.CommonParams, traceQLStr st filterQuery = defaultNoopFilter } - // exclude queries that contains pipe(s). - if filterQuery.HasPipe() { + // exclude queries that contain non-hint pipes (select/by/with are OK to ignore). + if filterQuery.HasNonHintPipe() { return nil, fmt.Errorf("cannot use query pipes in search tag values API: %s", traceQLStr) } + filterQuery.StripHintPipes() scopes := `` pipeLimit := limit @@ -342,10 +351,11 @@ func searchTagValues(ctx context.Context, cp *tracecommon.CommonParams, traceQLS filterQuery = defaultNoopFilter } - // exclude queries that contains pipe(s). - if filterQuery.HasPipe() { + // exclude queries that contain non-hint pipes (select/by/with are OK to ignore). + if filterQuery.HasNonHintPipe() { return nil, fmt.Errorf("cannot use query pipes in search tag values API: %s", traceQLStr) } + filterQuery.StripHintPipes() qStr := fmt.Sprintf(`%s | fields %q | field_values %q | fields %q`, filterQuery.String(), tagName, tagName, tagName, @@ -393,13 +403,16 @@ func searchTraces(ctx context.Context, cp *tracecommon.CommonParams, traceQLStr // transform traceQL into LogsQL as filter. It should contain filter only without any pipe. filterQuery, err := traceql.ParseQuery(traceQLStr) if err != nil { - return nil, err + // Return empty results for malformed queries (e.g., "duration > " with no value) + // rather than erroring, since Grafana may send incomplete filters during user editing. + return nil, nil } - // exclude queries that contains pipe(s). - if filterQuery.HasPipe() { + // exclude queries that contain non-hint pipes (select/by/with are OK to ignore). 
+ if filterQuery.HasNonHintPipe() { return nil, fmt.Errorf("cannot use query pipes in search tag values API: %s", traceQLStr) } + filterQuery.StripHintPipes() _, rows, err := GetTraceList(ctx, cp, filterQuery, start, end, limit) if err != nil { diff --git a/lib/protoparser/opentelemetry/pb/internal_fields.go b/lib/protoparser/opentelemetry/pb/internal_fields.go index bfd811037..6dfa3d582 100644 --- a/lib/protoparser/opentelemetry/pb/internal_fields.go +++ b/lib/protoparser/opentelemetry/pb/internal_fields.go @@ -11,6 +11,7 @@ const ( TraceIDIndexStartTimeFieldName = "start_time" TraceIDIndexEndTimeFieldName = "end_time" TraceIDIndexDuration = "duration" + TraceIDIndexHasRootSpan = "has_root_span" TraceIDIndexPartitionCount = uint64(1024) ) diff --git a/lib/traceql/filter_general.go b/lib/traceql/filter_general.go index ee3375c47..e5dbb099b 100644 --- a/lib/traceql/filter_general.go +++ b/lib/traceql/filter_general.go @@ -13,13 +13,53 @@ type filterCommon struct { value string } +// statusValueMap maps TraceQL status names to OTEL StatusCode numeric values. +var statusValueMap = map[string]string{ + "unset": "0", + "ok": "1", + "error": "2", +} + +// statusCodeMap is the reverse of statusValueMap. +var statusCodeMap = func() map[string]string { + m := make(map[string]string, len(statusValueMap)) + for name, code := range statusValueMap { + m[code] = name + } + return m +}() + +// StatusCodeToName converts a numeric OTEL StatusCode ("2") to its TraceQL name ("error"). +// Returns the input unchanged if not a known status code. +func StatusCodeToName(code string) string { + if name, ok := statusCodeMap[code]; ok { + return name + } + return code +} + func (fc *filterCommon) String() string { // traceDuration must be treated as pipe if fc.fieldName == "traceDuration" { return "*" } + // nestedSetParent<0 is Tempo's way to select root spans. 
+ // Root spans are matched by filtering for an empty parent_span_id across spans. + // TODO(review): the trace ID index stream now carries has_root_span; switching this + // filter to that index would avoid scanning parent_span_id, but that is not done here yet. + if fc.fieldName == "nestedSetParent" && fc.op == "<" && fc.value == "0" { + return otelpb.ParentSpanIDField + `:=""` + } + + v := fc.value + + // Map status names (error, ok, unset) to numeric OTEL StatusCode values. + if fc.fieldName == "status" { + if numeric, ok := statusValueMap[strings.ToLower(v)]; ok { + v = numeric + } + } + + if duration, ok := tryParseDuration(v); ok { v = strconv.FormatInt(duration, 10) } @@ -27,23 +67,57 @@ func (fc *filterCommon) String() string { } func (fc *filterCommon) tagToVTField() string { - if strings.HasPrefix(fc.fieldName, "resource.") { - return otelpb.ResourceAttrPrefix + fc.fieldName[len("resource."):] - } else if strings.HasPrefix(fc.fieldName, "span.") { - return otelpb.SpanAttrPrefixField + fc.fieldName[len("span."):] - } else if strings.HasPrefix(fc.fieldName, "event.") { - return otelpb.EventPrefix + otelpb.EventAttrPrefix + fc.fieldName[len("event."):] - } else if strings.HasPrefix(fc.fieldName, "link.") { - return otelpb.LinkPrefix + otelpb.LinkAttrPrefix + fc.fieldName[len("link."):] - } else if strings.HasPrefix(fc.fieldName, "instrumentation.") { - return otelpb.InstrumentationScopeAttrPrefix + fc.fieldName[len("instrumentation."):] - } else if fc.fieldName == "status" { + return TraceQLFieldToVTField(fc.fieldName) +} + +// TraceQLFieldToVTField converts a TraceQL field name to a VictoriaTraces internal field name. 
+// e.g., "resource.service.name" -> "resource_attr:service.name" +// +// "span.http.status_code" -> "span_attr:http.status_code" +// "status" -> "status_code" +func TraceQLFieldToVTField(fieldName string) string { + if strings.HasPrefix(fieldName, "resource.") { + return otelpb.ResourceAttrPrefix + fieldName[len("resource."):] + } else if strings.HasPrefix(fieldName, "span.") { + return otelpb.SpanAttrPrefixField + fieldName[len("span."):] + } else if strings.HasPrefix(fieldName, "event.") { + return otelpb.EventPrefix + otelpb.EventAttrPrefix + fieldName[len("event."):] + } else if strings.HasPrefix(fieldName, "link.") { + return otelpb.LinkPrefix + otelpb.LinkAttrPrefix + fieldName[len("link."):] + } else if strings.HasPrefix(fieldName, "instrumentation.") { + return otelpb.InstrumentationScopeAttrPrefix + fieldName[len("instrumentation."):] + } else if fieldName == "status" { return otelpb.StatusCodeField - } else if fc.fieldName == "service.name" || fc.fieldName == ".service.name" { + } else if fieldName == "service.name" || fieldName == ".service.name" { return otelpb.ResourceAttrServiceName } - return fc.fieldName + return fieldName +} + +// VTFieldToTraceQL converts a VictoriaTraces internal field name back to a TraceQL field name. +// e.g., "resource_attr:service.name" -> "resource.service.name" +// +// "span_attr:http.status_code" -> "span.http.status_code" +// "status_code" -> "status" +func VTFieldToTraceQL(fieldName string) string { + if strings.HasPrefix(fieldName, otelpb.ResourceAttrPrefix) { + return "resource." + fieldName[len(otelpb.ResourceAttrPrefix):] + } else if strings.HasPrefix(fieldName, otelpb.SpanAttrPrefixField) { + return "span." + fieldName[len(otelpb.SpanAttrPrefixField):] + } else if strings.HasPrefix(fieldName, otelpb.EventPrefix+otelpb.EventAttrPrefix) { + return "event." + fieldName[len(otelpb.EventPrefix+otelpb.EventAttrPrefix):] + } else if strings.HasPrefix(fieldName, otelpb.LinkPrefix+otelpb.LinkAttrPrefix) { + return "link." 
+ fieldName[len(otelpb.LinkPrefix+otelpb.LinkAttrPrefix):] + } else if strings.HasPrefix(fieldName, otelpb.InstrumentationScopeAttrPrefix) { + return "instrumentation." + fieldName[len(otelpb.InstrumentationScopeAttrPrefix):] + } else if fieldName == otelpb.StatusCodeField { + return "status" + } else if fieldName == otelpb.ResourceAttrServiceName { + return "resource.service.name" + } + + return fieldName } func quoteFieldNameIfNeeded(s string) string { diff --git a/lib/traceql/parser.go b/lib/traceql/parser.go index 3e093532a..bd3f3ba1f 100644 --- a/lib/traceql/parser.go +++ b/lib/traceql/parser.go @@ -51,6 +51,125 @@ func (q *Query) HasPipe() bool { return len(q.pipes) > 0 } +// HasNonHintPipe returns true if the query contains pipes other than display hints +// (select, by, with). These hint pipes are safe to ignore in search/tag queries. +func (q *Query) HasNonHintPipe() bool { + for _, p := range q.pipes { + switch p.(type) { + case *pipeSelect, *pipeBy, *pipeWith: + continue + default: + return true + } + } + return false +} + +// StripHintPipes removes display-hint pipes (select, by, with) from the query +// so they don't leak into LogsQL where they have different semantics. +func (q *Query) StripHintPipes() { + var kept []pipe + for _, p := range q.pipes { + switch p.(type) { + case *pipeSelect, *pipeBy, *pipeWith: + continue + default: + kept = append(kept, p) + } + } + q.pipes = kept +} + +// IsMetricsQuery returns true if this query contains metrics pipes (rate, *_over_time). +func (q *Query) IsMetricsQuery() bool { + for _, p := range q.pipes { + switch p.(type) { + case *pipeRate, *pipeOverTime, *pipeQuantileOverTime, *pipeCompare: + return true + } + // Also check inside pipeAggregator wrappers (e.g. rate() > 5). 
+ if pa, ok := p.(*pipeAggregator); ok { + switch pa.aggregator.(type) { + case *pipeRate, *pipeOverTime, *pipeQuantileOverTime, *pipeCompare: + return true + } + } + } + return false +} + +// CompareParams holds the parsed parameters of a compare() pipe. +type CompareParams struct { + Filter string // LogsQL filter for the selection subset + TopN int // max values per attribute (default 10) + StartNs int64 // selection window start nanoseconds (0 = use query range) + EndNs int64 // selection window end nanoseconds (0 = use query range) +} + +// MetricsComponents extracts the components of a metrics query for translation. +// +// Returns an error if the query is not a valid metrics query. +func (q *Query) MetricsComponents() (funcName string, fieldName string, quantile string, compare *CompareParams, byFields []string, err error) { + if !q.IsMetricsQuery() { + return "", "", "", nil, nil, fmt.Errorf("query does not contain a metrics function") + } + + extractCompare := func(pc *pipeCompare) { + funcName = "compare" + compare = &CompareParams{ + Filter: pc.SelectionFilterString(), + TopN: pc.topN, + StartNs: pc.startNs, + EndNs: pc.endNs, + } + if compare.TopN <= 0 { + compare.TopN = 10 + } + } + + for _, p := range q.pipes { + switch pt := p.(type) { + case *pipeRate: + funcName = "rate" + case *pipeCompare: + extractCompare(pt) + case *pipeOverTime: + funcName = pt.funcName + fieldName = pt.fieldName + case *pipeQuantileOverTime: + funcName = "quantile_over_time" + fieldName = pt.fieldName + quantile = pt.quantile + case *pipeBy: + byFields = pt.fieldFilters + case *pipeAggregator: + switch inner := pt.aggregator.(type) { + case *pipeRate: + funcName = "rate" + case *pipeCompare: + extractCompare(inner) + case *pipeOverTime: + funcName = inner.funcName + fieldName = inner.fieldName + case *pipeQuantileOverTime: + funcName = "quantile_over_time" + fieldName = inner.fieldName + quantile = inner.quantile + } + } + } + + if funcName == "" { + return "", "", "", nil, 
nil, fmt.Errorf("no metrics function found in query") + } + return funcName, fieldName, quantile, compare, byFields, nil +} + +// Filter returns the filter expression string (in LogsQL format). +func (q *Query) Filter() string { + return q.f.String() +} + // ParseQuery parses s. func ParseQuery(s string) (*Query, error) { timestamp := time.Now().UnixNano() @@ -181,6 +300,11 @@ func parsePipes(lex *lexer) ([]pipe, error) { case lex.isKeyword(")", ""): return pipes, nil default: + // Allow pipe keywords (like "by") without an explicit "|" separator. + // This supports Tempo-style syntax: rate() by(resource.service.name) + if isPipeName(strings.ToLower(lex.token)) { + continue + } return nil, fmt.Errorf("unexpected token after [%s]: %q; expecting '|' or ')'", pipes[len(pipes)-1], lex.token) } } diff --git a/lib/traceql/pipe.go b/lib/traceql/pipe.go index 99e93d257..800842cb9 100644 --- a/lib/traceql/pipe.go +++ b/lib/traceql/pipe.go @@ -50,6 +50,22 @@ func initPipeParsers() { "min": parsePipeAggregatorOnField, "sum": parsePipeAggregatorOnField, + // Metrics aggregators + "rate": parsePipeRate, + "count_over_time": parsePipeOverTime, + "min_over_time": parsePipeOverTime, + "max_over_time": parsePipeOverTime, + "avg_over_time": parsePipeOverTime, + "sum_over_time": parsePipeOverTime, + "histogram_over_time": parsePipeOverTime, + "quantile_over_time": parsePipeQuantileOverTime, + + // Complex metrics (handled by the dedicated parsePipeCompare parser) + "compare": parsePipeCompare, + + // Query hints (silently ignored) + "with": parsePipeWith, + // Selection "select": parsePipeSelect, diff --git a/lib/traceql/pipe_metrics.go b/lib/traceql/pipe_metrics.go new file mode 100644 index 000000000..fb482d4e3 --- /dev/null +++ b/lib/traceql/pipe_metrics.go @@ -0,0 +1,302 @@ +package traceql + +import ( + "fmt" + "strconv" + "strings" +) + +// pipeRate represents `rate()` — count of matching spans per second per time bucket. 
+type pipeRate struct{} + +func (pr *pipeRate) String() string { + return "rate()" +} + +func parsePipeRate(lex *lexer) (pipe, error) { + if !lex.isKeyword("rate") { + return nil, fmt.Errorf("expecting 'rate'; got %q", lex.token) + } + lex.nextToken() + if !lex.isKeyword("(") { + return nil, fmt.Errorf("expecting '('; got %q", lex.token) + } + lex.nextToken() + if !lex.isKeyword(")") { + return nil, fmt.Errorf("expecting ')' at the end of rate pipe; got %q", lex.token) + } + lex.nextToken() + + pa, err := parsePipeAggregator(lex) + if err != nil { + return nil, err + } + if pa != nil { + pa.aggregator = &pipeRate{} + return pa, nil + } + return &pipeRate{}, nil +} + +// pipeOverTime represents `_over_time()` metrics functions. +// +// Supported functions: count_over_time(), min_over_time(field), max_over_time(field), +// avg_over_time(field), sum_over_time(field), histogram_over_time(field). +type pipeOverTime struct { + funcName string + fieldName string // empty for count_over_time +} + +func (pot *pipeOverTime) String() string { + if pot.fieldName == "" { + return pot.funcName + "()" + } + return pot.funcName + "(" + quoteFieldFilterIfNeeded(pot.fieldName) + ")" +} + +func parsePipeOverTime(lex *lexer) (pipe, error) { + if !lex.isKeyword("count_over_time", "min_over_time", "max_over_time", "avg_over_time", "sum_over_time", "histogram_over_time") { + return nil, fmt.Errorf("expecting '*_over_time'; got %q", lex.token) + } + funcName := lex.token + + lex.nextToken() + if !lex.isKeyword("(") { + return nil, fmt.Errorf("expecting '('; got %q", lex.token) + } + lex.nextToken() + + var fieldName string + if !lex.isKeyword(")") { + // Parse the field argument. + f, err := lex.nextCompoundToken() + if err != nil { + return nil, err + } + fieldName = f + } + + // Validate: count_over_time takes no field, others require one. 
+ if funcName == "count_over_time" && fieldName != "" { + return nil, fmt.Errorf("count_over_time() does not accept a field argument") + } + if funcName != "count_over_time" && fieldName == "" { + return nil, fmt.Errorf("%s() requires a field argument", funcName) + } + + if !lex.isKeyword(")") { + return nil, fmt.Errorf("expecting ')' at the end of %s pipe; got %q", funcName, lex.token) + } + lex.nextToken() + + pot := &pipeOverTime{ + funcName: funcName, + fieldName: fieldName, + } + + pa, err := parsePipeAggregator(lex) + if err != nil { + return nil, err + } + if pa != nil { + pa.aggregator = pot + return pa, nil + } + return pot, nil +} + +// pipeQuantileOverTime represents `quantile_over_time(field, quantile)`. +type pipeQuantileOverTime struct { + fieldName string + quantile string // kept as string to preserve original precision +} + +func (pq *pipeQuantileOverTime) String() string { + return "quantile_over_time(" + quoteFieldFilterIfNeeded(pq.fieldName) + ", " + pq.quantile + ")" +} + +func parsePipeQuantileOverTime(lex *lexer) (pipe, error) { + if !lex.isKeyword("quantile_over_time") { + return nil, fmt.Errorf("expecting 'quantile_over_time'; got %q", lex.token) + } + lex.nextToken() + if !lex.isKeyword("(") { + return nil, fmt.Errorf("expecting '('; got %q", lex.token) + } + lex.nextToken() + + // Parse field name. + fieldName, err := lex.nextCompoundToken() + if err != nil { + return nil, err + } + if fieldName == "" { + return nil, fmt.Errorf("quantile_over_time() requires a field argument") + } + + if !lex.isKeyword(",") { + return nil, fmt.Errorf("expecting ',' after field in quantile_over_time; got %q", lex.token) + } + lex.nextToken() + + // Parse quantile value(s). For now support a single quantile. 
+ quantile, err := lex.nextCompoundToken() + if err != nil { + return nil, err + } + + if !lex.isKeyword(")") { + return nil, fmt.Errorf("expecting ')' at the end of quantile_over_time; got %q", lex.token) + } + lex.nextToken() + + pq := &pipeQuantileOverTime{ + fieldName: fieldName, + quantile: quantile, + } + + pa, err := parsePipeAggregator(lex) + if err != nil { + return nil, err + } + if pa != nil { + pa.aggregator = pq + return pa, nil + } + return pq, nil +} + +// pipeCompare represents `compare({selectionFilter}, topN, startNs, endNs)`. +// It auto-discovers attributes and computes per-attribute-value counts for baseline vs selection. +type pipeCompare struct { + selectionFilter filter // the inner {filter} for the selection subset + topN int // max values per attribute; 0 means default (10) + startNs int64 // selection window start in nanoseconds; 0 = use query range + endNs int64 // selection window end in nanoseconds; 0 = use query range +} + +func (pc *pipeCompare) String() string { + return "compare()" +} + +// SelectionFilterString returns the LogsQL representation of the inner selection filter. +func (pc *pipeCompare) SelectionFilterString() string { + if pc.selectionFilter == nil { + return "" + } + return pc.selectionFilter.String() +} + +func parsePipeCompare(lex *lexer) (pipe, error) { + if !lex.isKeyword("compare") { + return nil, fmt.Errorf("expecting 'compare'; got %q", lex.token) + } + lex.nextToken() + if !lex.isKeyword("(") { + return nil, fmt.Errorf("expecting '('; got %q", lex.token) + } + lex.nextToken() + + // Parse the inner selection filter — capture raw tokens until we hit "," or ")" at depth 0. 
+ var filterTokens []string + depth := 0 + for { + if lex.isKeyword("") { + return nil, fmt.Errorf("unexpected end of query inside compare()") + } + if lex.isKeyword(",") && depth == 0 { + break + } + if lex.isKeyword(")") && depth == 0 { + break + } + if lex.isKeyword("{") { + depth++ + } else if lex.isKeyword("}") { + depth-- + } + filterTokens = append(filterTokens, lex.rawToken) + lex.nextToken() + } + + // Parse the captured filter text. + filterText := strings.Join(filterTokens, " ") + var f filter + if filterText != "" { + parsedQ, parseErr := ParseQuery(filterText) + if parseErr == nil { + f = parsedQ.f + } + } + + // Parse optional comma-separated arguments: topN, startNs, endNs. + var args []string + for lex.isKeyword(",") { + lex.nextToken() + arg, _ := lex.nextCompoundToken() + args = append(args, arg) + } + + pc := &pipeCompare{selectionFilter: f} + + if len(args) >= 1 { + if n, err := strconv.Atoi(args[0]); err == nil { + pc.topN = n + } + } + if len(args) >= 2 { + if ns, err := strconv.ParseInt(args[1], 10, 64); err == nil { + pc.startNs = ns + } + } + if len(args) >= 3 { + if ns, err := strconv.ParseInt(args[2], 10, 64); err == nil { + pc.endNs = ns + } + } + + if !lex.isKeyword(")") { + return nil, fmt.Errorf("expecting ')' at the end of compare; got %q", lex.token) + } + lex.nextToken() + + return pc, nil +} + +// pipeWith represents `with(key=value, ...)` — query hints (sampling, exemplars). +// These are silently consumed and ignored by VictoriaTraces. +type pipeWith struct{} + +func (pw *pipeWith) String() string { + return "with()" +} + +func parsePipeWith(lex *lexer) (pipe, error) { + if !lex.isKeyword("with") { + return nil, fmt.Errorf("expecting 'with'; got %q", lex.token) + } + lex.nextToken() + if !lex.isKeyword("(") { + return nil, fmt.Errorf("expecting '('; got %q", lex.token) + } + lex.nextToken() + + // Consume everything inside with(...) — key=value pairs separated by commas. 
+ depth := 1 + for depth > 0 { + if lex.isKeyword("") { + return nil, fmt.Errorf("unexpected end of query inside with()") + } + if lex.isKeyword("(") { + depth++ + } else if lex.isKeyword(")") { + depth-- + } + if depth > 0 { + lex.nextToken() + } + } + lex.nextToken() // consume the final ")" + + return &pipeWith{}, nil +} diff --git a/lib/traceql/pipe_metrics_test.go b/lib/traceql/pipe_metrics_test.go new file mode 100644 index 000000000..566ad06fa --- /dev/null +++ b/lib/traceql/pipe_metrics_test.go @@ -0,0 +1,224 @@ +package traceql + +import ( + "testing" +) + +func TestParsePipeRate(t *testing.T) { + f := func(input, expected string) { + t.Helper() + q, err := ParseQuery(input) + if err != nil { + t.Fatalf("cannot parse %q: %s", input, err) + } + if !q.IsMetricsQuery() { + t.Fatalf("expected metrics query for %q", input) + } + funcName, fieldName, _, _, _, err := q.MetricsComponents() //nolint:dogsled + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if funcName != expected { + t.Fatalf("unexpected funcName; got %q; want %q", funcName, expected) + } + if fieldName != "" { + t.Fatalf("unexpected fieldName; got %q; want empty", fieldName) + } + } + + f(`{} | rate()`, "rate") + f(`{resource.service.name = "frontend"} | rate()`, "rate") +} + +func TestParsePipeRateWithComparison(t *testing.T) { + q, err := ParseQuery(`{} | rate() > 5`) + if err != nil { + t.Fatalf("cannot parse: %s", err) + } + if !q.IsMetricsQuery() { + t.Fatal("expected metrics query") + } + funcName, _, _, _, _, err := q.MetricsComponents() + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if funcName != "rate" { + t.Fatalf("unexpected funcName; got %q; want %q", funcName, "rate") + } +} + +func TestParsePipeOverTime(t *testing.T) { + f := func(input, expectedFunc, expectedField string) { + t.Helper() + q, err := ParseQuery(input) + if err != nil { + t.Fatalf("cannot parse %q: %s", input, err) + } + if !q.IsMetricsQuery() { + t.Fatalf("expected metrics query for 
%q", input) + } + funcName, fieldName, _, _, _, err := q.MetricsComponents() //nolint:dogsled + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if funcName != expectedFunc { + t.Fatalf("unexpected funcName; got %q; want %q", funcName, expectedFunc) + } + if fieldName != expectedField { + t.Fatalf("unexpected fieldName; got %q; want %q", fieldName, expectedField) + } + } + + // count_over_time takes no field + f(`{} | count_over_time()`, "count_over_time", "") + + // *_over_time with field + f(`{} | min_over_time(duration)`, "min_over_time", "duration") + f(`{} | max_over_time(duration)`, "max_over_time", "duration") + f(`{} | avg_over_time(duration)`, "avg_over_time", "duration") + f(`{} | sum_over_time(duration)`, "sum_over_time", "duration") + + // With filter + f(`{resource.service.name = "api"} | avg_over_time(duration)`, "avg_over_time", "duration") + + // With dotted field name + f(`{} | sum_over_time(span.http.response_content_length)`, "sum_over_time", "span.http.response_content_length") +} + +func TestParsePipeOverTimeWithBy(t *testing.T) { + // With explicit | separator + q, err := ParseQuery(`{} | rate() | by(resource.service.name)`) + if err != nil { + t.Fatalf("cannot parse: %s", err) + } + _, _, _, _, byFields, err := q.MetricsComponents() + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if len(byFields) != 1 || byFields[0] != "resource.service.name" { + t.Fatalf("unexpected byFields; got %v; want [resource.service.name]", byFields) + } + + // Tempo-style: by() without | separator + q, err = ParseQuery(`{} | rate() by(resource.service.name)`) + if err != nil { + t.Fatalf("cannot parse Tempo-style by(): %s", err) + } + _, _, _, _, byFields, err = q.MetricsComponents() + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if len(byFields) != 1 || byFields[0] != "resource.service.name" { + t.Fatalf("unexpected byFields (Tempo-style); got %v; want [resource.service.name]", byFields) + } +} + +func 
TestParsePipeOverTimeWithMultipleByFields(t *testing.T) { + q, err := ParseQuery(`{} | count_over_time() | by(resource.service.name, span.http.method)`) + if err != nil { + t.Fatalf("cannot parse: %s", err) + } + _, _, _, _, byFields, err := q.MetricsComponents() + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if len(byFields) != 2 { + t.Fatalf("unexpected byFields count; got %d; want 2", len(byFields)) + } + if byFields[0] != "resource.service.name" { + t.Fatalf("unexpected byFields[0]; got %q; want %q", byFields[0], "resource.service.name") + } + if byFields[1] != "span.http.method" { + t.Fatalf("unexpected byFields[1]; got %q; want %q", byFields[1], "span.http.method") + } +} + +func TestParsePipeOverTimeErrors(t *testing.T) { + // count_over_time should not accept a field + _, err := ParseQuery(`{} | count_over_time(duration)`) + if err == nil { + t.Fatal("expected error for count_over_time with field argument") + } + + // avg_over_time requires a field + _, err = ParseQuery(`{} | avg_over_time()`) + if err == nil { + t.Fatal("expected error for avg_over_time without field argument") + } + + // missing closing paren + _, err = ParseQuery(`{} | rate(`) + if err == nil { + t.Fatal("expected error for rate with missing closing paren") + } +} + +func TestIsMetricsQueryFalse(t *testing.T) { + // Regular queries should not be metrics queries. + q, err := ParseQuery(`{resource.service.name = "frontend"}`) + if err != nil { + t.Fatalf("cannot parse: %s", err) + } + if q.IsMetricsQuery() { + t.Fatal("expected non-metrics query") + } + + // Query with count pipe is not a metrics query. 
+ q, err = ParseQuery(`{} | count() > 5`) + if err != nil { + t.Fatalf("cannot parse: %s", err) + } + if q.IsMetricsQuery() { + t.Fatal("expected non-metrics query for count()") + } +} + +func TestTraceQLFieldToVTField(t *testing.T) { + f := func(input, expected string) { + t.Helper() + got := TraceQLFieldToVTField(input) + if got != expected { + t.Fatalf("TraceQLFieldToVTField(%q); got %q; want %q", input, got, expected) + } + } + + f("resource.service.name", "resource_attr:service.name") + f("span.http.status_code", "span_attr:http.status_code") + f("status", "status_code") + f("service.name", "resource_attr:service.name") + f("duration", "duration") + f("name", "name") +} + +func TestVTFieldToTraceQL(t *testing.T) { + f := func(input, expected string) { + t.Helper() + got := VTFieldToTraceQL(input) + if got != expected { + t.Fatalf("VTFieldToTraceQL(%q); got %q; want %q", input, got, expected) + } + } + + f("resource_attr:service.name", "resource.service.name") + f("span_attr:http.status_code", "span.http.status_code") + f("status_code", "status") + f("duration", "duration") + f("name", "name") +} + +func TestFieldMappingRoundTrip(t *testing.T) { + fields := []string{ + "resource.service.name", + "span.http.status_code", + "status", + "duration", + "name", + } + for _, field := range fields { + vt := TraceQLFieldToVTField(field) + back := VTFieldToTraceQL(vt) + if back != field { + t.Fatalf("round-trip failed for %q: TraceQLFieldToVTField -> %q -> VTFieldToTraceQL -> %q", field, vt, back) + } + } +}