Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions internal/k6runner/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func TestScriptHTTPRun(t *testing.T) {
zlogger = testhelper.Logger(t)
)

success, err := script.Run(ctx, registry, logger, zlogger, SecretStore{})
success, _, err := script.Run(ctx, registry, logger, zlogger, SecretStore{})
require.Equal(t, tc.expectSuccess, success)
require.Equal(t, tc.expectLogs, logger.buf.String())
if tc.expectErrorAs == nil {
Expand Down Expand Up @@ -453,7 +453,7 @@ func TestHTTPProcessorRetries(t *testing.T) {
logger testLogger
zlogger = zerolog.New(io.Discard)
)
success, err := processor.Run(ctx, registry, &logger, zlogger, SecretStore{})
success, _, err := processor.Run(ctx, registry, &logger, zlogger, SecretStore{})
require.ErrorIs(t, err, tc.expectError)
require.Equal(t, tc.expectError == nil, success)
require.Equal(t, tc.expectRequests, requests.Load())
Expand Down Expand Up @@ -499,7 +499,7 @@ func TestHTTPProcessorRetries(t *testing.T) {
logger testLogger
zlogger = zerolog.New(io.Discard)
)
success, err := processor.Run(ctx, registry, &logger, zlogger, SecretStore{})
success, _, err := processor.Run(ctx, registry, &logger, zlogger, SecretStore{})
require.NoError(t, err)
require.True(t, success)
})
Expand Down
40 changes: 28 additions & 12 deletions internal/k6runner/k6runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"strings"
"time"

"github.com/go-logfmt/logfmt"
smmmodel "github.com/grafana/synthetic-monitoring-agent/internal/model"
Expand Down Expand Up @@ -147,7 +148,7 @@ var (
ErrFromRunner = errors.New("runner reported an error")
)

func (r Processor) Run(ctx context.Context, registry *prometheus.Registry, logger logger.Logger, internalLogger zerolog.Logger, secretStore SecretStore) (bool, error) {
func (r Processor) Run(ctx context.Context, registry *prometheus.Registry, logger logger.Logger, internalLogger zerolog.Logger, secretStore SecretStore) (bool, time.Duration, error) {
k6runner := r.runner.WithLogger(&internalLogger)

// TODO: This error message is okay to be Debug for local k6 execution, but should be Error for remote runners.
Expand All @@ -156,15 +157,15 @@ func (r Processor) Run(ctx context.Context, registry *prometheus.Registry, logge
internalLogger.Debug().
Err(err).
Msg("k6 script exited with error code")
return false, err
return false, 0, err
}

// If only one of Error and ErrorCode are non-empty, the proxy is misbehaving.
switch {
case result.Error == "" && result.ErrorCode != "":
fallthrough
case result.Error != "" && result.ErrorCode == "":
return false, fmt.Errorf(
return false, 0, fmt.Errorf(
"%w: only one of error (%q) and errorCode (%q) is non-empty",
ErrBuggyRunner, result.Error, result.ErrorCode,
)
Expand Down Expand Up @@ -194,40 +195,41 @@ func (r Processor) Run(ctx context.Context, registry *prometheus.Registry, logge
internalLogger.Debug().
Err(err).
Msg("cannot load logs to logger")
return false, err
return false, 0, err
}

var (
collector sampleCollector
resultCollector checkResultCollector
collector sampleCollector
resultCollector checkResultCollector
durationCollector probeDurationCollector
)

if err := extractMetricSamples(result.Metrics, internalLogger, collector.process, resultCollector.process); err != nil {
if err := extractMetricSamples(result.Metrics, internalLogger, collector.process, resultCollector.process, durationCollector.process); err != nil {
internalLogger.Debug().
Err(err).
Msg("cannot extract metric samples")
return false, err
return false, 0, err
}

if err := registry.Register(&collector.collector); err != nil {
internalLogger.Error().
Err(err).
Msg("cannot register collector")
return false, err
return false, 0, err
}

// https://github.com/grafana/sm-k6-runner/blob/b811839d444a7e69fd056b0a4e6ccf7e914197f3/internal/mq/runner.go#L51
switch result.ErrorCode {
case "":
// No error, all good.
return true, nil
return true, durationCollector.duration, nil
// TODO: Remove "user" from this list, which has been renamed to "aborted".
case "timeout", "killed", "user", "failed", "aborted":
// These are user errors. The probe failed, but we don't return an error.
return false, nil
return false, durationCollector.duration, nil
default:
// We got an "unknown" error, or some other code we do not recognize. Return it so we log it.
return false, fmt.Errorf("%w: %s: %s", ErrFromRunner, result.ErrorCode, result.Error)
return false, durationCollector.duration, fmt.Errorf("%w: %s: %s", ErrFromRunner, result.ErrorCode, result.Error)
}
}

Expand Down Expand Up @@ -302,6 +304,20 @@ func (rc *checkResultCollector) process(mf *dto.MetricFamily, sample *model.Samp
return nil
}

type probeDurationCollector struct {
duration time.Duration
}

func (dc *probeDurationCollector) process(_ *dto.MetricFamily, sample *model.Sample) error {
if sample.Metric[model.MetricNameLabel] != "probe_script_duration_seconds" {
return nil
}

dc.duration = time.Duration(float64(sample.Value) * float64(time.Second))

return nil
}

func extractMetricSamples(metrics []byte, logger zerolog.Logger, processors ...sampleProcessorFunc) error {
promDecoder := expfmt.NewDecoder(bytes.NewBuffer(metrics), expfmt.NewFormat(expfmt.TypeTextPlain))
decoderOpts := expfmt.DecodeOptions{Timestamp: model.Now()}
Expand Down
4 changes: 3 additions & 1 deletion internal/k6runner/k6runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sort"
"strings"
"testing"
"time"

"github.com/grafana/synthetic-monitoring-agent/internal/model"
"github.com/grafana/synthetic-monitoring-agent/internal/prober/logger"
Expand Down Expand Up @@ -90,9 +91,10 @@ func TestScriptRun(t *testing.T) {
// We already know tha parsing the metrics and the logs is working, so
// we are only interested in verifying that the script runs without
// errors.
success, err := processor.Run(ctx, registry, &logger, zlogger, SecretStore{})
success, duration, err := processor.Run(ctx, registry, &logger, zlogger, SecretStore{})
require.NoError(t, err)
require.True(t, success)
require.Equal(t, 500*time.Millisecond, duration)
}

func TestCheckInfoFromSM(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions internal/k6runner/testdata/test.out
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
probe_data_sent_bytes{scenario="default"} 0
probe_data_received_bytes{scenario="default"} 0
probe_iteration_duration_seconds{scenario="default"} 1.3029e-05
probe_script_duration_seconds 0.500
6 changes: 2 additions & 4 deletions internal/prober/browser/browser.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,18 @@ func (p Prober) Name() string {

func (p Prober) Probe(ctx context.Context, target string, registry *prometheus.Registry, logger logger.Logger) (bool, float64) {
secretStore, err := p.secretsRetriever(ctx)

if err != nil {
p.logger.Error().Err(err).Msg("failed to retrieve secret store")
return false, 0
}

success, err := p.processor.Run(ctx, registry, logger, p.logger, secretStore)
success, duration, err := p.processor.Run(ctx, registry, logger, p.logger, secretStore)
if err != nil {
p.logger.Error().Err(err).Msg("running probe")
return false, 0
}

// TODO(mem): implement custom duration extraction.
return success, 0
return success, duration.Seconds()
}

// TODO(mem): This should probably be in the k6runner package.
Expand Down
6 changes: 2 additions & 4 deletions internal/prober/multihttp/multihttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,18 @@ func (p Prober) Name() string {

func (p Prober) Probe(ctx context.Context, target string, registry *prometheus.Registry, logger logger.Logger) (bool, float64) {
secretStore, err := p.secretsRetriever(ctx)

if err != nil {
p.logger.Error().Err(err).Msg("running probe")
return false, 0
}

success, err := p.processor.Run(ctx, registry, logger, p.logger, secretStore)
success, duration, err := p.processor.Run(ctx, registry, logger, p.logger, secretStore)
if err != nil {
p.logger.Error().Err(err).Msg("running probe")
return false, 0
}

// TODO(mem): implement custom duration extraction.
return success, 0
return success, duration.Seconds()
}

// Overrides any user-provided headers with our own augmented values
Expand Down
2 changes: 1 addition & 1 deletion internal/prober/multihttp/script_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ func TestSettingsToScript(t *testing.T) {
t.Log("Log entries:\n" + buf.String())

require.True(t, success)
require.Equal(t, float64(0), duration)
require.NotEqual(t, 0, duration)
}

func TestReplaceVariablesInString(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions internal/prober/scripted/scripted.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,13 @@ func (p Prober) Probe(ctx context.Context, target string, registry *prometheus.R
Bool("hasSecretStoreToken", secretStore.Token != "").
Msg("secret store retrieved for scripted probe")

success, err := p.processor.Run(ctx, registry, logger, p.logger, secretStore)
success, duration, err := p.processor.Run(ctx, registry, logger, p.logger, secretStore)
if err != nil {
p.logger.Error().Err(err).Msg("running probe")
return false, 0
}

// TODO(mem): implement custom duration extraction.
return success, 0
return success, duration.Seconds()
}

func newCredentialsRetriever(provider secrets.SecretProvider, tenantID model.GlobalID, logger zerolog.Logger) func(context.Context) (k6runner.SecretStore, error) {
Expand Down
Loading