Reindexing rule providers with cascading interval based reindexing #18939

capistrant wants to merge 86 commits into apache:master from
Conversation
- Improvements and bugfixes
- Fix compaction status after rebasing
- Fix missing import after rebase
- fix checkstyle issues
- fill out javadocs
- address claude code review comments
- Add isReady concept to compaction rule provider and gate task creation on provider being ready
- Fix an issue in AbstractRuleProvider when it comes to variable length periods like month and year
- Implement a composing rule provider for chaining multiple rule providers
- Using 1 row and creating 0 row segments makes the test fail for native compaction runner. I cannot reproduce in docker to figure out how the test is misconfigured
- … issue with range dim and all rows filtered out

    return new Builder()
        .forDataSource(this.dataSource)
        .withTaskPriority(this.taskPriority)
        .withInputSegmentSizeBytes(this.inputSegmentSizeBytes)
        .withMaxRowsPerSegment(this.maxRowsPerSegment)

Check notice (Code scanning / CodeQL): Deprecated method or constructor invocation — server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTest.java (fixed).
… is going to be a bad time
Code scanning alerts in ...service/src/test/java/org/apache/druid/indexing/compact/CascadingReindexingTemplateTest.java (fixed).
After toying around with a coding agent I feel like I was able to drum up a surprisingly nice UI in the supervisor view (for the cascading reindex type only). The code is currently in this PR, but I'm down to move it to a follow-up for the sake of review sanity.
…at are deleted showing up
@clintropolis TY for the review. I made some changes based off our conversation and off some findings during further testing. rough summary is:
not directly related to your review:
nit:

    - // Virtual Collumns on nested data is only supported with MSQ compaction engine right now.
    + // Virtual Columns on nested data is only supported with MSQ compaction engine right now.

nit: i think can leave out the list part and use the other 'create'
should these use InvalidInput.conditionalException to throw a proper DruidException to signal user error? (same comments for other validations)
nit: variable name no longer matches
i think if we modified this to
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
we could modify the constructor to initialize to VirtualColumns.EMPTY and save ourselves some null checks and not risk messing up fingerprinting stuff for things without virtual columns defined
thanks for the tip. I was definitely confused about what to do here to make it so everything that is already fingerprinted didn't end up being re-processed
I went down a little bit of a rabbit hole and found that @JsonInclude(JsonInclude.Include.NON_EMPTY) maybe covers this more robustly?
oh, since it just serializes the array as the value 👍
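A minimal sketch of the `@JsonInclude` idea discussed above (the class and field here are illustrative, not code from this PR; it assumes Jackson's `@JsonInclude` and Druid's `VirtualColumns.EMPTY`):

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.segment.VirtualColumns;

import javax.annotation.Nullable;

public class ExampleRuleSpec
{
  private final VirtualColumns virtualColumns;

  @JsonCreator
  public ExampleRuleSpec(@JsonProperty("virtualColumns") @Nullable VirtualColumns virtualColumns)
  {
    // Default to EMPTY instead of null so downstream code can skip null checks.
    this.virtualColumns = virtualColumns == null ? VirtualColumns.EMPTY : virtualColumns;
  }

  // NON_EMPTY drops the property from serialized JSON when no virtual columns are defined,
  // so fingerprints of existing specs without virtual columns should not change.
  @JsonProperty("virtualColumns")
  @JsonInclude(JsonInclude.Include.NON_EMPTY)
  public VirtualColumns getVirtualColumns()
  {
    return virtualColumns;
  }
}
```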

    import javax.annotation.Nullable;
    import java.util.List;

    public class ReindexingDataSchemaRule extends AbstractReindexingRule

may not matter, but this is missing equals/hashcode/tostring (the provider uses them in its implementation of those methods; other rules are missing them too)
also worth a small javadoc
is there a case for having a rule that should always apply? (e.g. i would think period of 0?)
ya. I think I accidentally pulled from the description the note that period of 0 and handling of future data need to be addressed. Perhaps this PR warrants supporting P0D, but future data should be left for a follow-on if needed

    @JsonProperty("id") @Nonnull String id,
    @JsonProperty("description") @Nullable String description,
    @JsonProperty("olderThan") @Nonnull Period olderThan,
    @JsonProperty("segmentGranularity") @Nonnull Granularity segmentGranularity

granularity allows a lot of weird stuff, should we really allow all kinds of granularities here?
I guess I am not sure? I was basing off of what the old compaction granularity allowed and that was just Granularity there
per your comment below I am trending towards just laying the hammer and having a very narrow set of allowed values here to prevent people from causing themselves harm

     */
    enum AppliesToMode
    {
      PARTIAL,

partial seems underutilized since afaict we only pick rules for FULL, what is the plan for it?
good question. I was under the impression when I initially was writing this that it may be useful, but from where I am now after dealing with trying to make all of this possible to reason about as an operator, I think a rule provider trying to manage partial overlaps somehow instead of forcing full coverage, would be a nightmare. so maybe I should delete?
i'm not completely sure, it seems semi useful to track, if for nothing else from a debugging standpoint in case it is useful later

    DateTime rawEnd = referenceTime.minus(rule.getOlderThan());
    DateTime alignedEnd = rule.getSegmentGranularity().bucketStart(rawEnd);
    DateTime alignedStart = (previousAlignedEnd != null) ? previousAlignedEnd : DateTimes.MIN;

you can get some pretty weird stuff going on here, perhaps we just encourage people not to create unhinged combinations that don't make sense?
Like, I guess I wonder if we should take an opinionated stance and add some additional validation to make sure that the granularity is itself also like aligned with the interval?
Like, I was curious what happens with total freedom here, so for example imagine i have gone mad and define 3 segment granularity reindexing rules:
- 'hour-rule' - olderThan p1d, hourly granularity
- '3day-rule' - olderThan p2d, period granularity of p3d
- '5day-rule' - olderThan p3d, period granularity of p5d
Using the 'reference time' of the tests, this resolves into something like

    IntervalGranularityInfo{interval=-146136543-09-08T08:23:32.096Z/2025-01-22T00:00:00.000Z, granularity={type=period, period=P5D, timeZone=UTC, origin=null}},
    IntervalGranularityInfo{interval=2025-01-22T00:00:00.000Z/2025-01-27T00:00:00.000Z, granularity={type=period, period=P3D, timeZone=UTC, origin=null}},
    IntervalGranularityInfo{interval=2025-01-27T00:00:00.000Z/2025-01-28T16:00:00.000Z, granularity={type=period, period=PT1H, timeZone=UTC, origin=null}}

seems strange because there is a 5 day interval of 3 day segments, what does that mean? 🤷
I didn't even try adding timezones on period granularities or using like, duration granularity, but I suspect they can make stuff even weirder.
Is this ok? (i think the answer is maybe not, since i tried running a weird configuration like this on CompactionSupervisorTest.test_cascadingCompactionTemplate_multiplePeriodsApplyDifferentCompactionRules and MSQ got stuck in some weird state if i specified a timezone.. i didn't dig very deep into it yet)
I almost want to say we take a hard stance and limit segment granularity rules to something like FIFTEEN_MINUTE, THIRTY_MINUTE, HOUR, DAY, MONTH, YEAR at this point. The management of this timeline is proving to be a real pain
i think it would be fine to start out super restrictive, we can always revisit this part later, probably easier to do that than start out super permissive and try to rein it in later
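If the restrictive allow-list direction wins out, here is a rough sketch of what the validation could look like (hypothetical class and constant names, not code from this PR; a plain `IllegalArgumentException` stands in where an earlier comment suggests `InvalidInput.conditionalException` to raise a proper `DruidException`):

```java
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;

import java.util.Set;

public class SegmentGranularityValidation
{
  // Hypothetical allow-list along the lines discussed above.
  private static final Set<Granularity> ALLOWED = ImmutableSet.of(
      Granularities.FIFTEEN_MINUTE,
      Granularities.THIRTY_MINUTE,
      Granularities.HOUR,
      Granularities.DAY,
      Granularities.MONTH,
      Granularities.YEAR
  );

  static void validate(Granularity segmentGranularity)
  {
    if (!ALLOWED.contains(segmentGranularity)) {
      // Reject anything outside the narrow set of standard granularities.
      throw new IllegalArgumentException(
          "Unsupported segmentGranularity [" + segmentGranularity + "]; must be one of " + ALLOWED
      );
    }
  }
}
```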
thanks again for the review round @clintropolis. I think a lot of it makes sense and I've got most of the changes locally. On the segment granularity thing, I'm at the point where it is so ripe for edge cases and errors where an operator may get themselves in trouble and regret it that I think the static set of standard granularities I called out in the comment is the way to go. My take is that it still gives people a lot more flexibility than operators have today, while making our lives easier coding it, and preventing them (operators) from doing something crazy that they end up hating. what do you think?
follow up to #18844 ... at least in terms of the quest to begin the transition from the term compaction to reindexing. More info can be found in that PR description about the naming change and new centralized indexing state storage that the supervisor uses to determine if segments need to be reindexed (replacement for lastCompactionState stored per segment). In this PR I will use the term reindexing whenever possible. When the term compaction is used it will only be to refer to an actual Java class that is yet to be refactored.
## Description
Extend reindexing supervisors (AKA compaction supervisors) to allow for a single Druid data source to apply different reindexing configurations to different segments depending on what "reindexing rules" are defined for the data source that apply to the time interval of the segment being reindexed.
## Value Proposition
Timeseries data often provides value in different ways over time. Data for the last 7 days is often interacted with differently than data for the last 30 days and once again for data from some number of years ago. In Druid, we should give data owners the ability to use reindexing supervisors (AKA compaction supervisors) to change the state of a single datasource as the data ages. Operators should have the ability to define a data lifecycle of sorts for their datasources and allow reindexing supervisors to dynamically apply that definition to the underlying segments as they age. A great, and simple, example is query granularity.
## Design
### `CompactionConfigBasedJobTemplate` underpins all of this

The existing `CompactionConfigBasedJobTemplate` underpins all of the new functionality. At the end of the day, the cascading reindexing template uses per-datasource `ReindexingRule`s that are supplied by a `ReindexingRuleProvider` to create multiple non-overlapping search intervals that each apply a distinct `InlineSchemaDataSourceCompactionConfig`.

### `ReindexingRule`

A good place to start review is to take a quick look at the `ReindexingRule` concept and the different implementations. The basic idea is that the components of the existing `CompactionState` concept have been broken out into multiple rule types:

- `ReindexingDataSchemaRule`
- `ReindexingSegmentGranularityRule`
- `ReindexingTuningConfigRule`
- `ReindexingIOConfigRule`
- `ReindexingDeletionRule`

`CompactionState#TransformSpec` maps to what we call `ReindexingDeletionRule`, which is a `DimFilter` under the hood, and all rows matching it are deleted during reindexing.

#### Rule Anatomy

The key concept to take away is that `olderThan` reads as: "Apply this rule to Druid segments who have intervals ending at or before `now - olderThan`."
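As a worked illustration of that sentence, here is a minimal sketch using joda-time (the class and method are hypothetical helpers, not the PR's code):

```java
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;

public class OlderThanCheck
{
  /**
   * Returns true if a rule with the given olderThan period applies to the segment interval,
   * i.e. the interval ends at or before (referenceTime - olderThan).
   */
  static boolean ruleApplies(DateTime referenceTime, Period olderThan, Interval segmentInterval)
  {
    final DateTime threshold = referenceTime.minus(olderThan);
    return !segmentInterval.getEnd().isAfter(threshold);
  }

  public static void main(String[] args)
  {
    DateTime now = DateTime.parse("2024-02-04T22:12:04.873Z");
    Interval segment = new Interval(
        DateTime.parse("2024-01-20T00:00:00Z"),
        DateTime.parse("2024-01-21T00:00:00Z")
    );
    // A P7D rule applies here: the segment ends before 2024-01-28T22:12:04.873Z.
    System.out.println(ruleApplies(now, Period.days(7), segment));
  }
}
```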
#### Rule selection when `> 1` rules of the same type exist for a datasource

The whole point of this PR is so a datasource can have different reindexing configuration at different points in its timeline. That means there will inevitably be multiple rules of the same type for a datasource. For all but one rule type, only a single rule can be selected per search interval and its associated `InlineSchemaDataSourceCompactionConfig`. When this happens, it is the `ReindexingRuleProvider` implementation that makes the selection of which rule to pick for an interval. In this PR, the only concrete implementation is `InlineReindexingRuleProvider` and the logic for selection is simple:
##### Special Case: `ReindexingDeletionRule`

The `ReindexingDeletionRule` is the only rule type that is additive. All matching rules for a search interval become part of the `InlineSchemaDataSourceCompactionConfig` for that interval. They take the form of:

    NOT(A OR B OR C OR D)

where A, B, C, ... are individual `ReindexingDeletionRule`s.
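To make the `NOT(A OR B OR ...)` shape concrete, a tiny sketch (assuming Druid's `OrDimFilter(List)` and `NotDimFilter(DimFilter)` constructors; the helper itself is illustrative, not the PR's code):

```java
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.NotDimFilter;
import org.apache.druid.query.filter.OrDimFilter;

import java.util.List;

public class DeletionFilterSketch
{
  /**
   * Combine the deleteWhere filters of all matching deletion rules into a single
   * "keep" filter of the form NOT(A OR B OR C ...): rows matching any deletion
   * rule are dropped, everything else is retained.
   */
  static DimFilter toRetainFilter(List<DimFilter> deleteWhereFilters)
  {
    return new NotDimFilter(new OrDimFilter(deleteWhereFilters));
  }
}
```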
###### `ReindexingDeletionRule` optimization

We avoid applying `ReindexingDeletionRule`s to `CompactionCandidate`s whose segments have all applied the rule already. This rule pruning is done using an optimizer before the underlying `CompactionConfigBasedJobTemplate` creates the `CompactionTask`s. This is done to avoid wasting work during re-indexing.

### `ReindexingRuleProvider`

The next logical place to go from here in review is the rule provider itself. This is meant to be extensible, with this PR providing one concrete implementation, `InlineReindexingRuleProvider`, that defines all of the rules within the supervisor spec itself.

Classes to check out are:
`ReindexingRuleProvider` and `InlineReindexingRuleProvider`

A note on `ComposingReindexingRuleProvider`: This guy is a little funny in this PR as it is meant to give you power once there are multiple `ReindexingRuleProvider` implementations in Druid. You can chain them together, with their ordering mattering in determining what provider provides what rules.

#### `InlineReindexingRuleProvider` rule application

A rule applies to an interval IF the `reference time - olderThan period` for the rule is explicitly AFTER the end time of that interval. This means that rules only apply to intervals that are fully before their threshold.

### `CascadingReindexingTemplate`

This is the brains of the operation when it comes to how the supervisor takes a collection of rules that exist for a datasource and turns them into `CompactionTask`s that can be run.

#### Non-Overlapping Timeline Generation
The most interesting thing about this is generating the non-overlapping timeline of `CompactionConfigBasedJobTemplate` objects, where the non-overlapping terminology pertains to the search intervals that each template will cover.

#### Target SegmentGranularity Requirement
It is mandatory that every `CompactionConfigBasedJobTemplate` has a target `SegmentGranularity` defined in its config. This target granularity comes from either an explicit `ReindexingSegmentGranularityRule` that applies to the search interval, or the default segment granularity that the `cascadeReindex` must provide in the spec to ensure that even if no rules exist, the default will stand in to fulfill this requirement.

#### Search interval boundary adjustment to match target segment granularity
I will use an example to show this in action as I think it is easier than words.

Reference time: `2024-02-04T22:12:04.873Z`

Rules:
- olderThan=P7D segmentGranularity=DAY
- olderThan=P1M segmentGranularity=MONTH
- olderThan=P1Y segmentGranularity=YEAR

Timeline with no adjustments to align search intervals with target granularity:

Timeline with our adjustments to align search intervals with target granularity:
A side effect of this is that rules will technically be delayed in their being applied, but it will ensure our search intervals align nicely with the segment granularity that they are targeting.
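A minimal sketch of that boundary alignment for the example above, using the joda-time and `Granularity#bucketStart` calls quoted in the review thread (the class itself and the printing are illustrative, not the PR's code):

```java
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.joda.time.DateTime;
import org.joda.time.Period;

public class BoundaryAlignmentSketch
{
  public static void main(String[] args)
  {
    final DateTime referenceTime = DateTime.parse("2024-02-04T22:12:04.873Z");

    // (olderThan, target segment granularity) pairs from the example above.
    printAlignedEnd(referenceTime, Period.days(7), Granularities.DAY);     // alignedEnd=2024-01-28T00:00:00.000Z
    printAlignedEnd(referenceTime, Period.months(1), Granularities.MONTH); // alignedEnd=2024-01-01T00:00:00.000Z
    printAlignedEnd(referenceTime, Period.years(1), Granularities.YEAR);   // alignedEnd=2023-01-01T00:00:00.000Z
  }

  static void printAlignedEnd(DateTime referenceTime, Period olderThan, Granularity segmentGranularity)
  {
    // Raw threshold, then snapped back to the start of its granularity bucket so the
    // search-interval boundary lines up with the target segment granularity.
    final DateTime rawEnd = referenceTime.minus(olderThan);
    final DateTime alignedEnd = segmentGranularity.bucketStart(rawEnd);
    System.out.println(olderThan + " -> rawEnd=" + rawEnd + ", alignedEnd=" + alignedEnd);
  }
}
```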
Two unfortunate constraints:
#### Splitting the timeline built over the Granularity rules to accommodate the non-segment gran rules

We still have tons of rule types beyond these segment granularity rules that we fussed over to create our initial timeline. The `reference_time - olderThan` thresholds for each of these additional rules will fall within an existing interval in our timeline (except for one case we will cover at the end of this section). For each of these rules, we first use the segment granularity of the interval they fall in to granularity-align their threshold time: `Granularity.bucketStart(threshold)`. If this adjusted threshold still falls within the interval, we need to split the interval at this time to allow precise application of the rule. We repeat this with all of the distinct `olderThan` periods that exist for our non segment granularity rules.
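A small sketch of that splitting step (illustrative only, using joda-time intervals; not the PR's implementation):

```java
import org.joda.time.DateTime;
import org.joda.time.Interval;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class IntervalSplitSketch
{
  /**
   * If the (already granularity-aligned) threshold falls strictly inside the interval,
   * split the interval there so a rule can be applied precisely from the threshold onward;
   * otherwise leave the interval untouched.
   */
  static List<Interval> splitAt(Interval interval, DateTime alignedThreshold)
  {
    if (alignedThreshold.isAfter(interval.getStart()) && alignedThreshold.isBefore(interval.getEnd())) {
      return Arrays.asList(
          new Interval(interval.getStart(), alignedThreshold),
          new Interval(alignedThreshold, interval.getEnd())
      );
    }
    return Collections.singletonList(interval);
  }
}
```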
reference_time - olderThanthresholds for each of these additional rules will fall within an existing interval in our timeline (except for one case we will cover at the end of this section). For each of these rules, we first use the segment granularity of the interval they fall in to granularity align their threshold timeGranularity.bucketStart(threshold). If this adjusted threshold still falls within the interval, we need to split the interval at this time to allow precise application of the rule. We repeat this with all of the distinctolderThanperiods that exist for our non segment granularity rules.Edge cases
olderThanperiod for a segment granularity rule is larger than the smallest period for non segment granularity rulesolderThanperiod to create a synthetic interval to make sure all of our search intervals have a segment granularity.Example Inline Rule Provider Spec

    {
      "type": "autocompact",
      "spec": {
        "type": "reindexCascade",
        "dataSource": "wikipedia",
        "defaultSegmentGranularity": "HOUR",
        "taskPriority": 25,
        "inputSegmentSizeBytes": 500000000,
        "skipOffsetFromLatest": "P1D",
        "engine": "msq",
        "taskContext": { "maxNumTasks": 10 },
        "ruleProvider": {
          "type": "inline",
          "segmentGranularityRules": [
            { "id": "recent-fine-grain", "olderThan": "P1D", "segmentGranularity": "HOUR" },
            { "id": "week-old-medium-grain", "olderThan": "P7D", "segmentGranularity": "DAY" },
            { "id": "old-coarse-grain", "olderThan": "P30D", "segmentGranularity": "MONTH" }
          ],
          "deletionRules": [
            {
              "id": "remove-bots",
              "description": "Remove robot traffic from old data",
              "olderThan": "P30D",
              "deleteWhere": { "type": "equals", "column": "is_bot", "matchValueType": "STRING", "matchValue": "true" }
            },
            {
              "id": "remove-test-users",
              "description": "Remove test users from very old data",
              "olderThan": "P1Y",
              "deleteWhere": { "type": "equals", "column": "userType", "matchValueType": "STRING", "matchValue": "test" }
            }
          ],
          "tuningConfigRules": [
            {
              "id": "partition-by-country",
              "description": "Partition data by country for better query performance for common use cases",
              "olderThan": "P1D",
              "tuningConfig": {
                "partitionsSpec": {
                  "type": "range",
                  "maxRowsPerSegment": 10000000,
                  "partitionDimensions": ["countryName"]
                }
              }
            }
          ],
          "dataSchemaRules": [
            {
              "id": "base-data-schema-rule",
              "olderThan": "P1D",
              "dimensionsSpec": {
                "dimensions": ["channel", "cityName", "comment", "countryIsoCode", "countryName", "namespace", "page", "regionIsoCode", "regionName", "user", "isAnonymous", "isMinor", "isNew", "isRobot", "isUnpatrolled", "metroCode"]
              },
              "metricsSpec": [
                { "type": "longSum", "name": "added", "fieldName": "added" },
                { "type": "longSum", "name": "deleted", "fieldName": "deleted" },
                { "type": "longSum", "name": "delta", "fieldName": "delta" }
              ],
              "queryGranularity": "MINUTE",
              "rollup": true
            }
          ]
        }
      },
      "suspended": true
    }

This spec will:
## Future Work
### documentation
I think this should come with the console addition PR since that will be when this really becomes realistic to operate without getting too confused with what is going on under the hood for a config.
### Console (Future Addition)
Edit: After discussions we decided this is a better fit for a follow-up PR. I will leave this section as a preview for now, but the console changes will actually come in a 2nd PR.
Based on review with others, adding visualizations of the reindexing timeline in the console seemed like a nice-to-have for operators. And after experimenting with adding it, I think it is actually a critical piece for sane operations. Using an AI coding agent (I know nothing about front-end dev), I iterated on console changes and came up with a proposal for a console view of the reindexing timeline so an operator can see how the underlying rules break up into search intervals and what configuration will be applied to each search interval. Below are some pictures using the example inline supervisor above.
### catalog provider?
Would be cool to tie this into the catalog definitions of tables
## Release note
## Key changed/added classes in this PR
- `CascadingReindexingTemplate`
- `ReindexingConfigOptimizer`
- `ReindexingDeletionRuleOptimizer`
- `ReindexingRuleProvider`
- `InlineReindexingRuleProvider`
- `ComposingReindexingRuleProvider`
- `ReindexingRule` + `AbstractReindexingRule`
- `ReindexingDimensionsRule`
- `ReindexingMetricsRule`
- `ReindexingSegmentGranularityRule`
- `ReindexingQueryGranularityRule`
- `ReindexingIOConfigRule`
- `ReindexingProjectionRule`
- `ReindexingDeletionRule`

This PR has: