This repository was archived by the owner on Aug 24, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 20
PMM-9632 Settings view usage. #374
Open
JiriCtvrtka
wants to merge
17
commits into
main
Choose a base branch
from
PMM-9632-settings-view
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
e1226b1
PMM-9632 Struct draft.
564c1c7
PMM-9632 Changes.
630d0dd
PMM-9632 Changes in models, methods, etc.
5b380a1
PMM-9632 Pointers to possible nil values.
a764a32
PMM-9632 Gen.
3f7f08e
PMM-9632 Changes.
faec4ce
Merge branch 'main' into PMM-9632-settings-view
cc7b296
PMM-9632 Another changes.
ed93bb9
PMM-9632 Lint.
274fbb4
Merge branch 'main' into PMM-9632-settings-view
JiriCtvrtka 8054441
PMM-9632 Lint.
d524caa
PMM-9632 Revert settings into QAN feature.
ecc37e7
PMM-9632 Required changes.
6bac794
PMM-9632 Remove empty line.
3e8c192
PMM-9632 Fix tests.
86d99a2
PMM-9632 Fix bug for older versions.
4912906
Merge branch 'main' into PMM-9632-settings-view
JiriCtvrtka File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,8 +26,7 @@ import ( | |
|
|
||
| "github.com/AlekSi/pointer" | ||
| ver "github.com/hashicorp/go-version" | ||
| "github.com/lib/pq" //nolint:gci | ||
| _ "github.com/lib/pq" // register SQL driver. | ||
| "github.com/lib/pq" | ||
| "github.com/percona/pmm/api/agentpb" | ||
| "github.com/percona/pmm/api/inventorypb" | ||
| "github.com/percona/pmm/utils/sqlmetrics" | ||
|
|
@@ -45,20 +44,12 @@ const defaultWaitTime = 60 * time.Second | |
|
|
||
| // PGStatMonitorQAN QAN services connects to PostgreSQL and extracts stats. | ||
| type PGStatMonitorQAN struct { | ||
| q *reform.Querier | ||
| dbCloser io.Closer | ||
| agentID string | ||
| l *logrus.Entry | ||
| changes chan agents.Change | ||
| monitorCache *statMonitorCache | ||
|
|
||
| // By default, query shows the actual parameter instead of the placeholder. | ||
| // It is quite useful when users want to use that query and try to run that | ||
| // query to check the abnormalities. But in most cases users like the queries | ||
| // with a placeholder. This parameter is used to toggle between the two said | ||
| // options. | ||
| pgsmNormalizedQuery bool | ||
| waitTime time.Duration | ||
| q *reform.Querier | ||
| dbCloser io.Closer | ||
| agentID string | ||
| l *logrus.Entry | ||
| changes chan agents.Change | ||
| monitorCache *statMonitorCache | ||
| disableQueryExamples bool | ||
| } | ||
|
|
||
|
|
@@ -92,7 +83,7 @@ const ( | |
| commandTypeUpdate = "UPDATE" | ||
| commandTypeInsert = "INSERT" | ||
| commandTypeDelete = "DELETE" | ||
| commandTypeUtiity = "UTILITY" | ||
| commandTypeUtility = "UTILITY" | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Already fixed in previous PR, but its not merged yet. |
||
| ) | ||
|
|
||
| var commandTypeToText = []string{ | ||
|
|
@@ -101,7 +92,7 @@ var commandTypeToText = []string{ | |
| commandTypeUpdate, | ||
| commandTypeInsert, | ||
| commandTypeDelete, | ||
| commandTypeUtiity, | ||
| commandTypeUtility, | ||
| commandTextNotAvailable, | ||
| } | ||
|
|
||
|
|
@@ -122,25 +113,6 @@ func New(params *Params, l *logrus.Entry) (*PGStatMonitorQAN, error) { | |
| return newPgStatMonitorQAN(q, sqlDB, params.AgentID, params.DisableQueryExamples, l) | ||
| } | ||
|
|
||
| func isPropertyValueInt(property string) bool { | ||
| switch property { | ||
| case | ||
| "pg_stat_monitor.pgsm_histogram_max", | ||
| "pg_stat_monitor.pgsm_query_max_len", | ||
| "pg_stat_monitor.pgsm_max", | ||
| "pg_stat_monitor.pgsm_bucket_time", | ||
| "pg_stat_monitor.pgsm_query_shared_buffer", | ||
| "pg_stat_monitor.pgsm_max_buckets", | ||
| "pg_stat_monitor.pgsm_histogram_buckets", | ||
| "pg_stat_monitor.pgsm_overflow_target", | ||
| "pg_stat_monitor.pgsm_histogram_min": | ||
|
|
||
| return true | ||
| } | ||
|
|
||
| return false | ||
| } | ||
|
|
||
| func areSettingsTextValues(q *reform.Querier) (bool, error) { | ||
| pgsmVersion, prerelease, err := getPGMonitorVersion(q) | ||
| if err != nil { | ||
|
|
@@ -155,66 +127,13 @@ func areSettingsTextValues(q *reform.Querier) (bool, error) { | |
| } | ||
|
|
||
| func newPgStatMonitorQAN(q *reform.Querier, dbCloser io.Closer, agentID string, disableQueryExamples bool, l *logrus.Entry) (*PGStatMonitorQAN, error) { | ||
| var settings []reform.Struct | ||
|
|
||
| settingsValuesAreText, err := areSettingsTextValues(q) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| if settingsValuesAreText { | ||
| settings, err = q.SelectAllFrom(pgStatMonitorSettingsTextValueView, "") | ||
| } else { | ||
| settings, err = q.SelectAllFrom(pgStatMonitorSettingsView, "") | ||
| } | ||
| if err != nil { | ||
| return nil, errors.Wrap(err, "failed to get settings") | ||
| } | ||
|
|
||
| var normalizedQuery bool | ||
| waitTime := defaultWaitTime | ||
| for _, row := range settings { | ||
| var name string | ||
| var value int64 | ||
|
|
||
| if settingsValuesAreText { | ||
| setting := row.(*pgStatMonitorSettingsTextValue) | ||
| name = setting.Name | ||
| if !isPropertyValueInt(name) { | ||
| continue | ||
| } | ||
|
|
||
| valueInt, err := strconv.ParseInt(setting.Value, 10, 64) | ||
| if err != nil { | ||
| return nil, errors.Wrap(err, "value cannot be parsed as integer") | ||
| } | ||
| value = valueInt | ||
| } else { | ||
| setting := row.(*pgStatMonitorSettings) | ||
| name = setting.Name | ||
| value = setting.Value | ||
| } | ||
|
|
||
| if err == nil { | ||
| switch name { | ||
| case "pg_stat_monitor.pgsm_normalized_query": | ||
| normalizedQuery = value == 1 | ||
| case "pg_stat_monitor.pgsm_bucket_time": | ||
| if value < int64(defaultWaitTime.Seconds()) { | ||
| waitTime = time.Duration(value) * time.Second | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return &PGStatMonitorQAN{ | ||
| q: q, | ||
| dbCloser: dbCloser, | ||
| agentID: agentID, | ||
| l: l, | ||
| changes: make(chan agents.Change, 10), | ||
| monitorCache: newStatMonitorCache(l), | ||
| pgsmNormalizedQuery: normalizedQuery, | ||
| waitTime: waitTime, | ||
| disableQueryExamples: disableQueryExamples, | ||
| }, nil | ||
| } | ||
|
|
@@ -276,10 +195,21 @@ func (m *PGStatMonitorQAN) Run(ctx context.Context) { | |
| close(m.changes) | ||
| }() | ||
|
|
||
| settings, err := m.getSettings() | ||
| if err != nil { | ||
| m.l.Error(err) | ||
| m.changes <- agents.Change{Status: inventorypb.AgentStatus_WAITING} | ||
| } | ||
| normalizedQuery, err := settings.getNormalizedQueryValue() | ||
| if err != nil { | ||
| m.l.Error(err) | ||
| m.changes <- agents.Change{Status: inventorypb.AgentStatus_WAITING} | ||
| } | ||
|
|
||
| // add current stat monitor to cache so they are not send as new on first iteration with incorrect timestamps | ||
| var running bool | ||
| m.changes <- agents.Change{Status: inventorypb.AgentStatus_STARTING} | ||
| if current, _, err := m.monitorCache.getStatMonitorExtended(ctx, m.q, m.pgsmNormalizedQuery); err == nil { | ||
| if current, _, err := m.monitorCache.getStatMonitorExtended(ctx, m.q, normalizedQuery); err == nil { | ||
| m.monitorCache.refresh(current) | ||
| m.l.Debugf("Got %d initial stat monitor.", len(current)) | ||
| running = true | ||
|
|
@@ -289,10 +219,15 @@ func (m *PGStatMonitorQAN) Run(ctx context.Context) { | |
| m.changes <- agents.Change{Status: inventorypb.AgentStatus_WAITING} | ||
| } | ||
|
|
||
| waitTime, err := settings.getWaitTime() | ||
| if err != nil { | ||
| m.l.Warning(err) | ||
| } | ||
|
|
||
| // query pg_stat_monitor every waitTime seconds | ||
| start := time.Now() | ||
| m.l.Debugf("Scheduling next collection in %s at %s.", m.waitTime, start.Add(m.waitTime).Format("15:04:05")) | ||
| t := time.NewTimer(m.waitTime) | ||
| m.l.Debugf("Scheduling next collection in %s at %s.", waitTime, start.Add(waitTime).Format("15:04:05")) | ||
| t := time.NewTimer(waitTime) | ||
| defer t.Stop() | ||
|
|
||
| for { | ||
|
|
@@ -307,12 +242,27 @@ func (m *PGStatMonitorQAN) Run(ctx context.Context) { | |
| m.changes <- agents.Change{Status: inventorypb.AgentStatus_STARTING} | ||
| } | ||
|
|
||
| lengthS := uint32(m.waitTime.Seconds()) | ||
| buckets, err := m.getNewBuckets(ctx, lengthS) | ||
| settings, err := m.getSettings() | ||
| if err != nil { | ||
| m.l.Errorf(err.Error()) | ||
| running = false | ||
| m.changes <- agents.Change{Status: inventorypb.AgentStatus_WAITING} | ||
| continue | ||
| } | ||
| normalizedQuery, err := settings.getNormalizedQueryValue() | ||
| if err != nil { | ||
| m.l.Errorf(err.Error()) | ||
| running = false | ||
| m.changes <- agents.Change{Status: inventorypb.AgentStatus_WAITING} | ||
| continue | ||
| } | ||
|
|
||
| lengthS := uint32(waitTime.Seconds()) | ||
| buckets, err := m.getNewBuckets(ctx, lengthS, normalizedQuery) | ||
|
|
||
| start = time.Now() | ||
| m.l.Debugf("Scheduling next collection in %s at %s.", m.waitTime, start.Add(m.waitTime).Format("15:04:05")) | ||
| t.Reset(m.waitTime) | ||
| m.l.Debugf("Scheduling next collection in %s at %s.", waitTime, start.Add(waitTime).Format("15:04:05")) | ||
| t.Reset(waitTime) | ||
|
|
||
| if err != nil { | ||
| m.l.Error(errors.Wrap(err, "getNewBuckets failed")) | ||
|
|
@@ -331,8 +281,71 @@ func (m *PGStatMonitorQAN) Run(ctx context.Context) { | |
| } | ||
| } | ||
|
|
||
| func (m *PGStatMonitorQAN) getNewBuckets(ctx context.Context, periodLengthSecs uint32) ([]*agentpb.MetricsBucket, error) { | ||
| current, prev, err := m.monitorCache.getStatMonitorExtended(ctx, m.q, m.pgsmNormalizedQuery) | ||
| type settings map[string]*pgStatMonitorSettingsTextValue | ||
|
|
||
| func (m *PGStatMonitorQAN) getSettings() (settings, error) { | ||
| var settingsRows []reform.Struct | ||
|
|
||
| settingsValuesAreText, err := areSettingsTextValues(m.q) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| if settingsValuesAreText { | ||
| settingsRows, err = m.q.SelectAllFrom(pgStatMonitorSettingsTextValueView, "") | ||
| } else { | ||
| settingsRows, err = m.q.SelectAllFrom(pgStatMonitorSettingsView, "") | ||
| } | ||
| if err != nil { | ||
| return nil, errors.Wrap(err, "failed to get settings") | ||
| } | ||
|
|
||
| settings := make(settings) | ||
| for _, row := range settingsRows { | ||
| if settingsValuesAreText { | ||
| setting := row.(*pgStatMonitorSettingsTextValue) | ||
| settings[setting.Name] = setting | ||
| } else { | ||
| setting := row.(*pgStatMonitorSettings) | ||
| name := setting.Name | ||
| settings[name] = &pgStatMonitorSettingsTextValue{ | ||
| Name: name, | ||
| Value: fmt.Sprintf("%d", setting.Value), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return settings, nil | ||
| } | ||
|
|
||
| func (s settings) getNormalizedQueryValue() (bool, error) { | ||
| key := "pg_stat_monitor.pgsm_normalized_query" | ||
| if _, ok := s[key]; !ok { | ||
| return false, errors.New("failed to get pgsm_normalized_query property") | ||
| } | ||
|
|
||
| if s[key].Value == "yes" || s[key].Value == "1" { | ||
| return true, nil | ||
| } | ||
|
|
||
| return false, nil | ||
| } | ||
|
|
||
| func (s settings) getWaitTime() (time.Duration, error) { | ||
| key := "pg_stat_monitor.pgsm_bucket_time" | ||
| if _, ok := s[key]; !ok { | ||
| return defaultWaitTime, errors.New("failed to get pgsm_bucket_time, wait time set on 60 seconds") | ||
| } | ||
|
|
||
| valueInt, err := strconv.ParseInt(s[key].Value, 10, 64) | ||
| if err != nil { | ||
| return defaultWaitTime, errors.Wrap(err, "property pgsm_bucket_time cannot be parsed as integer, wait time set on 60 seconds") | ||
| } | ||
|
|
||
| return time.Duration(valueInt) * time.Second, nil | ||
| } | ||
|
|
||
| func (m *PGStatMonitorQAN) getNewBuckets(ctx context.Context, periodLengthSecs uint32, normalizedQuery bool) ([]*agentpb.MetricsBucket, error) { | ||
| current, prev, err := m.monitorCache.getStatMonitorExtended(ctx, m.q, normalizedQuery) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.