This article was originally published on Synnada on July 13, 2023 and is part of our ongoing series on streaming SQL engines and advanced join algorithms. In previous posts, we discussed techniques like The Sliding Window Hash Join Algorithm: Efficiently Joining Infinite Streams with Order Preservation and General-purpose Stream Joins via Pruning Symmetric Hash Joins and Running Windowing Queries in Stream Processing that serve as building blocks. The concepts discussed in this series are implemented in Apache Arrow/DataFusion, which has a bunch of other cool features we haven’t talked about yet.
The Count-Min Sketch (CMS) is a probabilistic data structure that can be used to estimate the frequency of items in a stream of data. It is particularly useful when the size of the data stream is too large to store in memory, or when we want to perform frequent updates to the data stream in real-time.
To understand how the CMS works, it’s helpful to first understand some basic concepts:
- Hash functions are functions that take an input (such as a string or a number) and produce a fixed-size output (called a “hash value”) in a way that is difficult to predict or reverse. Hash functions are often used to store and retrieve data efficiently, as they allow us to map data items to indices in an array without having to store the data itself.
- Probabilistic data structures are data structures that use randomness to achieve their goals with a certain probability. This means that the results of probabilistic data structures are not always accurate, but the probability of error can be controlled by the user.
The basic idea behind the CMS is to use hash functions to map items in the stream to a fixed-size array, and then use this array to store an estimate of the frequency of each item. The CMS maintains a two-dimensional array of counters, where each row corresponds to a hash function and each column corresponds to a hash value. When an item is encountered in the stream, it is hashed using each of the hash functions and the corresponding counters are incremented.
Let’s figure it out with a simple example. First, we’ll initialize the table with all zeros:
+---+---+---+---+---+---+---+---+
| 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
+---+---+---+---+---+---+---+---+
| 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
+---+---+---+---+---+---+---+---+
| 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
+---+---+---+---+---+---+---+---+
Now, let’s say we want to process the following stream of items: ["apple, "banana", "cherry"]
To update the table with this data, we’ll need to increment the count at the positions in the table that are mapped to by each of our hash functions for each item in the stream.
For example, the first item in the stream is "apple"
. The positions in the table that are mapped to by each of our hash functions for this item are:
- hash1(“apple”): 3
- hash2(“apple”): 1
- hash3(“apple”): 4
So, we’ll increment the count at these positions in the table:
+---+---+---+---+---+---+---+---+
| 0| 0| 0| 1| 0| 0| 0| 0|
+---+---+---+---+---+---+---+---+
| 0| 1| 0| 0| 0| 0| 0| 0|
+---+---+---+---+---+---+---+---+
| 0| 0| 0| 0| 1| 0| 0| 0|
+---+---+---+---+---+---+---+---+
Now, let’s add the second item in the stream, "banana"
. The positions in the table that are mapped to by each of our hash functions for this item are:
- hash1(“banana”): 1
- hash2(“banana”): 7
- hash3(“banana”): 3
So, we’ll increment the count at these positions in the table:
+---+----+---+----+---+---+---+----+
| 0| 1| 0| 1| 0| 0| 0| 0|
+---+----+---+----+---+---+---+----+
| 0| 1| 0| 0| 0| 0| 0| 1|
+---+----+---+----+---+---+---+----+
| 0| 0| 0| 1| 1| 0| 0| 0|
+---+----+---+----+---+---+---+----+
And, "cherry"
. Suppose, we have a hash collision with apple.
- hash1(“cherry”): 0
- hash2(“cherry”): 1
- hash3(“cherry”): 6
The final table will look like this:
+---+----+---+----+---+---+---+----+
| 1| 0| 0| 1| 0| 0| 0| 0|
+---+----+---+----+---+---+---+----+
| 0| 2| 0| 0| 0| 0| 0| 1|
+---+----+---+----+---+---+---+----+
| 0| 0| 0| 1| 1| 0| 1| 0|
+---+----+---+----+---+---+---+----+
Now, let’s say we want to estimate the frequency of the item "apple"
in the stream. To do this, we'll look at the positions in the table that are mapped to by each of our hash functions for this item, and take the minimum count at these positions.
The positions in the table that are mapped to by each of our hash functions for “apple” are:
- hash1(“apple”): 3
- hash2(“apple”): 1
- hash3(“apple”): 4
And we get [1, 2, 1] from each row. To estimate the frequency of an item, we simply take the minimum value among the counters corresponding to that item across all the rows. This is because the hash functions are designed to evenly distribute items across the array, so the minimum count is likely to be the most accurate estimate of the true frequency.
Rust Implementation
Defining the data structure
use rand::Rng;
use std::cmp::max;
use std::hash::{BuildHasher, Hash, Hasher};
use twox_hash::xxh3::Hash64;
struct CountMinSketch {
counts: Vec<Vec<u64>>,
}
The CMS is implemented as a two-dimensional array of unsigned 64-bit integers, called counts
. The dimensions of this array are determined by two parameters: width
and depth
.
The values of width
and depth
are chosen to ensure that the error rate of the sketch is within a certain threshold. The error rate of the sketch is defined as the maximum difference between the true frequency of an item and its estimated frequency in the sketch.
Choosing parameter values
To determine the values of width
and depth
that will result in a given error rate, we can use the following formula:
width = ceil(e / ε)
depth = ceil(ln(1 − δ) / ln(1 / 2))
where e is the base of the natural logarithm (approximately 2.718), ε
is the desired error rate, and δ
is the desired probability of failure.
The width
value is chosen such that the error rate exceeds ε
with a probability of at most δ
. Recall that the CMS is a probabilistic data structure, so there is a small chance that the error rate will be higher than the desired value. The probability of failure is typically set to a very small value, such as 10⁻⁹ or 10⁻¹⁰, to ensure that the error rate is very unlikely to exceed the desired value.
The depth
value is chosen such that the probability of failure is at most δ
. This is because the CMS uses multiple hash functions to map items to the array of counters, and the more hash functions that are used, the lower the probability of failure.
Note that there is some ambiguity on how to calculate the these values. Check this post for an in-depth discussion.
Thus, we can add the utility methods
impl<T: Hash> CountMinSketch<T> {
// ...
fn determine_width(tolerance: f64) -> usize {
let width = (std::f64::consts::E / tolerance).round() as usize;
max(2, width).checked_next_power_of_two().unwrap_or(usize::MAX)
}
fn determine_depth(probability: f64) -> usize {
let ratio = (1.0 - probability).ln() / 0.5_f64.ln()
max(1, ratio.floor() as usize)
}
}
and use them to define the new
method as
impl<T: Hash> CountMinSketch<T> {
fn new(probability: f64, tolerance: f64) -> Self {
let width = Self::determine_width(tolerance);
let depth = Self::determine_depth(probability);
Self {
counts: vec![vec![0; width]; depth],
}
}
}
Inserting a new element
impl<T: Hash> CountMinSketch<T> {
// ...
fn update<T: Sized + Hash>(&mut self, item: T, count: u64) {
for (seed, row) in self.counts.iter_mut().enumerate() {
let mut hash_function = Hash64::with_seed(seed as u64);
let index = hash_once(&mut hash_function, &item) as usize % row.len();
row[index] += count;
}
}
}
Every time a new item
(of duplicity count
) is arrives from the data stream, we update the CMS to reflect this event. To do this, we first use our hash functions to map the item to unique hash values. Then, we use these hash values to update the CMS as follows:
- For each row in the
counts
array, we use the corresponding hash value to determine the index of the element in the row we want to update. - We then increment the value at this index by the count of the item.
Here, we use the xxHash algorithm in the hash_once
function to produce hash values for the items being added to the CMS. The algorithm provides:
- High speed: The xxHash algorithm is known for being very fast and efficient, making it a good choice for use in streaming applications where performance is important.
- Reasonable collision resistance: The xxHash algorithm doesn’t just produce hash values with a low probability of collision; it is also computationally somewhat difficult to find two inputs generating the same output. This property doesn’t matter for our use case (it is important in security use cases for cryptographic hash functions, which xxHash is not), but it increases our confidence that two distinct items are unlikely to get the same hash value. This is important for the CMS, as collisions can lead to incorrect frequency estimates.
- Quality of estimates: The accuracy of the estimates produced by the CMS depends, in part, on the quality of the hash function being used. The xxHash algorithm is known to produce high-quality hash values (in a distributional sense), which should benefit the estimation accuracy of the CMS.
Specifically, we used the xxh3 flavor of xxHash here. The xxh3 algorithm improves on the vanilla xxHash algorithm by leveraging various low-level optimizations while producing the final hash value. It is designed to be faster and produces higher-quality hash values than the original algorithm, particularly for larger input data.
Here are a few other hash functions that you might consider using in your CMS implementation, along with some pros and cons of each:
- MurmurHash: MurmurHash is a reasonably fast, non-cryptographic hash function that is widely used in data structures such as hash tables and bloom filters. It is known for being fast and producing reasonable-quality hash values, making it a good choice for use in the CMS. However, its mixing quality is not as high as some other hash functions, so you may see a higher rate of collisions with MurmurHash compared to other options.
- FNV (Fowler-Noll-Vo) hash: The FNV hash is another non-cryptographic hash function that is sometimes used in data structures. It is known for its simplicity and reasonable mixing properties, but it may not be as fast as some other hash functions.
The benchmark section of the xxHash source repository is a good source for many other candidates.
Ultimately, the choice of which hash function to use in the implementation will depend on the specific needs and constraints. Factors to consider might include the speed of the hash function, the collision resistance of the hash values it produces, and the quality of the hash values.
Estimating the counts
impl<T: Hash> CountMinSketch<T> {
// ...
fn estimate<T: Sized + Hash + std::fmt::Debug>(&mut self, item: T) -> u64 {
self.counts
.iter()
.enumerate()
.map(|(seed, row)| {
let mut hash_function = Hash64::with_seed(seed as u64);
row[hash_once(&mut hash_function, &item) as usize % row.len()]
})
.min()
.unwrap_or(0)
}
}
The estimate
method takes a reference to an item of type T
and returns an estimate of its frequency in the stream of data. To do this, it iterates over the rows of the counts
vector, using the iter
method to get an iterator over the rows.
For each row, it applies the corresponding hash function to the item to get the index of the column in the row that corresponds to the item. It then retrieves the value at this index and adds it to the stream of values produced by the iterator using the map
method.
Finally, it takes the minimum value from this stream of values using the min
method, and returns it as the estimate of the frequency of the item. If the stream of values is empty (i.e., if the counts
vector is empty), it returns 0 as the estimate.
Conclusion
In this article, we have introduced the count-min sketch, a data structure that allows you to estimate the frequency of items in a stream. The count-min sketch is a probabilistic data structure that uses multiple hash functions and a small array of counters to estimate the frequencies of items with a certain error rate.
We have discussed how to choose the values of width
and depth
in the count-min sketch to achieve a desired error rate, and how to select good hash functions to improve the accuracy of the sketch. We have also provided an implementation of the count-min sketch in Rust and discussed how to test it.
The count-min sketch is a useful tool for processing large streams of data and estimating the frequencies of items. It has many applications, including counting the number of unique visitors to a website, tracking the popularity of items in a recommendation system, and detecting anomalies in network traffic.
I hope you have found this article helpful and that you have a better understanding of the count-min sketch and how it works. Thank you for reading!
You can see source code from here.
References
- Mining of Massive Datasets by Jure Leskovec, Anand Rajaraman, and Jeff Ullman is a popular textbook that covers the CMS and other algorithms for processing large datasets. It provides a detailed explanation of the error rate of the count-min sketch and how it is related to the values of
width
anddepth
. - The Count-Min Sketch and its Applications by Graham Cormode and S. Muthukrishnan is a research paper that presents the CMS and analyzes its performance. It provides a more detailed analysis of the error rate of the CMS and how it is affected by the values of
width
anddepth
. - The Wikipedia page Count-Min Sketch provides an overview of the CMS and its properties. It includes a section on the error rate and how it is related to the values of
width
anddepth
. - The xxHash GitHub repository contains the source code of the xxHash algorithm and is a good resource for understanding how it works.
- A benchmarking study of hash functions: This study compares the performance of several different hash functions, including xxHash, MurmurHash, FNV, and others. It is a useful resource for understanding the relative performance of these hash functions.
Authors
- Metehan Yıldırım, Software Engineer @ Synnada
- Mehmet Ozan Kabak, CEO and Founder @ Synnada