Fast Top N Aggregation and Filtering with DuckDB
TL;DR: Find the top N values or filter to the latest N rows more quickly and easily with the N parameter in the min, max, min_by, and max_by aggregate functions.
Introduction to Top N
A common pattern when analyzing data is to look for the rows of data that are the highest or lowest in a particular metric.
When interested in the highest or lowest N rows in an entire dataset, SQL's standard ORDER BY and LIMIT clauses will sort by the metric of interest and only return N rows.
For example, using the scale factor 1 (SF1) data set of the TPC-H benchmark:
INSTALL tpch;
LOAD tpch;
-- Generate an example TPC-H dataset
CALL dbgen(sf = 1);
-- Return the most recent 3 rows by l_shipdate
FROM lineitem
ORDER BY
l_shipdate DESC
LIMIT 3;
l_orderkey | l_partkey | … | l_shipmode | l_comment |
---|---|---|---|---|
354528 | 6116 | … | | wake according to the u |
413956 | 16402 | … | SHIP | usual patterns. carefull |
484581 | 10970 | … | TRUCK | ccounts maintain. dogged accounts a |
This is useful to quickly get the oldest or newest values in a dataset or to find outliers in a particular metric.
Another common approach is to query the min/max summary statistics of one or more columns.
This can find outliers, but the row that contains the outlier can be different for each column, so it is answering a different question.
DuckDB's helpful COLUMNS expression allows us to calculate the maximum value for all columns:
FROM lineitem
SELECT
max(COLUMNS(*));
The queries in this post make extensive use of DuckDB's FROM-first syntax. This allows the FROM and SELECT clauses to be swapped, and it even allows omitting the latter entirely.
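For example, the max(COLUMNS(*)) query above could also be written in the conventional order; this is just a minimal illustration of the equivalence:
SELECT
    max(COLUMNS(*))
FROM lineitem;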
l_orderkey | l_partkey | … | l_shipmode | l_comment |
---|---|---|---|---|
600000 | 20000 | … | TRUCK | zzle. slyly |
However, these two approaches can only answer certain kinds of questions.
There are many scenarios where the goal is to understand the top N values within a group.
In the first example above, how would we calculate the last 10 shipments from each supplier?
SQL's LIMIT clause is not able to handle that situation.
Let's call this type of analysis the top N by group.
This type of analysis is a common tool for exploring new datasets. Use cases include pulling the most recent few rows for each group or finding the most extreme few values in a group. Sticking with our shipment example, we could look at the last 10 shipments of each part number, or find the 5 highest priced orders per customer.
Traditional Top N by Group
In most databases, the way to filter to the top N within a group is to use a window function and a common table expression (CTE). This approach also works in DuckDB. For example, this query returns the 3 most recent shipments for each supplier:
WITH ranked_lineitem AS (
FROM lineitem
SELECT
*,
row_number() OVER
(PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
AS my_ranking
)
FROM ranked_lineitem
WHERE
my_ranking <= 3;
l_orderkey | l_partkey | l_suppkey | … | l_shipmode | l_comment | my_ranking |
---|---|---|---|---|---|---|
1310688 | 169532 | 7081 | … | RAIL | ully final exc | 1 |
910561 | 194561 | 7081 | … | SHIP | ly bold excuses caj | 2 |
4406883 | 179529 | 7081 | … | RAIL | tions. furious | 3 |
4792742 | 52095 | 7106 | … | RAIL | onic, ironic courts. final deposits sleep | 1 |
4010212 | 122081 | 7106 | … | | accounts cajole finally ironic instruc | 2 |
1220871 | 94596 | 7106 | … | TRUCK | regular requests above t | 3 |
… | … | … | … | … | … | … |
In DuckDB, this can be simplified using the QUALIFY clause.
QUALIFY acts like a WHERE clause, but specifically operates on the results of window functions.
By making this adjustment, the CTE can be avoided while returning the same results.
FROM lineitem
SELECT
*,
row_number() OVER
(PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
AS my_ranking
QUALIFY
my_ranking <= 3;
This is certainly a viable approach!
However, what are its weaknesses?
Even though the query is interested in only the 3 most recent shipments, it must sort every shipment just to retrieve those top 3.
Sorting in DuckDB has a complexity of O(kn) due to DuckDB's innovative radix sort implementation, but this is still higher than the O(n) of DuckDB's hash aggregate, for example.
Sorting is also a memory intensive operation when compared with aggregation.
Top N in DuckDB
DuckDB 1.1 added a new capability to dramatically simplify and improve performance of top N calculations.
Namely, the functions min, max, min_by, and max_by all now accept an optional parameter N.
If N is greater than 1 (the default), they will return an array of the top values.
As a simple example, let's query the most recent (top 3) shipment dates:
FROM lineitem
SELECT
max(l_shipdate, 3) AS top_3_shipdates;
top_3_shipdates |
---|
[1998-12-01, 1998-12-01, 1998-12-01] |
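The min function accepts the same optional N parameter. For instance, this sketch returns the 3 oldest ship dates instead:
FROM lineitem
SELECT
    min(l_shipdate, 3) AS bottom_3_shipdates;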
Top N by Column in DuckDB
The top N selection can become even more useful thanks to the COLUMNS expression once again: we can retrieve the top 3 values in each column.
We can call this a top N by column analysis.
It is particularly messy to try to do this analysis with ordinary SQL!
You would need a subquery or window function for every single column…
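For a sense of how that would look, here is one hypothetical sketch in ordinary SQL covering just two of the columns, with a scalar subquery per column (not a recommended approach):
SELECT
    (SELECT array_agg(v ORDER BY v DESC)
     FROM (SELECT l_orderkey AS v FROM lineitem ORDER BY l_orderkey DESC LIMIT 3) t
    ) AS top_3_l_orderkey,
    (SELECT array_agg(v ORDER BY v DESC)
     FROM (SELECT l_partkey AS v FROM lineitem ORDER BY l_partkey DESC LIMIT 3) t
    ) AS top_3_l_partkey;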
In DuckDB, simply:
FROM lineitem
SELECT
max(COLUMNS(*), 3) AS "top_3_\0";
top_3_l_orderkey | top_3_l_partkey | … | top_3_l_shipmode | top_3_l_comment |
---|---|---|---|---|
[600000, 600000, 599975] | [20000, 20000, 20000] | … | [TRUCK, TRUCK, TRUCK] | [zzle. slyly, zzle. quickly bold a, zzle. pinto beans boost slyly slyly fin] |
Top N by Group in DuckDB
Armed with the new N parameter, how can we speed up a top N by group analysis?
Want to cut to the chase and see the final output? Feel free to skip ahead!
We will take advantage of three other DuckDB SQL features to make this possible:
- The max_by function (also known as arg_max)
- The unnest function
- Automatically packing an entire row into a STRUCT column
The max function will return the max (or now the top N!) of a specific column.
In contrast, the max_by function will find the maximum value in a column, and then retrieve a value from the same row, but a different column.
For example, this query will return the ids of the 3 most recently shipped orders for each supplier:
FROM lineitem
SELECT
l_suppkey,
max_by(l_orderkey, l_shipdate, 3) AS recent_orders
GROUP BY
l_suppkey;
l_suppkey | recent_orders |
---|---|
2992 | [233573, 3597639, 3060227] |
8516 | [4675968, 5431174, 4626530] |
3205 | [3844610, 4396966, 3405255] |
2152 | [1672000, 4209601, 3831138] |
1880 | [4852999, 2863747, 1650084] |
… | … |
The max_by function is an aggregate function, so it takes advantage of DuckDB's fast hash aggregation rather than sorting.
Instead of sorting by l_shipdate, the max_by function scans through the dataset just once and keeps track of the N highest l_shipdate values.
It then returns the order id that corresponds with each of the most recent shipment dates.
The radix sort in DuckDB must scan through the dataset once per byte, so scanning only once provides a significant speedup.
For example, if sorting by a 64-bit integer, the sort algorithm must loop through the dataset 8 times vs. 1 with this approach!
A simple micro-benchmark is included in the Performance Comparisons section.
However, this SQL query has a few gaps.
The query returns results as a LIST rather than as separate rows.
Thankfully the unnest function can split a LIST into separate rows:
FROM lineitem
SELECT
l_suppkey,
unnest(
max_by(l_orderkey, l_shipdate, 3)
) AS recent_orders
GROUP BY
l_suppkey;
l_suppkey | recent_orders |
---|---|
2576 | 930468 |
2576 | 2248354 |
2576 | 3640711 |
5559 | 4022148 |
5559 | 1675680 |
5559 | 4976259 |
… | … |
The next gap is that there is no way to easily see the l_shipdate associated with the returned l_orderkey values.
This query only returns a single column, while typically a top N by group analysis will require the entire row.
Fortunately, DuckDB allows us to refer to the entire contents of a row as if it were just a single column!
By referring to the name of the table itself (here, lineitem) instead of the name of a column, the max_by function can retrieve all columns.
FROM lineitem
SELECT
l_suppkey,
unnest(
max_by(lineitem, l_shipdate, 3)
) AS recent_orders
GROUP BY
l_suppkey;
l_suppkey | recent_orders |
---|---|
5411 | {'l_orderkey': 2543618, 'l_partkey': 105410, 'l_suppkey': 5411, … |
5411 | {'l_orderkey': 580547, 'l_partkey': 130384, 'l_suppkey': 5411, … |
5411 | {'l_orderkey': 3908642, 'l_partkey': 132897, 'l_suppkey': 5411, … |
90 | {'l_orderkey': 4529697, 'l_partkey': 122553, 'l_suppkey': 90, … |
90 | {'l_orderkey': 4473346, 'l_partkey': 160089, 'l_suppkey': 90, … |
… | … |
Let's make that a bit friendlier looking by splitting the STRUCT out into separate columns to match our original dataset.
The Final Top N by Group Query
Passing in one more argument to UNNEST will split this out into separate columns by running recursively.
In this case, that means that UNNEST will run twice: once to convert each LIST into separate rows, and then again to convert each STRUCT into separate columns.
The l_suppkey column can also be excluded from the SELECT clause, since it is already included in the STRUCT for each row.
FROM lineitem
SELECT
unnest(
max_by(lineitem, l_shipdate, 3),
recursive := 1
) AS recent_orders
GROUP BY
l_suppkey;
l_orderkey | l_partkey | l_suppkey | … | l_shipinstruct | l_shipmode | l_comment |
---|---|---|---|---|---|---|
1234726 | 6875 | 6876 | … | COLLECT COD | FOB | cajole carefully slyly fin |
2584193 | 51865 | 6876 | … | TAKE BACK RETURN | TRUCK | fully regular deposits at the q |
2375524 | 26875 | 6876 | … | DELIVER IN PERSON | AIR | nusual ideas. busily bold deposi |
5751559 | 95626 | 8136 | … | NONE | SHIP | ers nag fluffily against the spe |
3103457 | 103115 | 8136 | … | TAKE BACK RETURN | FOB | y slyly express warthogs– unusual, e |
5759105 | 178135 | 8136 | … | COLLECT COD | TRUCK | es. regular pinto beans haggle. |
… | … | … | … | … | … | … |
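As a variation, the same pattern covers the other use case mentioned earlier: the 5 highest priced orders per customer. A sketch using the TPC-H orders table:
FROM orders
SELECT
    unnest(
        max_by(orders, o_totalprice, 5),
        recursive := 1
    )
GROUP BY
    o_custkey;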
This approach can also be useful for the common task of de-duplicating by finding the latest value within a group.
One pattern is to find the current state of a dataset by returning the most recent event in an events table.
Simply use an N of 1!
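For example, given a hypothetical events table with entity_id and event_timestamp columns (names invented for this sketch), the latest event per entity would be:
FROM events
SELECT
    unnest(
        max_by(events, event_timestamp, 1),
        recursive := 1
    )
GROUP BY
    entity_id;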
We now have a way to use an aggregate function to calculate the top N rows per group! So, how much more efficient is it?
Performance Comparisons
We will compare the QUALIFY approach with the max_by approach for solving the top N by group problem.
We have discussed both queries, but for reference they are repeated below.
QUALIFY query:
FROM lineitem
SELECT
*,
row_number() OVER
(PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
AS my_ranking
QUALIFY
my_ranking <= 3;
max_by query:
FROM lineitem
SELECT
unnest(
max_by(lineitem, l_shipdate, 3),
recursive := 1
)
GROUP BY
l_suppkey;
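Either query can also be prefixed with EXPLAIN to inspect the physical plan that DuckDB chooses; based on the discussion above, the max_by version should show an aggregation-based plan while the QUALIFY version relies on a window over sorted partitions. For example:
EXPLAIN
FROM lineitem
SELECT
    unnest(
        max_by(lineitem, l_shipdate, 3),
        recursive := 1
    )
GROUP BY
    l_suppkey;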
While the main query is running, we will also kick off a background thread to periodically measure DuckDB's memory use.
This uses the built-in table function duckdb_memory() and includes information about memory usage as well as temporary disk usage.
The small Python script used for benchmarking is included below the results.
The machine used for benchmarking was an M1 MacBook Pro with 16 GB RAM.
SF | max_memory | Metric | QUALIFY | max_by | Improvement |
---|---|---|---|---|---|
1 | Default | Total time | 0.58 s | 0.24 s | 2.4× |
5 | Default | Total time | 6.15 s | 1.26 s | 4.9× |
10 | 36GB | Total time | 36.8 s | 25.4 s | 1.4× |
1 | Default | Memory usage | 1.7 GB | 0.2 GB | 8.5× |
5 | Default | Memory usage | 7.9 GB | 1.5 GB | 5.3× |
10 | 36GB | Memory usage | 15.7 GB | 17.1 GB | 0.9× |
We can see that in each of these situations, the max_by approach is faster, in some cases nearly 5× faster!
However, as the data grows larger, the max_by approach begins to weaken relative to QUALIFY.
In some cases, the memory use is also significantly lower with max_by.
Yet the memory use of the max_by approach becomes more significant as scale increases, because the number of distinct l_suppkey values grows linearly with the scale factor.
This increased memory use likely explains the performance decrease, as both algorithms approached the maximum amount of RAM on my machine and began to swap to disk.
In order to reduce the memory pressure, let's re-run the scale factor 10 (SF10) benchmark using fewer threads (4 threads and 1 thread).
We continue to use a max_memory setting of 36 GB; both settings can be applied as shown below.
The prior SF10 results with all 10 threads are included for reference.
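For reference, both knobs are ordinary DuckDB settings and can be applied per connection before running the queries (the values shown match this benchmark):
SET threads = 4;
SET max_memory = '36GB';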
SF | Threads | Metric | QUALIFY | max_by | Improvement |
---|---|---|---|---|---|
10 | 10 | Total time | 36.8 s | 25.4 s | 1.4× |
10 | 4 | Total time | 49.0 s | 21.0 s | 2.3× |
10 | 1 | Total time | 115.7 s | 12.7 s | 9.1× |
10 | 10 | Memory usage | 15.7 GB | 17.1 GB | 0.9× |
10 | 4 | Memory usage | 15.9 GB | 17.3 GB | 0.9× |
10 | 1 | Memory usage | 14.5 GB | 1.8 GB | 8.1× |
The max_by approach is so computationally efficient that even with 1 thread it is dramatically faster than the QUALIFY approach using all 10 threads!
Reducing the thread count also lowered the memory use very effectively (a nearly 10× reduction).
So, when should we use each approach?
As with all database things, it depends!
If memory is constrained, max_by may also offer benefits, especially when the thread count is tuned to avoid spilling to disk.
However, if there are approximately as many groups as there are rows, consider QUALIFY, since we lose some of the memory efficiency of the max_by approach.
Python Benchmarking Script
import duckdb
import pandas as pd
from threading import Thread
from time import sleep
from datetime import datetime
from os import remove

def check_memory(stop_function, filepath, sleep_seconds, results_dict):
    print("Starting background thread")
    background_con = duckdb.connect(filepath)
    max_memory = 0
    max_temporary_storage = 0

    while True:
        if stop_function():
            break
        # Profile the memory
        memory_profile = background_con.sql("""
            FROM duckdb_memory()
            SELECT
                tag,
                round(memory_usage_bytes / (1000000), 0)::bigint AS memory_usage_mb,
                round(temporary_storage_bytes / (1000000), 0)::bigint AS temporary_storage_mb;
            """).df()
        print(memory_profile)

        total_memory = background_con.sql("""
            FROM memory_profile
            SELECT
                sum(memory_usage_mb) AS total_memory_usage_mb,
                sum(temporary_storage_mb) AS total_temporary_storage_mb
            """).fetchall()
        print('Current memory:', total_memory[0][0])
        print('Current temporary_storage:', total_memory[0][1])

        if total_memory[0][0] > max_memory:
            max_memory = total_memory[0][0]
        if total_memory[0][1] > max_temporary_storage:
            max_temporary_storage = total_memory[0][1]
        print('Maximum memory:', max_memory)
        print('Maximum temporary_storage:', max_temporary_storage)

        sleep(sleep_seconds)

    results_dict["max_memory"] = max_memory
    results_dict["max_temporary_storage"] = max_temporary_storage
    background_con.close()
    return

def query_and_profile(filepath, sql):
    con = duckdb.connect(filepath)
    con.sql("set max_memory='36GB'")
    results_dict = {}

    stop_threads = False
    background_memory_thread = Thread(target=check_memory,
        args=(lambda: stop_threads, filepath, 0.1, results_dict, ))
    background_memory_thread.start()

    print("Starting query:")
    start_time = datetime.now()
    results_df = con.sql(sql).df()
    results_dict["total_time_seconds"] = (datetime.now() - start_time).total_seconds()
    print(results_df.head(10))

    stop_threads = True
    background_memory_thread.join()
    con.close()
    return results_dict

filepath = './arg_max_check_duckdb_memory_v3.duckdb'

con = duckdb.connect(filepath)
print("Begin initial tpch load")
con.sql("""call dbgen(sf=1);""")
con.close()

sql = """
    FROM lineitem
    SELECT
        UNNEST(
            max_by(lineitem, l_shipdate, 3),
            recursive := 1
        )
    GROUP BY
        l_suppkey
    ;"""
max_by_results = query_and_profile(filepath, sql)

sql = """
    FROM lineitem
    SELECT
        *,
        row_number() OVER
            (PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
            AS my_ranking
    QUALIFY
        my_ranking <= 3
    ;"""
qualify_results = query_and_profile(filepath, sql)

print('max_by_results:', max_by_results)
print('qualify_results:', qualify_results)

remove(filepath)
Conclusion
DuckDB now offers a convenient way to calculate the top N values of both the min and max aggregate functions, as well as their advanced cousins min_by and max_by.
They are easy to get started with, and also enable more complex analyses like calculating the top N for all columns or the top N by group.
There are also possible performance benefits when compared with a window function approach.
We would love to hear about the creative ways you are able to use this new feature!
Happy analyzing!