Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* [ENHANCEMENT] Ingester: Add `enable_matcher_optimization` config to apply low selectivity matchers lazily. #7063
* [ENHANCEMENT] Distributor: Add a label references validation for remote write v2 request. #7074
* [ENHANCEMENT] Distributor: Add count, spans, and buckets validations for native histogram. #7072
* [BUGFIX] Compactor: Avoid race condition which allow a grouper to not compact all partitions. #7082

## 1.20.0 in progress
Expand Down
16 changes: 16 additions & 0 deletions pkg/util/validation/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,22 @@ func (e *nativeHistogramSampleSizeBytesExceededError) Error() string {
return fmt.Sprintf("native histogram sample size bytes exceeded for metric (actual: %d, limit: %d) metric: %.200q", e.nhSampleSizeBytes, e.limit, formatLabelSet(e.series))
}

type nativeHistogramInvalidError struct {
nhValidationErr error
series []cortexpb.LabelAdapter
}

func newNativeHistogramInvalidError(series []cortexpb.LabelAdapter, nhValidationErr error) ValidationError {
return &nativeHistogramInvalidError{
series: series,
nhValidationErr: nhValidationErr,
}
}

func (e *nativeHistogramInvalidError) Error() string {
return fmt.Sprintf("invalid native histogram, validation err: %v, metric: %.200q", e.nhValidationErr, formatLabelSet(e.series))
}

// formatLabelSet formats label adapters as a metric name with labels, while preserving
// label order, and keeping duplicates. If there are multiple "__name__" labels, only
// first one is used as metric name, other ones will be included as regular labels.
Expand Down
36 changes: 27 additions & 9 deletions pkg/util/validation/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const (
nativeHistogramBucketCountLimitExceeded = "native_histogram_buckets_exceeded"
nativeHistogramInvalidSchema = "native_histogram_invalid_schema"
nativeHistogramSampleSizeBytesExceeded = "native_histogram_sample_size_bytes_exceeded"
nativeHistogramInvalid = "native_histogram_invalid"

// RateLimited is one of the values for the reason to discard samples.
// Declared here to avoid duplication in ingester and distributor.
Expand Down Expand Up @@ -368,7 +369,6 @@ func ValidateMetadata(validateMetrics *ValidateMetrics, cfg *Limits, userID stri
}

func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, userID string, ls []cortexpb.LabelAdapter, histogramSample cortexpb.Histogram) (cortexpb.Histogram, error) {

// sample size validation for native histogram
if limits.MaxNativeHistogramSampleSizeBytes > 0 && histogramSample.Size() > limits.MaxNativeHistogramSampleSizeBytes {
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramSampleSizeBytesExceeded, userID).Inc()
Expand All @@ -381,25 +381,33 @@ func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, u
return cortexpb.Histogram{}, newNativeHistogramSchemaInvalidError(ls, int(histogramSample.Schema))
}

if limits.MaxNativeHistogramBuckets == 0 {
return histogramSample, nil
}

var (
exceedLimit bool
)

if histogramSample.IsFloatHistogram() {
// Initial check to see if the bucket limit is exceeded or not. If not, we can avoid type casting.
fh := cortexpb.FloatHistogramProtoToFloatHistogram(histogramSample)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For next PR, consider adding sync pool here to reduce allocation.

if err := fh.Validate(); err != nil {
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramInvalid, userID).Inc()
return cortexpb.Histogram{}, newNativeHistogramInvalidError(ls, err)
}

// limit check
if limits.MaxNativeHistogramBuckets == 0 {
return histogramSample, nil
}

exceedLimit = len(histogramSample.PositiveCounts)+len(histogramSample.NegativeCounts) > limits.MaxNativeHistogramBuckets
if !exceedLimit {
return histogramSample, nil
}

// Exceed limit.
if histogramSample.Schema <= histogram.ExponentialSchemaMin {
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID).Inc()
return cortexpb.Histogram{}, newHistogramBucketLimitExceededError(ls, limits.MaxNativeHistogramBuckets)
}
fh := cortexpb.FloatHistogramProtoToFloatHistogram(histogramSample)

oBuckets := len(fh.PositiveBuckets) + len(fh.NegativeBuckets)
for len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > limits.MaxNativeHistogramBuckets {
if fh.Schema <= histogram.ExponentialSchemaMin {
Expand All @@ -415,7 +423,17 @@ func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, u
return cortexpb.FloatHistogramToHistogramProto(histogramSample.TimestampMs, fh), nil
}

// Initial check to see if bucket limit is exceeded or not. If not, we can avoid type casting.
h := cortexpb.HistogramProtoToHistogram(histogramSample)
if err := h.Validate(); err != nil {
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramInvalid, userID).Inc()
return cortexpb.Histogram{}, newNativeHistogramInvalidError(ls, err)
}

// limit check
if limits.MaxNativeHistogramBuckets == 0 {
return histogramSample, nil
}

exceedLimit = len(histogramSample.PositiveDeltas)+len(histogramSample.NegativeDeltas) > limits.MaxNativeHistogramBuckets
if !exceedLimit {
return histogramSample, nil
Expand All @@ -425,7 +443,6 @@ func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, u
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID).Inc()
return cortexpb.Histogram{}, newHistogramBucketLimitExceededError(ls, limits.MaxNativeHistogramBuckets)
}
h := cortexpb.HistogramProtoToHistogram(histogramSample)
oBuckets := len(h.PositiveBuckets) + len(h.NegativeBuckets)
for len(h.PositiveBuckets)+len(h.NegativeBuckets) > limits.MaxNativeHistogramBuckets {
if h.Schema <= histogram.ExponentialSchemaMin {
Expand All @@ -437,6 +454,7 @@ func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, u
if oBuckets != len(h.PositiveBuckets)+len(h.NegativeBuckets) {
validateMetrics.HistogramSamplesReducedResolution.WithLabelValues(userID).Inc()
}

// If resolution reduced, convert new histogram to protobuf type again.
return cortexpb.HistogramToHistogramProto(histogramSample.TimestampMs, h), nil
}
Expand Down
137 changes: 136 additions & 1 deletion pkg/util/validation/validate_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package validation

import (
"errors"
"math"
"net/http"
"strings"
"testing"
Expand Down Expand Up @@ -412,6 +414,55 @@ func TestValidateNativeHistogram(t *testing.T) {
exceedMaxRangeSchemaFloatHistogram.Schema = 20
exceedMaxSampleSizeBytesLimitFloatHistogram := tsdbutil.GenerateTestFloatHistogram(100)

bucketNumMisMatchInPSpanFH := tsdbutil.GenerateTestFloatHistogram(0)
bucketNumMisMatchInPSpanFH.PositiveSpans[0].Length = 3

negativeSpanOffsetInPSpansFH := tsdbutil.GenerateTestFloatHistogram(0)
negativeSpanOffsetInPSpansFH.PositiveSpans[1].Offset = -1

bucketNumMisMatchInNSpanFH := tsdbutil.GenerateTestFloatHistogram(0)
bucketNumMisMatchInNSpanFH.NegativeSpans[0].Length = 3

negativeSpanOffsetInNSpansFH := tsdbutil.GenerateTestFloatHistogram(0)
negativeSpanOffsetInNSpansFH.NegativeSpans[1].Offset = -1

negativeBucketCountInNBucketsFH := tsdbutil.GenerateTestFloatHistogram(0)
negativeBucketCountInNBucketsFH.NegativeBuckets = []float64{-1.1, -1.2, -1.3, -1.4}

negativeBucketCountInPBucketsFH := tsdbutil.GenerateTestFloatHistogram(0)
negativeBucketCountInPBucketsFH.PositiveBuckets = []float64{-1.1, -1.2, -1.3, -1.4}

negativeCountFloatHistogram := tsdbutil.GenerateTestFloatHistogram(0)
negativeCountFloatHistogram.Count = -1.2345

negativeZeroCountFloatHistogram := tsdbutil.GenerateTestFloatHistogram(0)
negativeZeroCountFloatHistogram.ZeroCount = -1.2345

bucketNumMisMatchInPSpanH := tsdbutil.GenerateTestHistogram(0)
bucketNumMisMatchInPSpanH.PositiveSpans[0].Length = 3

negativeSpanOffsetInPSpansH := tsdbutil.GenerateTestHistogram(0)
negativeSpanOffsetInPSpansH.PositiveSpans[1].Offset = -1

bucketNumMisMatchInNSpanH := tsdbutil.GenerateTestHistogram(0)
bucketNumMisMatchInNSpanH.NegativeSpans[0].Length = 3

negativeSpanOffsetInNSpansH := tsdbutil.GenerateTestHistogram(0)
negativeSpanOffsetInNSpansH.NegativeSpans[1].Offset = -1

negativeBucketCountInNBucketsH := tsdbutil.GenerateTestHistogram(0)
negativeBucketCountInNBucketsH.NegativeBuckets = []int64{-1, -2, -3, -4}

negativeBucketCountInPBucketsH := tsdbutil.GenerateTestHistogram(0)
negativeBucketCountInPBucketsH.PositiveBuckets = []int64{-1, -2, -3, -4}

countMisMatchSumIsNaN := tsdbutil.GenerateTestHistogram(0)
countMisMatchSumIsNaN.Sum = math.NaN()
countMisMatchSumIsNaN.Count = 11

countMisMatch := tsdbutil.GenerateTestHistogram(0)
countMisMatch.Count = 11

for _, tc := range []struct {
name string
bucketLimit int
Expand Down Expand Up @@ -525,6 +576,90 @@ func TestValidateNativeHistogram(t *testing.T) {
discardedSampleLabelValue: nativeHistogramSampleSizeBytesExceeded,
maxNativeHistogramSampleSizeBytesLimit: 100,
},
{
name: "bucket number mismatch in positive spans for float native histogram",
histogram: cortexpb.FloatHistogramToHistogramProto(0, bucketNumMisMatchInPSpanFH.Copy()),
expectedErr: newNativeHistogramInvalidError(lbls, errors.New("positive side: spans need 5 buckets, have 4 buckets: histogram spans specify different number of buckets than provided")),
discardedSampleLabelValue: nativeHistogramInvalid,
},
{
name: "negative span offset found in positive spans for float native histogram",
histogram: cortexpb.FloatHistogramToHistogramProto(0, negativeSpanOffsetInPSpansFH.Copy()),
expectedErr: newNativeHistogramInvalidError(lbls, errors.New("positive side: span number 2 with offset -1: histogram has a span whose offset is negative")),
discardedSampleLabelValue: nativeHistogramInvalid,
},
{
name: "bucket number mismatch in negative spans for float native histogram",
histogram: cortexpb.FloatHistogramToHistogramProto(0, bucketNumMisMatchInNSpanFH.Copy()),
expectedErr: newNativeHistogramInvalidError(lbls, errors.New("negative side: spans need 5 buckets, have 4 buckets: histogram spans specify different number of buckets than provided")),
discardedSampleLabelValue: nativeHistogramInvalid,
},
{
name: "negative spans offset found in negative spans for float native histogram",
histogram: cortexpb.FloatHistogramToHistogramProto(0, negativeSpanOffsetInNSpansFH.Copy()),
expectedErr: newNativeHistogramInvalidError(lbls, errors.New("negative side: span number 2 with offset -1: histogram has a span whose offset is negative")),
discardedSampleLabelValue: nativeHistogramInvalid,
},
{
name: "negative observations count in negative buckets for float native histogram",
histogram: cortexpb.FloatHistogramToHistogramProto(0, negativeBucketCountInNBucketsFH.Copy()),
expectedErr: newNativeHistogramInvalidError(lbls, errors.New("negative side: bucket number 1 has observation count of -1.1: histogram has a bucket whose observation count is negative")),
discardedSampleLabelValue: nativeHistogramInvalid,
},
{
name: "negative observations count in positive buckets for native histogram",
histogram: cortexpb.HistogramToHistogramProto(0, negativeBucketCountInPBucketsH.Copy()),
expectedErr: newNativeHistogramInvalidError(lbls, errors.New("positive side: bucket number 1 has observation count of -1: histogram has a bucket whose observation count is negative")),
discardedSampleLabelValue: nativeHistogramInvalid,
},
{
name: "bucket number mismatch in positive spans for native histogram",
histogram: cortexpb.HistogramToHistogramProto(0, bucketNumMisMatchInPSpanH.Copy()),
expectedErr: newNativeHistogramInvalidError(lbls, errors.New("positive side: spans need 5 buckets, have 4 buckets: histogram spans specify different number of buckets than provided")),
discardedSampleLabelValue: nativeHistogramInvalid,
},
{
name: "negative span offset found in positive spans for native histogram",
histogram: cortexpb.HistogramToHistogramProto(0, negativeSpanOffsetInPSpansH.Copy()),
expectedErr: newNativeHistogramInvalidError(lbls, errors.New("positive side: span number 2 with offset -1: histogram has a span whose offset is negative")),
discardedSampleLabelValue: nativeHistogramInvalid,
},
{
name: "bucket number mismatch in negative spans for native histogram",
histogram: cortexpb.HistogramToHistogramProto(0, bucketNumMisMatchInNSpanH.Copy()),
expectedErr: newNativeHistogramInvalidError(lbls, errors.New("negative side: spans need 5 buckets, have 4 buckets: histogram spans specify different number of buckets than provided")),
discardedSampleLabelValue: nativeHistogramInvalid,
},
{
name: "negative spans offset found in negative spans for native histogram",
histogram: cortexpb.FloatHistogramToHistogramProto(0, negativeSpanOffsetInNSpansFH.Copy()),
expectedErr: newNativeHistogramInvalidError(lbls, errors.New("negative side: span number 2 with offset -1: histogram has a span whose offset is negative")),
discardedSampleLabelValue: nativeHistogramInvalid,
},
{
name: "negative observations count in negative buckets for native histogram",
histogram: cortexpb.HistogramToHistogramProto(0, negativeBucketCountInNBucketsH.Copy()),
expectedErr: newNativeHistogramInvalidError(lbls, errors.New("negative side: bucket number 1 has observation count of -1: histogram has a bucket whose observation count is negative")),
discardedSampleLabelValue: nativeHistogramInvalid,
},
{
name: "negative observations count in positive buckets for native histogram",
histogram: cortexpb.HistogramToHistogramProto(0, negativeBucketCountInPBucketsH.Copy()),
expectedErr: newNativeHistogramInvalidError(lbls, errors.New("positive side: bucket number 1 has observation count of -1: histogram has a bucket whose observation count is negative")),
discardedSampleLabelValue: nativeHistogramInvalid,
},
{
name: "mismatch between observations count with count field when sum is NaN",
histogram: cortexpb.HistogramToHistogramProto(0, countMisMatchSumIsNaN.Copy()),
expectedErr: newNativeHistogramInvalidError(lbls, errors.New("12 observations found in buckets, but the Count field is 11: histogram's observation count should be at least the number of observations found in the buckets")),
discardedSampleLabelValue: nativeHistogramInvalid,
},
{
name: "mismatch between observations count with count field",
histogram: cortexpb.HistogramToHistogramProto(0, countMisMatch.Copy()),
expectedErr: newNativeHistogramInvalidError(lbls, errors.New("12 observations found in buckets, but the Count field is 11: histogram's observation count should equal the number of observations found in the buckets (in absence of NaN)")),
discardedSampleLabelValue: nativeHistogramInvalid,
},
} {
t.Run(tc.name, func(t *testing.T) {
reg := prometheus.NewRegistry()
Expand All @@ -534,7 +669,7 @@ func TestValidateNativeHistogram(t *testing.T) {
limits.MaxNativeHistogramSampleSizeBytes = tc.maxNativeHistogramSampleSizeBytesLimit
actualHistogram, actualErr := ValidateNativeHistogram(validateMetrics, limits, userID, lbls, tc.histogram)
if tc.expectedErr != nil {
require.Equal(t, tc.expectedErr, actualErr)
require.Equal(t, tc.expectedErr.Error(), actualErr.Error())
require.Equal(t, float64(1), testutil.ToFloat64(validateMetrics.DiscardedSamples.WithLabelValues(tc.discardedSampleLabelValue, userID)))
// Should never increment if error was returned
require.Equal(t, float64(0), testutil.ToFloat64(validateMetrics.HistogramSamplesReducedResolution.WithLabelValues(userID)))
Expand Down
Loading