// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
	"context"
	"slices"
	"sort"
	"sync"
	"time"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

type buckets[N int64 | float64] struct {
	attrs attribute.Set
	res   exemplar.FilteredReservoir[N]

	counts   []uint64
	count    uint64
	total    N
	min, max N
}

// newBuckets returns buckets with n bins.
func newBuckets[N int64 | float64](attrs attribute.Set, n int) *buckets[N] {
	return &buckets[N]{attrs: attrs, counts: make([]uint64, n)}
}

func (b *buckets[N]) sum(value N) { b.total += value }

func (b *buckets[N]) bin(idx int, value N) {
	b.counts[idx]++
	b.count++
	if value < b.min {
		b.min = value
	} else if value > b.max {
		b.max = value
	}
}

// histValues summarizes a set of measurements as a histogram with
// explicitly defined buckets.
type histValues[N int64 | float64] struct {
	noSum  bool
	bounds []float64

	newRes   func() exemplar.FilteredReservoir[N]
	limit    limiter[*buckets[N]]
	values   map[attribute.Distinct]*buckets[N]
	valuesMu sync.Mutex
}

func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histValues[N] {
	// Keeping all buckets correctly associated with the passed boundaries is
	// ultimately this type's responsibility. Make a copy here so we can
	// always guarantee this. Or, in the case of failure, have complete
	// control over the fix.
	b := slices.Clone(bounds)
	slices.Sort(b)
	return &histValues[N]{
		noSum:  noSum,
		bounds: b,
		newRes: r,
		limit:  newLimiter[*buckets[N]](limit),
		values: make(map[attribute.Distinct]*buckets[N]),
	}
}

// measure records the measurement value, scoped by attr, and aggregates it
// into a histogram.
func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
	// This search will return an index in the range [0, len(s.bounds)], where
	// it will return len(s.bounds) if value is greater than the last element
	// of s.bounds. This aligns with the buckets in that the length of buckets
	// is len(s.bounds)+1, with the last bucket representing:
	// (s.bounds[len(s.bounds)-1], +∞).
	idx := sort.SearchFloat64s(s.bounds, float64(value))

	s.valuesMu.Lock()
	defer s.valuesMu.Unlock()

	attr := s.limit.Attributes(fltrAttr, s.values)
	b, ok := s.values[attr.Equivalent()]
	if !ok {
		// N+1 buckets. For example:
		//
		//   bounds = [0, 5, 10]
		//
		// Then,
		//
		//   buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
		b = newBuckets[N](attr, len(s.bounds)+1)
		b.res = s.newRes()

		// Ensure min and max are recorded values (not zero), for new buckets.
		b.min, b.max = value, value
		s.values[attr.Equivalent()] = b
	}
	b.bin(idx, value)
	if !s.noSum {
		b.sum(value)
	}
	b.res.Offer(ctx, value, droppedAttr)
}
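// As an illustration of the index mapping above (a hypothetical usage sketch,
// not part of this package's API; ctx, attrs, and the reservoir factory
// newRes are assumed to be supplied by the caller). Bounds are
// upper-inclusive, so a value equal to a boundary falls in the bucket that
// boundary closes:
//
//	hv := newHistValues[float64]([]float64{0, 5, 10}, false, 10, newRes)
//	hv.measure(ctx, -1, attrs, nil) // idx 0: bucket (-∞, 0]
//	hv.measure(ctx, 5, attrs, nil)  // idx 1: bucket (0, 5]
//	hv.measure(ctx, 7, attrs, nil)  // idx 2: bucket (5, 10]
//	hv.measure(ctx, 11, attrs, nil) // idx 3: bucket (10, +∞)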
// newHistogram returns an Aggregator that summarizes a set of measurements as
// a histogram.
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histogram[N] {
	return &histogram[N]{
		histValues: newHistValues[N](boundaries, noSum, limit, r),
		noMinMax:   noMinMax,
		start:      now(),
	}
}

// histogram summarizes a set of measurements as a histogram with explicitly
// defined buckets.
type histogram[N int64 | float64] struct {
	*histValues[N]

	noMinMax bool
	start    time.Time
}

func (s *histogram[N]) delta(dest *metricdata.Aggregation) int {
	t := now()

	// If *dest is not a metricdata.Histogram, memory reuse is missed. In that
	// case, use the zero-value h and hope for better alignment next cycle.
	h, _ := (*dest).(metricdata.Histogram[N])
	h.Temporality = metricdata.DeltaTemporality

	s.valuesMu.Lock()
	defer s.valuesMu.Unlock()

	// Do not allow modification of our copy of bounds.
	bounds := slices.Clone(s.bounds)

	n := len(s.values)
	hDPts := reset(h.DataPoints, n, n)

	var i int
	for _, val := range s.values {
		hDPts[i].Attributes = val.attrs
		hDPts[i].StartTime = s.start
		hDPts[i].Time = t
		hDPts[i].Count = val.count
		hDPts[i].Bounds = bounds
		hDPts[i].BucketCounts = val.counts

		if !s.noSum {
			hDPts[i].Sum = val.total
		}

		if !s.noMinMax {
			hDPts[i].Min = metricdata.NewExtrema(val.min)
			hDPts[i].Max = metricdata.NewExtrema(val.max)
		}

		collectExemplars(&hDPts[i].Exemplars, val.res.Collect)

		i++
	}
	// Unused attribute sets do not report.
	clear(s.values)
	// The delta collection cycle resets.
	s.start = t

	h.DataPoints = hDPts
	*dest = h

	return n
}

func (s *histogram[N]) cumulative(dest *metricdata.Aggregation) int {
	t := now()

	// If *dest is not a metricdata.Histogram, memory reuse is missed. In that
	// case, use the zero-value h and hope for better alignment next cycle.
	h, _ := (*dest).(metricdata.Histogram[N])
	h.Temporality = metricdata.CumulativeTemporality

	s.valuesMu.Lock()
	defer s.valuesMu.Unlock()

	// Do not allow modification of our copy of bounds.
	bounds := slices.Clone(s.bounds)

	n := len(s.values)
	hDPts := reset(h.DataPoints, n, n)

	var i int
	for _, val := range s.values {
		hDPts[i].Attributes = val.attrs
		hDPts[i].StartTime = s.start
		hDPts[i].Time = t
		hDPts[i].Count = val.count
		hDPts[i].Bounds = bounds

		// The HistogramDataPoint field values returned need to be copies of
		// the buckets value as we will keep updating them.
		//
		// TODO (#3047): Making copies for bounds and counts incurs a large
		// memory allocation footprint. Alternatives should be explored.
		hDPts[i].BucketCounts = slices.Clone(val.counts)

		if !s.noSum {
			hDPts[i].Sum = val.total
		}

		if !s.noMinMax {
			hDPts[i].Min = metricdata.NewExtrema(val.min)
			hDPts[i].Max = metricdata.NewExtrema(val.max)
		}

		collectExemplars(&hDPts[i].Exemplars, val.res.Collect)

		i++
		// TODO (#3006): This will use an unbounded amount of memory if there
		// is an unbounded number of attribute sets being aggregated.
		// Attribute sets that become "stale" need to be forgotten so this
		// will not overload the system.
	}

	h.DataPoints = hDPts
	*dest = h

	return n
}
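// Sketch of the collection contract (hypothetical caller code; an aggregator
// s with recorded measurements is assumed): delta reports what was recorded
// since the last collection and resets state, while cumulative reports
// everything since the start time and keeps the internal counts live, which
// is why the cumulative path above clones BucketCounts:
//
//	var dest metricdata.Aggregation
//	n := s.delta(&dest)     // data points since last cycle; state cleared
//	n = s.cumulative(&dest) // data points since start; state retained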