histogram/
sparse.rs

1use crate::{Bucket, Config, Error, Histogram};
2
3/// This histogram is a sparse, columnar representation of the regular
4/// Histogram. It is significantly smaller than a regular Histogram
5/// when a large number of buckets are zero, which is a frequent
6/// occurence. It stores an individual vector for each field
7/// of non-zero buckets. Assuming index[0] = n, (index[0], count[0])
8/// corresponds to the nth bucket.
9#[derive(Clone, Debug, PartialEq)]
10#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
11#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
12pub struct SparseHistogram {
13    /// parameters representing the resolution and the range of
14    /// the histogram tracking request latencies
15    pub config: Config,
16    /// indices for the non-zero buckets in the histogram
17    pub index: Vec<usize>,
18    /// histogram bucket counts corresponding to the indices
19    pub count: Vec<u64>,
20}
21
22impl SparseHistogram {
23    /// Construct a new histogram from the provided parameters. See the
24    /// documentation for [`crate::Config`] to understand their meaning.
25    pub fn new(grouping_power: u8, max_value_power: u8) -> Result<Self, Error> {
26        let config = Config::new(grouping_power, max_value_power)?;
27
28        Ok(Self::with_config(&config))
29    }
30
31    /// Creates a new histogram using a provided [`crate::Config`].
32    pub fn with_config(config: &Config) -> Self {
33        Self {
34            config: *config,
35            index: Vec::new(),
36            count: Vec::new(),
37        }
38    }
39
40    /// Helper function to store a bucket in the histogram.
41    fn add_bucket(&mut self, idx: usize, n: u64) {
42        if n != 0 {
43            self.index.push(idx);
44            self.count.push(n);
45        }
46    }
47
48    /// Adds the other histogram to this histogram and returns the result as a
49    /// new histogram.
50    ///
51    /// An error is returned if the two histograms have incompatible parameters.
52    /// Buckets which have values in both histograms are allowed to wrap.
53    #[allow(clippy::comparison_chain)]
54    pub fn wrapping_add(&self, h: &SparseHistogram) -> Result<SparseHistogram, Error> {
55        if self.config != h.config {
56            return Err(Error::IncompatibleParameters);
57        }
58
59        let mut histogram = SparseHistogram::with_config(&self.config);
60
61        // Sort and merge buckets from both histograms
62        let (mut i, mut j) = (0, 0);
63        while i < self.index.len() && j < h.index.len() {
64            let (k1, v1) = (self.index[i], self.count[i]);
65            let (k2, v2) = (h.index[j], h.count[j]);
66
67            if k1 == k2 {
68                histogram.add_bucket(k1, v1 + v2);
69                (i, j) = (i + 1, j + 1);
70            } else if k1 < k2 {
71                histogram.add_bucket(k1, v1);
72                i += 1;
73            } else {
74                histogram.add_bucket(k2, v2);
75                j += 1;
76            }
77        }
78
79        // Fill remaining values, if any, from the left histogram
80        if i < self.index.len() {
81            histogram.index.extend(&self.index[i..self.index.len()]);
82            histogram.count.extend(&self.count[i..self.count.len()]);
83        }
84
85        // Fill remaining values, if any, from the right histogram
86        if j < h.index.len() {
87            histogram.index.extend(&h.index[j..h.index.len()]);
88            histogram.count.extend(&h.count[j..h.count.len()]);
89        }
90
91        Ok(histogram)
92    }
93
94    /// Subtracts the other histogram to this histogram and returns the result as a
95    /// new histogram. The other histogram is expected to be a subset of the current
96    /// histogram, i.e., for every bucket in the other histogram should have a
97    /// count less than or equal to the corresponding bucket in this histogram.
98    ///
99    /// An error is returned if the two histograms have incompatible parameters
100    /// or if the other histogram is not a subset of this histogram.
101    #[allow(clippy::comparison_chain)]
102    pub fn checked_sub(&self, h: &SparseHistogram) -> Result<SparseHistogram, Error> {
103        if self.config != h.config {
104            return Err(Error::IncompatibleParameters);
105        }
106
107        let mut histogram = SparseHistogram::with_config(&self.config);
108
109        // Sort and merge buckets from both histograms
110        let (mut i, mut j) = (0, 0);
111        while i < self.index.len() && j < h.index.len() {
112            let (k1, v1) = (self.index[i], self.count[i]);
113            let (k2, v2) = (h.index[j], h.count[j]);
114
115            if k1 == k2 {
116                let v = v1.checked_sub(v2).ok_or(Error::Underflow)?;
117                if v != 0 {
118                    histogram.add_bucket(k1, v);
119                }
120                (i, j) = (i + 1, j + 1);
121            } else if k1 < k2 {
122                histogram.add_bucket(k1, v1);
123                i += 1;
124            } else {
125                // Other histogram has a bucket not present in this histogram,
126                // i.e., it is not a subset of this histogram
127                return Err(Error::InvalidSubset);
128            }
129        }
130
131        // Check that the subset histogram has been consumed
132        if j < h.index.len() {
133            return Err(Error::InvalidSubset);
134        }
135
136        // Fill remaining bucets, if any, from the superset histogram
137        if i < self.index.len() {
138            histogram.index.extend(&self.index[i..self.index.len()]);
139            histogram.count.extend(&self.count[i..self.count.len()]);
140        }
141
142        Ok(histogram)
143    }
144
145    /// Return a collection of percentiles from this histogram.
146    ///
147    /// Each percentile should be in the inclusive range `0.0..=100.0`. For
148    /// example, the 50th percentile (median) can be found using `50.0`.
149    ///
150    /// The results will be sorted by the percentile.
151    pub fn percentiles(&self, percentiles: &[f64]) -> Result<Option<Vec<(f64, Bucket)>>, Error> {
152        // validate all the percentiles
153        if percentiles.is_empty() {
154            return Err(Error::InvalidPercentile);
155        }
156
157        for percentile in percentiles {
158            if !(0.0..=100.0).contains(percentile) {
159                return Err(Error::InvalidPercentile);
160            }
161        }
162        // get the total count
163        let total_count: u128 = self.count.iter().map(|v| *v as u128).sum();
164
165        // empty histogram, no percentiles available
166        if total_count == 0 {
167            return Ok(None);
168        }
169
170        // sort the requested percentiles so we can find them in a single pass
171        let mut percentiles = percentiles.to_vec();
172        percentiles.sort_by(|a, b| a.partial_cmp(b).unwrap());
173
174        let mut idx = 0;
175        let mut partial_sum = self.count[0] as u128;
176
177        let result: Vec<(f64, Bucket)> = percentiles
178            .iter()
179            .filter_map(|percentile| {
180                // For 0.0th percentile (min) we need to report the first bucket
181                // with a non-zero count.
182                let count =
183                    std::cmp::max(1, (percentile / 100.0 * total_count as f64).ceil() as u128);
184
185                loop {
186                    // found the matching bucket index for this percentile
187                    if partial_sum >= count {
188                        return Some((
189                            *percentile,
190                            Bucket {
191                                count: self.count[idx],
192                                range: self.config.index_to_range(self.index[idx]),
193                            },
194                        ));
195                    }
196
197                    // check if we have reached the end of the buckets
198                    if idx == (self.index.len() - 1) {
199                        break;
200                    }
201
202                    // otherwise, increment the index, partial sum, and loop
203                    idx += 1;
204                    partial_sum += self.count[idx] as u128;
205                }
206
207                None
208            })
209            .collect();
210
211        Ok(Some(result))
212    }
213
214    /// Return a single percentile from this histogram.
215    ///
216    /// The percentile should be in the inclusive range `0.0..=100.0`. For
217    /// example, the 50th percentile (median) can be found using `50.0`.
218    pub fn percentile(&self, percentile: f64) -> Result<Option<Bucket>, Error> {
219        self.percentiles(&[percentile])
220            .map(|v| v.map(|x| x.first().unwrap().1.clone()))
221    }
222
223    /// Returns a new histogram with a reduced grouping power. The reduced
224    /// grouping power should lie in the range (0..existing grouping power).
225    ///
226    /// This works by iterating over every bucket in the existing histogram
227    /// and inserting the contained values into the new histogram. While we
228    /// do not know the exact values of the data points (only that they lie
229    /// within the bucket's range), it does not matter since the bucket is
230    /// not split during downsampling and any value can be used.
231    pub fn downsample(&self, grouping_power: u8) -> Result<SparseHistogram, Error> {
232        if grouping_power >= self.config.grouping_power() {
233            return Err(Error::MaxPowerTooLow);
234        }
235
236        let config = Config::new(grouping_power, self.config.max_value_power())?;
237        let mut histogram = SparseHistogram::with_config(&config);
238
239        // Multiple buckets in the old histogram will map to the same bucket
240        // in the new histogram, so we have to aggregate bucket values from the
241        // old histogram before inserting a bucket into the new downsampled
242        // histogram. However, mappings between the histograms monotonically
243        // increase, so once a bucket in the old histogram maps to a higher
244        // bucket in the new histogram than is currently being aggregated,
245        // the bucket can be sealed and inserted into the new histogram.
246        let mut aggregating_idx: usize = 0;
247        let mut aggregating_count: u64 = 0;
248        for (idx, n) in self.index.iter().zip(self.count.iter()) {
249            let new_idx = config.value_to_index(self.config.index_to_lower_bound(*idx))?;
250
251            // If it maps to the currently aggregating bucket, merge counts
252            if new_idx == aggregating_idx {
253                aggregating_count += n;
254                continue;
255            }
256
257            // Does not map to the aggregating bucket, so seal and store that bucket
258            histogram.add_bucket(aggregating_idx, aggregating_count);
259
260            // Start tracking this bucket as the current aggregating bucket
261            aggregating_idx = new_idx;
262            aggregating_count = *n;
263        }
264
265        // Add the final aggregated bucket
266        histogram.add_bucket(aggregating_idx, aggregating_count);
267
268        Ok(histogram)
269    }
270}
271
272impl<'a> IntoIterator for &'a SparseHistogram {
273    type Item = Bucket;
274    type IntoIter = Iter<'a>;
275
276    fn into_iter(self) -> Self::IntoIter {
277        Iter {
278            index: 0,
279            histogram: self,
280        }
281    }
282}
283
284/// An iterator across the histogram buckets.
285pub struct Iter<'a> {
286    index: usize,
287    histogram: &'a SparseHistogram,
288}
289
290impl Iterator for Iter<'_> {
291    type Item = Bucket;
292
293    fn next(&mut self) -> Option<<Self as std::iter::Iterator>::Item> {
294        if self.index >= self.histogram.index.len() {
295            return None;
296        }
297
298        let bucket = Bucket {
299            count: self.histogram.count[self.index],
300            range: self
301                .histogram
302                .config
303                .index_to_range(self.histogram.index[self.index]),
304        };
305
306        self.index += 1;
307
308        Some(bucket)
309    }
310}
311
312impl From<&Histogram> for SparseHistogram {
313    fn from(histogram: &Histogram) -> Self {
314        let mut index = Vec::new();
315        let mut count = Vec::new();
316
317        for (idx, n) in histogram.as_slice().iter().enumerate() {
318            if *n > 0 {
319                index.push(idx);
320                count.push(*n);
321            }
322        }
323
324        Self {
325            config: histogram.config(),
326            index,
327            count,
328        }
329    }
330}
331
332#[cfg(test)]
333mod tests {
334    use rand::Rng;
335    use std::collections::HashMap;
336
337    use super::*;
338    use crate::standard::Histogram;
339
340    #[test]
341    fn wrapping_add() {
342        let config = Config::new(7, 32).unwrap();
343
344        let h1 = SparseHistogram {
345            config,
346            index: vec![1, 3, 5],
347            count: vec![6, 12, 7],
348        };
349
350        let h2 = SparseHistogram::with_config(&config);
351
352        let h3 = SparseHistogram {
353            config,
354            index: vec![2, 3, 6, 11, 13],
355            count: vec![5, 7, 3, 15, 6],
356        };
357
358        let hdiff = SparseHistogram::new(6, 16).unwrap();
359        let h = h1.wrapping_add(&hdiff);
360        assert_eq!(h, Err(Error::IncompatibleParameters));
361
362        let h = h1.wrapping_add(&h2).unwrap();
363        assert_eq!(h.index, vec![1, 3, 5]);
364        assert_eq!(h.count, vec![6, 12, 7]);
365
366        let h = h2.wrapping_add(&h3).unwrap();
367        assert_eq!(h.index, vec![2, 3, 6, 11, 13]);
368        assert_eq!(h.count, vec![5, 7, 3, 15, 6]);
369
370        let h = h1.wrapping_add(&h3).unwrap();
371        assert_eq!(h.index, vec![1, 2, 3, 5, 6, 11, 13]);
372        assert_eq!(h.count, vec![6, 5, 19, 7, 3, 15, 6]);
373    }
374
375    #[test]
376    fn checked_sub() {
377        let config = Config::new(7, 32).unwrap();
378
379        let h1 = SparseHistogram {
380            config,
381            index: vec![1, 3, 5],
382            count: vec![6, 12, 7],
383        };
384
385        let hparams = SparseHistogram::new(6, 16).unwrap();
386        let h = h1.checked_sub(&hparams);
387        assert_eq!(h, Err(Error::IncompatibleParameters));
388
389        let hempty = SparseHistogram::with_config(&config);
390        let h = h1.checked_sub(&hempty).unwrap();
391        assert_eq!(h.index, vec![1, 3, 5]);
392        assert_eq!(h.count, vec![6, 12, 7]);
393
394        let hclone = h1.clone();
395        let h = h1.checked_sub(&hclone).unwrap();
396        assert!(h.index.is_empty());
397        assert!(h.count.is_empty());
398
399        let hlarger = SparseHistogram {
400            config,
401            index: vec![1, 3, 5],
402            count: vec![4, 13, 7],
403        };
404        let h = h1.checked_sub(&hlarger);
405        assert_eq!(h, Err(Error::Underflow));
406
407        let hmore = SparseHistogram {
408            config,
409            index: vec![1, 5, 7],
410            count: vec![4, 7, 1],
411        };
412        let h = h1.checked_sub(&hmore);
413        assert_eq!(h, Err(Error::InvalidSubset));
414
415        let hdiff = SparseHistogram {
416            config,
417            index: vec![1, 2, 5],
418            count: vec![4, 1, 7],
419        };
420        let h = h1.checked_sub(&hdiff);
421        assert_eq!(h, Err(Error::InvalidSubset));
422
423        let hsubset = SparseHistogram {
424            config,
425            index: vec![1, 3],
426            count: vec![5, 9],
427        };
428        let h = h1.checked_sub(&hsubset).unwrap();
429        assert_eq!(h.index, vec![1, 3, 5]);
430        assert_eq!(h.count, vec![1, 3, 7]);
431    }
432
433    #[test]
434    fn percentiles() {
435        let mut hstandard = Histogram::new(4, 10).unwrap();
436        let hempty = SparseHistogram::from(&hstandard);
437
438        for v in 1..1024 {
439            let _ = hstandard.increment(v);
440        }
441
442        let hsparse = SparseHistogram::from(&hstandard);
443        let percentiles = [1.0, 10.0, 25.0, 50.0, 75.0, 90.0, 99.0, 99.9];
444        for percentile in percentiles {
445            let bempty = hempty.percentile(percentile).unwrap();
446            let bstandard = hstandard.percentile(percentile).unwrap();
447            let bsparse = hsparse.percentile(percentile).unwrap();
448
449            assert_eq!(bempty, None);
450            assert_eq!(bsparse, bstandard);
451        }
452
453        assert_eq!(hempty.percentiles(&percentiles), Ok(None));
454        assert_eq!(
455            hstandard.percentiles(&percentiles).unwrap(),
456            hsparse.percentiles(&percentiles).unwrap()
457        );
458    }
459
460    #[test]
461    // Tests percentile used to find min
462    fn min() {
463        let mut histogram = Histogram::new(7, 64).unwrap();
464
465        let h = SparseHistogram::from(&histogram);
466        assert_eq!(h.percentile(0.0).unwrap(), None);
467
468        let _ = histogram.increment(10);
469        let h = SparseHistogram::from(&histogram);
470        assert_eq!(h.percentile(0.0).map(|b| b.unwrap().end()), Ok(10));
471
472        let _ = histogram.increment(4);
473        let h = SparseHistogram::from(&histogram);
474        assert_eq!(h.percentile(0.0).map(|b| b.unwrap().end()), Ok(4));
475    }
476
477    fn compare_histograms(hstandard: &Histogram, hsparse: &SparseHistogram) {
478        assert_eq!(hstandard.config(), hsparse.config);
479
480        let mut buckets: HashMap<usize, u64> = HashMap::new();
481        for (idx, count) in hsparse.index.iter().zip(hsparse.count.iter()) {
482            let _ = buckets.insert(*idx, *count);
483        }
484
485        for (idx, count) in hstandard.as_slice().iter().enumerate() {
486            if *count > 0 {
487                let v = buckets.get(&idx).unwrap();
488                assert_eq!(*v, *count);
489            }
490        }
491    }
492
493    #[test]
494    fn snapshot() {
495        let mut hstandard = Histogram::new(5, 10).unwrap();
496
497        for v in 1..1024 {
498            let _ = hstandard.increment(v);
499        }
500
501        // Convert to sparse and store buckets in a hash for random lookup
502        let hsparse = SparseHistogram::from(&hstandard);
503        compare_histograms(&hstandard, &hsparse);
504    }
505
506    #[test]
507    fn downsample() {
508        let mut histogram = Histogram::new(8, 32).unwrap();
509        let mut rng = rand::thread_rng();
510
511        // Generate 10,000 values to store in a sorted array and a histogram
512        for _ in 0..10000 {
513            let v: u64 = rng.gen_range(1..2_u64.pow(histogram.config.max_value_power() as u32));
514            let _ = histogram.increment(v);
515        }
516
517        let hsparse = SparseHistogram::from(&histogram);
518        compare_histograms(&histogram, &hsparse);
519
520        // Downsample and compare heck the percentiles lie within error margin
521        let grouping_power = histogram.config.grouping_power();
522        for factor in 1..grouping_power {
523            let reduced_gp = grouping_power - factor;
524            let h1 = histogram.downsample(reduced_gp).unwrap();
525            let h2 = hsparse.downsample(reduced_gp).unwrap();
526            compare_histograms(&h1, &h2);
527        }
528    }
529}