-
Notifications
You must be signed in to change notification settings - Fork 683
chore: remove span-metrics leftovers and lazy-init generator clients #6618
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
bb6c664
5f11713
254582a
30d1c2f
afc3f68
97244ca
8390d81
552bd48
a0f5a2c
f34eb0b
b1f434a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -64,7 +64,6 @@ | |
| CacheProvider string = "cache-provider" | ||
|
|
||
| // rings | ||
| MetricsGeneratorRing string = "metrics-generator-ring" | ||
| LiveStoreRing string = "live-store-ring" | ||
| PartitionRing string = "partition-ring" | ||
| GeneratorRingWatcher string = "generator-ring-watcher" | ||
|
|
@@ -84,8 +83,7 @@ | |
| SingleBinary string = "all" | ||
|
|
||
| // ring names | ||
| ringMetricsGenerator string = "metrics-generator" | ||
| ringLiveStore string = "live-store" | ||
| ringLiveStore string = "live-store" | ||
| ) | ||
|
|
||
| func IsSingleBinary(target string) bool { | ||
|
|
@@ -154,10 +152,6 @@ | |
| return s, nil | ||
| } | ||
|
|
||
| func (t *App) initGeneratorRing() (services.Service, error) { | ||
| return t.initReadRing(t.cfg.Generator.Ring.ToRingConfig(), ringMetricsGenerator, t.cfg.Generator.OverrideRingKey) | ||
| } | ||
|
|
||
| func (t *App) initLiveStoreRing() (services.Service, error) { | ||
| return t.initReadRing(t.cfg.LiveStore.Ring.ToRingConfig(), ringLiveStore, ringLiveStore) | ||
| } | ||
|
|
@@ -248,16 +242,26 @@ | |
| } | ||
|
|
||
| func (t *App) initDistributor() (services.Service, error) { | ||
| singleBinary := IsSingleBinary(t.cfg.Target) | ||
|
|
||
| t.cfg.Distributor.KafkaConfig = t.cfg.Ingest.Kafka | ||
| t.cfg.Distributor.IngesterWritePathEnabled = false | ||
| t.cfg.Distributor.KafkaWritePathEnabled = t.cfg.Ingest.Enabled // TODO: Don't mix config params | ||
| t.cfg.Distributor.PushSpansToKafka = true | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Useful? React with 👍 / 👎.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Kafka is the only deployment model now unless we are in single-binary mode. So this is totally right. |
||
|
|
||
javiermolinar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| var pushSpansToLocalGenerator distributor.PushSpansFunc | ||
| if singleBinary { | ||
| pushSpansToLocalGenerator = func(ctx context.Context, req *tempopb.PushSpansRequest) (*tempopb.PushResponse, error) { | ||
| if t.generator == nil { | ||
| return nil, errors.New("metrics-generator not initialized") | ||
| } | ||
| return t.generator.PushSpans(ctx, req) | ||
| } | ||
| } | ||
|
Comment on lines
+250
to
+258
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thinking about more kafkaless changes, it'd be nice to instead have a call in the distributor I'm fine if you prefer it this way.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good idea
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do you mind if we do it in a different PR? This is wired with the middleware and the shim, and it involves some other changes. |
||
|
|
||
| // todo: make write-path client a module instead of passing the config everywhere | ||
| distributor, err := distributor.New(t.cfg.Distributor, | ||
| t.cfg.IngesterClient, | ||
| t.readRings[ringLiveStore], | ||
| t.cfg.GeneratorClient, | ||
| t.readRings[ringMetricsGenerator], | ||
| pushSpansToLocalGenerator, | ||
| t.partitionRing, | ||
| t.Overrides, | ||
| t.TracesConsumerMiddleware, | ||
|
|
@@ -279,7 +283,7 @@ | |
| } | ||
|
|
||
| func (t *App) initGenerator() (services.Service, error) { | ||
| t.cfg.Generator.Ring.ListenPort = t.cfg.Server.GRPCListenPort | ||
| t.cfg.Generator.ConsumeFromKafka = !IsSingleBinary(t.cfg.Target) | ||
|
|
||
| t.cfg.Generator.Ingest = t.cfg.Ingest | ||
| t.cfg.Generator.Ingest.Kafka.ConsumerGroup = generator.ConsumerGroup | ||
|
|
@@ -294,32 +298,14 @@ | |
| } | ||
| t.generator = genSvc | ||
|
|
||
| spanStatsHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.generator.SpanMetricsHandler)) | ||
| t.Server.HTTPRouter().Handle(path.Join(api.PathPrefixGenerator, addHTTPAPIPrefix(&t.cfg, api.PathSpanMetrics)), spanStatsHandler) | ||
|
|
||
| queryRangeHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.generator.QueryRangeHandler)) | ||
| t.Server.HTTPRouter().Handle(path.Join(api.PathPrefixGenerator, addHTTPAPIPrefix(&t.cfg, api.PathMetricsQueryRange)), queryRangeHandler) | ||
|
|
||
| if !IsSingleBinary(t.cfg.Target) { | ||
| tempopb.RegisterMetricsGeneratorServer(t.Server.GRPC(), t.generator) // todo: this can be removed before 3.0 but needs to exist as long as we have any deployments anywhere on the traditional arch | ||
| } | ||
|
|
||
| return t.generator, nil | ||
| } | ||
|
|
||
| func (t *App) initGeneratorNoLocalBlocks() (services.Service, error) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I believe there is now no functional difference between the two, and they could be consolidated.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, this is something I want to tackle in a different PR. We first need to allow initGenerator to use a different ring and different codecs for at least one release, since this target (initGeneratorNoLocalBlocks) is already in use. |
||
| reg := prometheus.DefaultRegisterer | ||
|
|
||
| t.cfg.Generator.Ingest = t.cfg.Ingest | ||
|
|
||
| // In this mode, the generator runs as a stateless queue consumer that reads from | ||
| // Kafka and remote writes to a Prometheus-compatible metrics store. | ||
| if !t.cfg.Ingest.Enabled { | ||
| return nil, errors.New("ingest storage must be enabled to run metrics generator in this mode") | ||
| } | ||
| // In this mode, the generator does not need to become available to serve | ||
| // queries, so we can skip setting up a gRPC server. | ||
| t.cfg.Generator.DisableGRPC = true | ||
| t.cfg.Generator.ConsumeFromKafka = true | ||
|
|
||
| var err error | ||
| t.generator, err = generator.New(&t.cfg.Generator, t.Overrides, reg, t.generatorRingWatcher, log.Logger) | ||
|
|
@@ -356,10 +342,6 @@ | |
| } | ||
|
|
||
| func (t *App) initBlockBuilder() (services.Service, error) { | ||
| if !t.cfg.Ingest.Enabled { | ||
| return services.NewIdleService(nil, nil), nil | ||
| } | ||
|
|
||
| t.cfg.BlockBuilder.IngestStorageConfig = t.cfg.Ingest | ||
| t.cfg.BlockBuilder.IngestStorageConfig.Kafka.ConsumerGroup = blockbuilder.ConsumerGroup | ||
| t.cfg.BlockBuilder.GlobalBlockConfig = t.cfg.StorageConfig.Trace.Block | ||
|
|
@@ -675,10 +657,6 @@ | |
| } | ||
|
|
||
| func (t *App) initLiveStore() (services.Service, error) { | ||
| if !t.cfg.Ingest.Enabled { | ||
| return services.NewIdleService(nil, nil), nil | ||
| } | ||
|
|
||
| // In SingleBinary mode don't try to discover partition from host name. | ||
| // Always use partition 0. This is for small installs or local/debugging setups. | ||
| singlePartition := IsSingleBinary(t.cfg.Target) | ||
|
|
@@ -721,7 +699,6 @@ | |
| mm.RegisterModule(OverridesAPI, t.initOverridesAPI) | ||
| mm.RegisterModule(UsageReport, t.initUsageReport) | ||
| mm.RegisterModule(CacheProvider, t.initCacheProvider, modules.UserInvisibleModule) | ||
| mm.RegisterModule(MetricsGeneratorRing, t.initGeneratorRing, modules.UserInvisibleModule) | ||
| mm.RegisterModule(GeneratorRingWatcher, t.initGeneratorRingWatcher, modules.UserInvisibleModule) | ||
| mm.RegisterModule(LiveStoreRing, t.initLiveStoreRing, modules.UserInvisibleModule) | ||
| mm.RegisterModule(PartitionRing, t.initPartitionRing, modules.UserInvisibleModule) | ||
|
|
@@ -749,7 +726,6 @@ | |
| OverridesAPI: {Server, Overrides}, | ||
| MemberlistKV: {Server}, | ||
| UsageReport: {MemberlistKV}, | ||
| MetricsGeneratorRing: {Server, MemberlistKV}, | ||
| LiveStoreRing: {Server, MemberlistKV}, | ||
| PartitionRing: {MemberlistKV, Server, LiveStoreRing}, | ||
| GeneratorRingWatcher: {MemberlistKV}, | ||
|
|
@@ -758,7 +734,7 @@ | |
|
|
||
| // individual targets | ||
| QueryFrontend: {Common, Store, OverridesAPI}, | ||
| Distributor: {Common, LiveStoreRing, MetricsGeneratorRing, PartitionRing}, | ||
| Distributor: {Common, LiveStoreRing, PartitionRing}, | ||
| MetricsGenerator: {Common, MemberlistKV, PartitionRing}, | ||
| MetricsGeneratorNoLocalBlocks: {Common, GeneratorRingWatcher}, | ||
| Querier: {Common, Store, LiveStoreRing, PartitionRing}, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,10 +42,14 @@ type Config struct { | |
| Forwarders forwarder.ConfigList `yaml:"forwarders"` | ||
| Usage usage.Config `yaml:"usage,omitempty"` | ||
|
|
||
| // Migration to Kafka write path | ||
| IngesterWritePathEnabled bool `yaml:"ingester_write_path_enabled"` | ||
| KafkaWritePathEnabled bool `yaml:"kafka_write_path_enabled"` | ||
| KafkaConfig ingest.KafkaConfig `yaml:"kafka_config"` | ||
| // Deprecated: this field will be removed in a future release. Write path routing is set by deployment model. | ||
| IngesterWritePathEnabled bool `yaml:"ingester_write_path_enabled"` | ||
| // Deprecated: this field will be removed in a future release. Write path routing is set by deployment model. | ||
| KafkaWritePathEnabled bool `yaml:"kafka_write_path_enabled"` | ||
| KafkaConfig ingest.KafkaConfig `yaml:"kafka_config"` | ||
|
|
||
| // Internal routing toggle set by app wiring (not user-configurable). | ||
| PushSpansToKafka bool `yaml:"-"` | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't this always true? Why have this param?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a placeholder. When we make the kafkaless mode this will be set to false instructing the distributor to not push to kafka |
||
|
|
||
| // disables write extension with inactive ingesters. Use this along with ingester.lifecycler.unregister_on_shutdown = true | ||
| // note that setting these two config values reduces tolerance to failures on rollout b/c there is always one guaranteed to be failing replica | ||
|
|
@@ -104,7 +108,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) | |
| } | ||
|
|
||
| func (cfg *Config) Validate() error { | ||
| if cfg.KafkaWritePathEnabled { | ||
| if cfg.PushSpansToKafka { | ||
| if err := cfg.KafkaConfig.Validate(); err != nil { | ||
| return err | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now setting up Kafka is mandatory. If we don't set it and we don't set a target, we get a validation error:
Until we make the whole `--all` target kafkaless, we need to use a non-Kafka module. It can be the backendscheduler or any other one.