Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions app/vtinsert/insertutil/index_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type indexEntry struct {
tenantID logstorage.TenantID
startTimeNano int64
endTimeNano int64
hasRootSpan bool
}

type indexWorker struct {
Expand All @@ -48,21 +49,21 @@ var (

// pushIndexToQueue organizes index data (from the LogMessageProcessor interface or the InsertRowProcessor interface)
// and pushes it to the queue.
func pushIndexToQueue(tenantID logstorage.TenantID, traceID string, startTime, endTime int64) bool {
func pushIndexToQueue(tenantID logstorage.TenantID, traceID string, startTime, endTime int64, isRootSpan bool) bool {
select {
case <-stopCh:
// during stop, no data should be pushed to the queue anymore.
return false
default:
mustPushIndex(tenantID, traceID, startTime, endTime)
mustPushIndex(tenantID, traceID, startTime, endTime, isRootSpan)
}
return true
}

// mustPushIndex composes a new (or updates an existing) indexEntry with tenantID, startTime and endTime for a trace,
// and puts it into the traceIDIndex map.
// The indexEntry should indicate the real min(startTime) and max(endTime) of a trace, and be flushed to disk later.
func mustPushIndex(tenantID logstorage.TenantID, traceID string, startTime, endTime int64) {
func mustPushIndex(tenantID logstorage.TenantID, traceID string, startTime, endTime int64, isRootSpan bool) {
tb := [32]byte{}
copy(tb[:], traceID)

Expand All @@ -78,6 +79,7 @@ func mustPushIndex(tenantID logstorage.TenantID, traceID string, startTime, endT
if ok {
idxEntry.startTimeNano = min(startTime, idxEntry.startTimeNano)
idxEntry.endTimeNano = max(endTime, idxEntry.endTimeNano)
idxEntry.hasRootSpan = idxEntry.hasRootSpan || isRootSpan
worker.traceIDIndexMapCur[tb] = idxEntry
return
}
Expand All @@ -86,6 +88,7 @@ func mustPushIndex(tenantID logstorage.TenantID, traceID string, startTime, endT
if ok {
idxEntry.startTimeNano = min(startTime, idxEntry.startTimeNano)
idxEntry.endTimeNano = max(endTime, idxEntry.endTimeNano)
idxEntry.hasRootSpan = idxEntry.hasRootSpan || isRootSpan
worker.traceIDIndexMapPrev[tb] = idxEntry
return
}
Expand All @@ -95,6 +98,7 @@ func mustPushIndex(tenantID logstorage.TenantID, traceID string, startTime, endT
idxEntry.tenantID = tenantID
idxEntry.startTimeNano = startTime
idxEntry.endTimeNano = endTime
idxEntry.hasRootSpan = isRootSpan

worker.traceIDIndexMapCur[tb] = idxEntry
}
Expand Down Expand Up @@ -179,6 +183,10 @@ func (w *indexWorker) flushIndexInMap(tb [32]byte, idxEntry indexEntry) bool {

startTimestamp := idxEntry.startTimeNano
endTimestamp := idxEntry.endTimeNano
hasRootSpan := "0"
if idxEntry.hasRootSpan {
hasRootSpan = "1"
}
lmp.AddRow(startTimestamp,
// fields
[]logstorage.Field{
Expand All @@ -188,6 +196,7 @@ func (w *indexWorker) flushIndexInMap(tb [32]byte, idxEntry indexEntry) bool {
{Name: otelpb.TraceIDIndexStartTimeFieldName, Value: strconv.FormatInt(startTimestamp, 10)},
{Name: otelpb.TraceIDIndexEndTimeFieldName, Value: strconv.FormatInt(endTimestamp, 10)},
{Name: otelpb.TraceIDIndexDuration, Value: strconv.FormatInt(endTimestamp-startTimestamp, 10)},
{Name: otelpb.TraceIDIndexHasRootSpan, Value: hasRootSpan},
},
1,
)
Expand Down
54 changes: 28 additions & 26 deletions app/vtinsert/insertutil/trace_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,11 @@ func (tsp *traceSpanProcessor) MustClose() {
// Each item in the queue will be popped after a certain interval, and carries the min(startTimeNano) and max(endTimeNano) of this trace ID.
func (tsp *traceSpanProcessor) pushTraceToIndexQueue(tenant logstorage.TenantID, fields []logstorage.Field) bool {
var (
traceID string
startTime, endTime int64
err error
traceID string
parentSpanID string
startTime int64
endTime int64
err error
)

i := len(fields) - 1
Expand All @@ -94,26 +96,25 @@ func (tsp *traceSpanProcessor) pushTraceToIndexQueue(tenant logstorage.TenantID,
return false
}

// find endTimeNano of the span in reverse order, it should be right before trace ID field.
// find endTimeNano, startTimeNano, and parentSpanID of the span in reverse order.
for i = i - 1; i >= 0; i-- {
if fields[i].Name == otelpb.EndTimeUnixNanoField {
switch fields[i].Name {
case otelpb.EndTimeUnixNanoField:
endTime, err = strconv.ParseInt(fields[i].Value, 10, 64)
if err != nil {
logger.Errorf("cannot parse endTime %s for traceID %q: %v", fields[i].Value, traceID, err)
return false
}
break
}
}

// find startTimeNano of the span in reverse order, it should be right before endTimeNano field.
for i = i - 1; i >= 0; i-- {
if fields[i].Name == otelpb.StartTimeUnixNanoField {
case otelpb.StartTimeUnixNanoField:
startTime, err = strconv.ParseInt(fields[i].Value, 10, 64)
if err != nil {
logger.Errorf("cannot parse startTime %s for traceID %q: %v", fields[i].Value, traceID, err)
return false
}
case otelpb.ParentSpanIDField:
parentSpanID = fields[i].Value
}
if startTime != 0 && endTime != 0 {
break
}
}
Expand All @@ -126,7 +127,7 @@ func (tsp *traceSpanProcessor) pushTraceToIndexQueue(tenant logstorage.TenantID,
endTime = time.Now().UnixNano()
}

return pushIndexToQueue(tenant, traceID, startTime, endTime)
return pushIndexToQueue(tenant, traceID, startTime, endTime, parentSpanID == "")
}

// The following methods are for native data ingestion protocol. They must be called from `internalinsert`.
Expand All @@ -147,9 +148,11 @@ func (tsp *traceSpanProcessor) AddInsertRow(r *logstorage.InsertRow) {
// Each item in the queue will be popped after a certain interval, and carries the min(startTimeNano) and max(endTimeNano) of this trace ID.
func (tsp *traceSpanProcessor) pushNativeRowToIndexQueue(r *logstorage.InsertRow) bool {
var (
traceID string
startTime, endTime int64
err error
traceID string
parentSpanID string
startTime int64
endTime int64
err error
)

i := len(r.Fields) - 1
Expand All @@ -166,26 +169,25 @@ func (tsp *traceSpanProcessor) pushNativeRowToIndexQueue(r *logstorage.InsertRow
return false
}

// find endTimeNano of the span in reverse order, it should be right before trace ID field.
// find endTimeNano, startTimeNano, and parentSpanID in reverse order.
for i = i - 1; i >= 0; i-- {
if r.Fields[i].Name == otelpb.EndTimeUnixNanoField {
switch r.Fields[i].Name {
case otelpb.EndTimeUnixNanoField:
endTime, err = strconv.ParseInt(r.Fields[i].Value, 10, 64)
if err != nil {
logger.Errorf("cannot parse endTime %s for traceID %q: %v", r.Fields[i].Value, traceID, err)
return false
}
break
}
}

// find startTimeNano of the span in reverse order, it should be right before endTimeNano field.
for i = i - 1; i >= 0; i-- {
if r.Fields[i].Name == otelpb.StartTimeUnixNanoField {
case otelpb.StartTimeUnixNanoField:
startTime, err = strconv.ParseInt(r.Fields[i].Value, 10, 64)
if err != nil {
logger.Errorf("cannot parse startTime %s for traceID %q: %v", r.Fields[i].Value, traceID, err)
return false
}
case otelpb.ParentSpanIDField:
parentSpanID = r.Fields[i].Value
}
if startTime != 0 && endTime != 0 {
break
}
}
Expand All @@ -198,5 +200,5 @@ func (tsp *traceSpanProcessor) pushNativeRowToIndexQueue(r *logstorage.InsertRow
endTime = time.Now().UnixNano()
}

return pushIndexToQueue(r.TenantID, traceID, startTime, endTime)
return pushIndexToQueue(r.TenantID, traceID, startTime, endTime, parentSpanID == "")
}
Loading