Skip to content
This repository was archived by the owner on Aug 24, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 112 additions & 99 deletions agents/postgres/pgstatmonitor/pgstatmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ import (

"github.com/AlekSi/pointer"
ver "github.com/hashicorp/go-version"
"github.com/lib/pq" //nolint:gci
_ "github.com/lib/pq" // register SQL driver.
"github.com/lib/pq"
"github.com/percona/pmm/api/agentpb"
"github.com/percona/pmm/api/inventorypb"
"github.com/percona/pmm/utils/sqlmetrics"
Expand All @@ -45,20 +44,12 @@ const defaultWaitTime = 60 * time.Second

// PGStatMonitorQAN is the QAN service that connects to PostgreSQL and extracts stats.
type PGStatMonitorQAN struct {
q *reform.Querier
dbCloser io.Closer
agentID string
l *logrus.Entry
changes chan agents.Change
monitorCache *statMonitorCache

// By default, query shows the actual parameter instead of the placeholder.
// It is quite useful when users want to use that query and try to run that
// query to check the abnormalities. But in most cases users like the queries
// with a placeholder. This parameter is used to toggle between the two said
// options.
pgsmNormalizedQuery bool
waitTime time.Duration
q *reform.Querier
dbCloser io.Closer
agentID string
l *logrus.Entry
changes chan agents.Change
monitorCache *statMonitorCache
disableQueryExamples bool
}

Expand Down Expand Up @@ -92,7 +83,7 @@ const (
commandTypeUpdate = "UPDATE"
commandTypeInsert = "INSERT"
commandTypeDelete = "DELETE"
commandTypeUtiity = "UTILITY"
commandTypeUtility = "UTILITY"
Copy link
Copy Markdown
Contributor Author

@JiriCtvrtka JiriCtvrtka May 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already fixed in a previous PR, but it's not merged yet.

)

var commandTypeToText = []string{
Expand All @@ -101,7 +92,7 @@ var commandTypeToText = []string{
commandTypeUpdate,
commandTypeInsert,
commandTypeDelete,
commandTypeUtiity,
commandTypeUtility,
commandTextNotAvailable,
}

Expand All @@ -122,25 +113,6 @@ func New(params *Params, l *logrus.Entry) (*PGStatMonitorQAN, error) {
return newPgStatMonitorQAN(q, sqlDB, params.AgentID, params.DisableQueryExamples, l)
}

// isPropertyValueInt reports whether property is one of the pg_stat_monitor
// settings listed below. Matching is exact: the full, prefixed setting name
// must be supplied.
func isPropertyValueInt(property string) bool {
	intProperties := [...]string{
		"pg_stat_monitor.pgsm_histogram_max",
		"pg_stat_monitor.pgsm_query_max_len",
		"pg_stat_monitor.pgsm_max",
		"pg_stat_monitor.pgsm_bucket_time",
		"pg_stat_monitor.pgsm_query_shared_buffer",
		"pg_stat_monitor.pgsm_max_buckets",
		"pg_stat_monitor.pgsm_histogram_buckets",
		"pg_stat_monitor.pgsm_overflow_target",
		"pg_stat_monitor.pgsm_histogram_min",
	}

	for _, candidate := range intProperties {
		if candidate == property {
			return true
		}
	}

	return false
}

func areSettingsTextValues(q *reform.Querier) (bool, error) {
pgsmVersion, prerelease, err := getPGMonitorVersion(q)
if err != nil {
Expand All @@ -155,66 +127,13 @@ func areSettingsTextValues(q *reform.Querier) (bool, error) {
}

// newPgStatMonitorQAN builds a PGStatMonitorQAN on top of an already opened
// querier. It reads the pg_stat_monitor settings once to decide whether
// queries are normalized and how long to wait between collections.
//
// An error is returned when areSettingsTextValues fails, when the settings
// cannot be selected, or when a setting listed by isPropertyValueInt holds a
// value that does not parse as an integer.
func newPgStatMonitorQAN(q *reform.Querier, dbCloser io.Closer, agentID string, disableQueryExamples bool, l *logrus.Entry) (*PGStatMonitorQAN, error) {
	const (
		normalizedQueryKey = "pg_stat_monitor.pgsm_normalized_query"
		bucketTimeKey      = "pg_stat_monitor.pgsm_bucket_time"
	)

	settingsValuesAreText, err := areSettingsTextValues(q)
	if err != nil {
		return nil, err
	}

	var settings []reform.Struct
	if settingsValuesAreText {
		settings, err = q.SelectAllFrom(pgStatMonitorSettingsTextValueView, "")
	} else {
		settings, err = q.SelectAllFrom(pgStatMonitorSettingsView, "")
	}
	if err != nil {
		return nil, errors.Wrap(err, "failed to get settings")
	}

	var normalizedQuery bool
	waitTime := defaultWaitTime
	for _, row := range settings {
		var name string
		var value int64

		if settingsValuesAreText {
			setting := row.(*pgStatMonitorSettingsTextValue)
			name = setting.Name

			// The normalized-query flag is not an integer property, so it has
			// to be read before the integer filter below; otherwise it would
			// be skipped and the flag would always stay false. The accepted
			// spellings match settings.getNormalizedQueryValue.
			if name == normalizedQueryKey {
				normalizedQuery = setting.Value == "yes" || setting.Value == "1"
				continue
			}

			if !isPropertyValueInt(name) {
				continue
			}

			value, err = strconv.ParseInt(setting.Value, 10, 64)
			if err != nil {
				return nil, errors.Wrap(err, "value cannot be parsed as integer")
			}
		} else {
			setting := row.(*pgStatMonitorSettings)
			name = setting.Name
			value = setting.Value
		}

		switch name {
		case normalizedQueryKey:
			normalizedQuery = value == 1
		case bucketTimeKey:
			// Only shorten the polling interval; never wait longer than the default.
			if value < int64(defaultWaitTime.Seconds()) {
				waitTime = time.Duration(value) * time.Second
			}
		}
	}

	return &PGStatMonitorQAN{
		q:                    q,
		dbCloser:             dbCloser,
		agentID:              agentID,
		l:                    l,
		changes:              make(chan agents.Change, 10),
		monitorCache:         newStatMonitorCache(l),
		pgsmNormalizedQuery:  normalizedQuery,
		waitTime:             waitTime,
		disableQueryExamples: disableQueryExamples,
	}, nil
}
Expand Down Expand Up @@ -276,10 +195,21 @@ func (m *PGStatMonitorQAN) Run(ctx context.Context) {
close(m.changes)
}()

settings, err := m.getSettings()
if err != nil {
m.l.Error(err)
m.changes <- agents.Change{Status: inventorypb.AgentStatus_WAITING}
}
normalizedQuery, err := settings.getNormalizedQueryValue()
if err != nil {
m.l.Error(err)
m.changes <- agents.Change{Status: inventorypb.AgentStatus_WAITING}
}

// add current stat monitor to cache so they are not sent as new on first iteration with incorrect timestamps
var running bool
m.changes <- agents.Change{Status: inventorypb.AgentStatus_STARTING}
if current, _, err := m.monitorCache.getStatMonitorExtended(ctx, m.q, m.pgsmNormalizedQuery); err == nil {
if current, _, err := m.monitorCache.getStatMonitorExtended(ctx, m.q, normalizedQuery); err == nil {
m.monitorCache.refresh(current)
m.l.Debugf("Got %d initial stat monitor.", len(current))
running = true
Expand All @@ -289,10 +219,15 @@ func (m *PGStatMonitorQAN) Run(ctx context.Context) {
m.changes <- agents.Change{Status: inventorypb.AgentStatus_WAITING}
}

waitTime, err := settings.getWaitTime()
if err != nil {
m.l.Warning(err)
}

// query pg_stat_monitor every waitTime seconds
start := time.Now()
m.l.Debugf("Scheduling next collection in %s at %s.", m.waitTime, start.Add(m.waitTime).Format("15:04:05"))
t := time.NewTimer(m.waitTime)
m.l.Debugf("Scheduling next collection in %s at %s.", waitTime, start.Add(waitTime).Format("15:04:05"))
t := time.NewTimer(waitTime)
defer t.Stop()

for {
Expand All @@ -307,12 +242,27 @@ func (m *PGStatMonitorQAN) Run(ctx context.Context) {
m.changes <- agents.Change{Status: inventorypb.AgentStatus_STARTING}
}

lengthS := uint32(m.waitTime.Seconds())
buckets, err := m.getNewBuckets(ctx, lengthS)
settings, err := m.getSettings()
if err != nil {
m.l.Errorf(err.Error())
running = false
m.changes <- agents.Change{Status: inventorypb.AgentStatus_WAITING}
continue
}
normalizedQuery, err := settings.getNormalizedQueryValue()
if err != nil {
m.l.Errorf(err.Error())
running = false
m.changes <- agents.Change{Status: inventorypb.AgentStatus_WAITING}
continue
}

lengthS := uint32(waitTime.Seconds())
buckets, err := m.getNewBuckets(ctx, lengthS, normalizedQuery)

start = time.Now()
m.l.Debugf("Scheduling next collection in %s at %s.", m.waitTime, start.Add(m.waitTime).Format("15:04:05"))
t.Reset(m.waitTime)
m.l.Debugf("Scheduling next collection in %s at %s.", waitTime, start.Add(waitTime).Format("15:04:05"))
t.Reset(waitTime)

if err != nil {
m.l.Error(errors.Wrap(err, "getNewBuckets failed"))
Expand All @@ -331,8 +281,71 @@ func (m *PGStatMonitorQAN) Run(ctx context.Context) {
}
}

func (m *PGStatMonitorQAN) getNewBuckets(ctx context.Context, periodLengthSecs uint32) ([]*agentpb.MetricsBucket, error) {
current, prev, err := m.monitorCache.getStatMonitorExtended(ctx, m.q, m.pgsmNormalizedQuery)
// settings holds pg_stat_monitor settings keyed by setting name, with every
// value kept in its text form.
type settings map[string]*pgStatMonitorSettingsTextValue

// getSettings reads the current pg_stat_monitor settings and returns them
// keyed by setting name.
//
// Depending on what areSettingsTextValues reports, rows are selected from the
// text-valued or the integer-valued settings view. Integer values are
// rendered as decimal text so callers always see the same representation.
func (m *PGStatMonitorQAN) getSettings() (settings, error) {
	textValues, err := areSettingsTextValues(m.q)
	if err != nil {
		return nil, err
	}

	var rows []reform.Struct
	if textValues {
		rows, err = m.q.SelectAllFrom(pgStatMonitorSettingsTextValueView, "")
	} else {
		rows, err = m.q.SelectAllFrom(pgStatMonitorSettingsView, "")
	}
	if err != nil {
		return nil, errors.Wrap(err, "failed to get settings")
	}

	result := make(settings, len(rows))
	for _, row := range rows {
		if !textValues {
			// Integer-valued row: convert it to the common text-valued shape.
			numeric := row.(*pgStatMonitorSettings)
			result[numeric.Name] = &pgStatMonitorSettingsTextValue{
				Name:  numeric.Name,
				Value: fmt.Sprintf("%d", numeric.Value),
			}
			continue
		}

		text := row.(*pgStatMonitorSettingsTextValue)
		result[text.Name] = text
	}

	return result, nil
}

// getNormalizedQueryValue reports whether the pgsm_normalized_query setting is
// switched on. The values "yes" and "1" count as enabled; any other value is
// treated as disabled. An error is returned when the setting is absent.
func (s settings) getNormalizedQueryValue() (bool, error) {
	const key = "pg_stat_monitor.pgsm_normalized_query"

	setting, found := s[key]
	if !found {
		return false, errors.New("failed to get pgsm_normalized_query property")
	}

	switch setting.Value {
	case "yes", "1":
		return true, nil
	default:
		return false, nil
	}
}

// getWaitTime converts the pgsm_bucket_time setting, read as a whole number of
// seconds, into a time.Duration.
//
// When the setting is absent or is not a valid integer, defaultWaitTime is
// returned together with an error describing the problem, so the caller can
// keep running on the fallback value.
func (s settings) getWaitTime() (time.Duration, error) {
	const key = "pg_stat_monitor.pgsm_bucket_time"

	setting, found := s[key]
	if !found {
		return defaultWaitTime, errors.New("failed to get pgsm_bucket_time, wait time set on 60 seconds")
	}

	seconds, parseErr := strconv.ParseInt(setting.Value, 10, 64)
	if parseErr != nil {
		return defaultWaitTime, errors.Wrap(parseErr, "property pgsm_bucket_time cannot be parsed as integer, wait time set on 60 seconds")
	}

	return time.Duration(seconds) * time.Second, nil
}

func (m *PGStatMonitorQAN) getNewBuckets(ctx context.Context, periodLengthSecs uint32, normalizedQuery bool) ([]*agentpb.MetricsBucket, error) {
current, prev, err := m.monitorCache.getStatMonitorExtended(ctx, m.q, normalizedQuery)
if err != nil {
return nil, err
}
Expand Down
28 changes: 22 additions & 6 deletions agents/postgres/pgstatmonitor/pgstatmonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,12 @@ func TestPGStatMonitorSchema(t *testing.T) {
_, err := db.Exec(selectAllCountries)
require.NoError(t, err)

buckets, err := m.getNewBuckets(context.Background(), 60)
settings, err := m.getSettings()
require.NoError(t, err)
normalizedQuery, err := settings.getNormalizedQueryValue()
require.NoError(t, err)

buckets, err := m.getNewBuckets(context.Background(), 60, normalizedQuery)
require.NoError(t, err)
buckets = filter(buckets)
t.Logf("Actual:\n%s", tests.FormatBuckets(buckets))
Expand All @@ -213,7 +218,8 @@ func TestPGStatMonitorSchema(t *testing.T) {
assert.Equal(t, float32(5), actual.Postgresql.MSharedBlksHitSum+actual.Postgresql.MSharedBlksReadSum)
assert.InDelta(t, 1.5, actual.Postgresql.MSharedBlksHitCnt+actual.Postgresql.MSharedBlksReadCnt, 0.5)
example := ""
if !m.pgsmNormalizedQuery && !m.disableQueryExamples {

if !normalizedQuery && !m.disableQueryExamples {
example = actual.Common.Example
}

Expand Down Expand Up @@ -264,7 +270,7 @@ func TestPGStatMonitorSchema(t *testing.T) {
_, err = db.Exec(selectAllCountries)
require.NoError(t, err)

buckets, err = m.getNewBuckets(context.Background(), 60)
buckets, err = m.getNewBuckets(context.Background(), 60, normalizedQuery)
require.NoError(t, err)
buckets = filter(buckets)
t.Logf("Actual:\n%s", tests.FormatBuckets(buckets))
Expand Down Expand Up @@ -327,7 +333,12 @@ func TestPGStatMonitorSchema(t *testing.T) {
_, err := db.Exec(q, args...)
require.NoError(t, err)

buckets, err := m.getNewBuckets(context.Background(), 60)
settings, err := m.getSettings()
require.NoError(t, err)
normalizedQuery, err := settings.getNormalizedQueryValue()
require.NoError(t, err)

buckets, err := m.getNewBuckets(context.Background(), 60, normalizedQuery)
require.NoError(t, err)
buckets = filter(buckets)
t.Logf("Actual:\n%s", tests.FormatBuckets(buckets))
Expand Down Expand Up @@ -385,7 +396,7 @@ func TestPGStatMonitorSchema(t *testing.T) {
_, err = db.Exec(q, args...)
require.NoError(t, err)

buckets, err = m.getNewBuckets(context.Background(), 60)
buckets, err = m.getNewBuckets(context.Background(), 60, normalizedQuery)
require.NoError(t, err)
buckets = filter(buckets)
t.Logf("Actual:\n%s", tests.FormatBuckets(buckets))
Expand Down Expand Up @@ -469,9 +480,14 @@ func TestPGStatMonitorSchema(t *testing.T) {
}
waitGroup.Wait()

settings, err := m.getSettings()
require.NoError(t, err)
normalizedQuery, err := settings.getNormalizedQueryValue()
require.NoError(t, err)

var buckets []*agentpb.MetricsBucket
for i := 0; i < 100; i++ {
buckets, err = m.getNewBuckets(context.Background(), 60)
buckets, err = m.getNewBuckets(context.Background(), 60, normalizedQuery)
require.NoError(t, err)
buckets = filter(buckets)
t.Logf("Actual:\n%s", tests.FormatBuckets(buckets))
Expand Down
7 changes: 6 additions & 1 deletion agents/postgres/pgstatmonitor/stat_monitor_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ func TestPGStatMonitorStructs(t *testing.T) {
}()

m := setup(t, db, false)
current, cache, err := m.monitorCache.getStatMonitorExtended(context.TODO(), db.Querier, m.pgsmNormalizedQuery)
settings, err := m.getSettings()
assert.NoError(t, err)
normalizedQuery, err := settings.getNormalizedQueryValue()
assert.NoError(t, err)

current, cache, err := m.monitorCache.getStatMonitorExtended(context.TODO(), db.Querier, normalizedQuery)

require.NoError(t, err)
require.NotNil(t, current)
Expand Down