Fast Top N Aggregation and Filtering with DuckDB

Alex Monahan, 2024-10-25

TL;DR: Find the top N values or filter to the latest N rows more quickly and easily with the N parameter in the min, max, min_by, and max_by aggregate functions.

Introduction to Top N

A common pattern when analyzing data is to look for the rows of data that are the highest or lowest in a particular metric. When interested in the highest or lowest N rows in an entire dataset, SQL's standard ORDER BY and LIMIT clauses will sort by the metric of interest and only return N rows. For example, using the scale factor 1 (SF1) data set of the TPC-H benchmark:

INSTALL tpch;
LOAD tpch;
-- Generate an example TPC-H dataset
CALL dbgen(sf = 1);

-- Return the most recent 3 rows by l_shipdate
FROM lineitem 
ORDER BY 
    l_shipdate DESC 
LIMIT 3;

| l_orderkey | l_partkey | l_shipmode | l_comment |
|---|---|---|---|
| 354528 | 6116 | MAIL | wake according to the u |
| 413956 | 16402 | SHIP | usual patterns. carefull |
| 484581 | 10970 | TRUCK | ccounts maintain. dogged accounts a |

This is useful to quickly get the oldest or newest values in a dataset or to find outliers in a particular metric.

Another common approach is to query the min/max summary statistics of one or more columns. This can find outliers, but the row that contains the outlier can be different for each column, so it is answering a different question. DuckDB's helpful COLUMNS expression allows us to calculate the maximum value for all columns.

FROM lineitem
SELECT 
    max(COLUMNS(*));

| l_orderkey | l_partkey | l_shipmode | l_comment |
|---|---|---|---|
| 600000 | 20000 | TRUCK | zzle. slyly |

The queries in this post make extensive use of DuckDB's FROM-first syntax. This allows the FROM and SELECT clauses to be swapped, and it even allows omitting the SELECT clause entirely.

However, these two approaches can only answer certain kinds of questions. There are many scenarios where the goal is to understand the top N values within a group. In the first example above, how would we calculate the last 10 shipments from each supplier? SQL's LIMIT clause is not able to handle that situation. Let's call this type of analysis the top N by group.

This type of analysis is a common tool for exploring new datasets. Use cases include pulling the most recent few rows for each group or finding the most extreme few values in a group. Sticking with our shipment example, we could look at the last 10 shipments of each part number, or find the 5 highest priced orders per customer.

Traditional Top N by Group

In most databases, the way to filter to the top N within a group is to use a window function and a common table expression (CTE). This approach also works in DuckDB. For example, this query returns the 3 most recent shipments for each supplier:

WITH ranked_lineitem AS (
    FROM lineitem 
    SELECT 
        *,
        row_number() OVER
            (PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
            AS my_ranking
)
FROM ranked_lineitem
WHERE 
    my_ranking <= 3;

| l_orderkey | l_partkey | l_suppkey | l_shipmode | l_comment | my_ranking |
|---|---|---|---|---|---|
| 1310688 | 169532 | 7081 | RAIL | ully final exc | 1 |
| 910561 | 194561 | 7081 | SHIP | ly bold excuses caj | 2 |
| 4406883 | 179529 | 7081 | RAIL | tions. furious | 3 |
| 4792742 | 52095 | 7106 | RAIL | onic, ironic courts. final deposits sleep | 1 |
| 4010212 | 122081 | 7106 | MAIL | accounts cajole finally ironic instruc | 2 |
| 1220871 | 94596 | 7106 | TRUCK | regular requests above t | 3 |

In DuckDB, this can be simplified using the QUALIFY clause. QUALIFY acts like a WHERE clause, but specifically operates on the results of window functions. By making this adjustment, the CTE can be avoided while returning the same results.

FROM lineitem 
SELECT
    *,
    row_number() OVER
        (PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
        AS my_ranking
QUALIFY
    my_ranking <= 3;

This is certainly a viable approach! However, what are its weaknesses? Even though the query is interested in only the 3 most recent shipments, it must sort every shipment just to retrieve those top 3. Sorting in DuckDB has a complexity of O(kn), where n is the number of rows and k is the size of the sort key in bytes, due to DuckDB's innovative radix sort implementation. That is still more work than the O(n) of DuckDB's hash aggregate, for example. Sorting is also a memory-intensive operation when compared with aggregation.
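
To make the k in O(kn) concrete, here is a minimal Python sketch of a least-significant-byte radix sort. It is not DuckDB's sort implementation; the function name lsd_radix_sort and the pass counter are assumptions made purely for illustration. It only shows why a key that is k bytes wide costs k full passes over the n rows.

```python
def lsd_radix_sort(values, key_bytes=8):
    """Sort non-negative integers that fit in key_bytes bytes.

    Illustrative only: one bucketing pass per key byte, so the total work
    is proportional to key_bytes * len(values).
    """
    passes = 0
    for byte_index in range(key_bytes):
        shift = 8 * byte_index
        buckets = [[] for _ in range(256)]
        for value in values:  # one full pass over the data per byte
            buckets[(value >> shift) & 0xFF].append(value)
        # Concatenating buckets in order keeps the sort stable.
        values = [value for bucket in buckets for value in bucket]
        passes += 1
    return values, passes


data = [513, 7, 70000, 256, 7, 1]
sorted_data, passes = lsd_radix_sort(data)
print(sorted_data, passes)
```

With the default 8-byte key, the loop touches every value 8 times, regardless of how few rows the query finally keeps.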

Top N in DuckDB

DuckDB 1.1 added a new capability that dramatically simplifies top N calculations and improves their performance. Namely, the functions min, max, min_by, and max_by all now accept an optional parameter N. If N is greater than 1 (the default), they will return a list of the top N values.

As a simple example, let's query the most recent (top 3) shipment dates:

FROM lineitem
SELECT
    max(l_shipdate, 3) AS top_3_shipdates;

| top_3_shipdates |
|---|
| [1998-12-01, 1998-12-01, 1998-12-01] |
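
Conceptually, max(l_shipdate, 3) behaves like keeping the three largest values seen, duplicates included, with the highest first. A rough Python analogy uses heapq.nlargest from the standard library. The sample dates below are made up for illustration and are not taken from lineitem.

```python
import heapq
from datetime import date

# Hypothetical shipment dates, with the latest date appearing three times.
ship_dates = [
    date(1998, 11, 30),
    date(1998, 12, 1),
    date(1998, 11, 29),
    date(1998, 12, 1),
    date(1998, 12, 1),
]

# Keep the 3 largest values; ties are kept rather than de-duplicated.
top_3_shipdates = heapq.nlargest(3, ship_dates)
print([d.isoformat() for d in top_3_shipdates])
# ['1998-12-01', '1998-12-01', '1998-12-01']
```

This mirrors the query result above, where the same date fills all three slots because several shipments share it.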

Top N by Column in DuckDB

The top N selection can become even more useful thanks to the COLUMNS expression once again – we can retrieve the 3 top values in each column. We can call this a top N by column analysis. It is particularly messy to try to do this analysis with ordinary SQL! You would need a subquery or window function for every single column… In DuckDB, simply:

FROM lineitem
SELECT 
    max(COLUMNS(*), 3) AS "top_3_\0";

| top_3_l_orderkey | top_3_l_partkey | top_3_l_shipmode | top_3_l_comment |
|---|---|---|---|
| [600000, 600000, 599975] | [20000, 20000, 20000] | [TRUCK, TRUCK, TRUCK] | [zzle. slyly, zzle. quickly bold a, zzle. pinto beans boost slyly slyly fin] |
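
As a rough mental model, the COLUMNS(*) form applies the same top 3 aggregate to every column independently and renames each result. The Python sketch below mimics that over a small, hypothetical column dictionary; it is only an analogy, not how DuckDB evaluates the expression.

```python
import heapq

# Hypothetical columnar data standing in for a small table.
columns = {
    "l_orderkey": [5, 1, 9, 9, 3],
    "l_shipmode": ["AIR", "TRUCK", "MAIL", "SHIP", "TRUCK"],
}

# Apply the same top-3 aggregate to each column on its own and prefix
# the output name, similar in spirit to the "top_3_" alias pattern.
top_3_by_column = {
    f"top_3_{name}": heapq.nlargest(3, values)
    for name, values in columns.items()
}
print(top_3_by_column)
```

Note that each output list is computed separately, so the values in one row of the result do not necessarily come from the same input row.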

Top N by Group in DuckDB

Armed with the new N parameter, how can we speed up a top N by group analysis?

Want to cut to the chase and see the final output? Feel free to skip ahead to the section The Final Top N by Group Query!

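We will take advantage of three other DuckDB SQL features to make this possible: the max_by function, the unnest function, and the ability to refer to an entire row as a single STRUCT value.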

The max function will return the max (or now the max N!) of a specific column. In contrast, the max_by function will find the maximum value in a column, and then retrieve a value from the same row, but a different column. For example, this query will return the ids of the 3 most recently shipped orders for each supplier:

FROM lineitem
SELECT 
    l_suppkey,
    max_by(l_orderkey, l_shipdate, 3) AS recent_orders
GROUP BY
    l_suppkey;

| l_suppkey | recent_orders |
|---|---|
| 2992 | [233573, 3597639, 3060227] |
| 8516 | [4675968, 5431174, 4626530] |
| 3205 | [3844610, 4396966, 3405255] |
| 2152 | [1672000, 4209601, 3831138] |
| 1880 | [4852999, 2863747, 1650084] |

The max_by function is an aggregate function, so it takes advantage of DuckDB's fast hash aggregation rather than sorting. Instead of sorting by l_shipdate, the max_by function scans through the dataset just once and keeps track of the N highest l_shipdate values. It then returns the order id that corresponds with each of the most recent shipment dates. The radix sort in DuckDB must scan through the dataset once per byte, so scanning only once provides a significant speedup. For example, if sorting by a 64-bit integer, the sort algorithm must loop through the dataset 8 times vs. 1 with this approach! A simple micro-benchmark is included in the Performance Comparisons section.
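
The single-pass idea can be sketched in plain Python with a bounded min-heap per group. This illustrates the general technique under simplifying assumptions and is not DuckDB's internal implementation. The function name max_by_per_group and the sample rows are hypothetical.

```python
import heapq
from collections import defaultdict
from itertools import count


def max_by_per_group(rows, group_key, arg_key, val_key, n):
    """One pass over rows; each group keeps at most n candidates in a min-heap."""
    heaps = defaultdict(list)
    tie_breaker = count()  # unique sequence number so args are never compared
    for row in rows:
        heap = heaps[row[group_key]]
        entry = (row[val_key], next(tie_breaker), row[arg_key])
        if len(heap) < n:
            heapq.heappush(heap, entry)
        elif entry[0] > heap[0][0]:
            heapq.heapreplace(heap, entry)  # evict the smallest kept value
    # Emit the args ordered by value, highest first.
    return {
        group: [arg for _, _, arg in sorted(heap, reverse=True)]
        for group, heap in heaps.items()
    }


rows = [
    {"l_suppkey": 1, "l_orderkey": 10, "l_shipdate": "1998-01-05"},
    {"l_suppkey": 1, "l_orderkey": 11, "l_shipdate": "1998-03-01"},
    {"l_suppkey": 2, "l_orderkey": 20, "l_shipdate": "1997-07-15"},
    {"l_suppkey": 1, "l_orderkey": 12, "l_shipdate": "1998-02-10"},
    {"l_suppkey": 1, "l_orderkey": 13, "l_shipdate": "1997-12-31"},
]
recent_orders = max_by_per_group(rows, "l_suppkey", "l_orderkey", "l_shipdate", 2)
print(recent_orders)
# {1: [11, 12], 2: [20]}
```

Each group never holds more than n entries, so memory grows with the number of groups times n rather than with the number of input rows.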

However, this SQL query has a few gaps. The query returns results as a LIST rather than as separate rows. Thankfully the unnest function can split a LIST into separate rows:

FROM lineitem
SELECT 
    l_suppkey,
    unnest(
        max_by(l_orderkey, l_shipdate, 3)
    ) AS recent_orders
GROUP BY
    l_suppkey;

| l_suppkey | recent_orders |
|---|---|
| 2576 | 930468 |
| 2576 | 2248354 |
| 2576 | 3640711 |
| 5559 | 4022148 |
| 5559 | 1675680 |
| 5559 | 4976259 |
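
In Python terms, unnest here acts like a nested comprehension that repeats the group key once per list element. The sketch below reuses the order ids from the output above; the dictionary layout is an assumption for illustration.

```python
# Grouped result: one list of order ids per supplier (values from the output above).
grouped = {
    2576: [930468, 2248354, 3640711],
    5559: [4022148, 1675680, 4976259],
}

# unnest-style expansion: emit one (l_suppkey, l_orderkey) row per list element.
unnested_rows = [
    (l_suppkey, l_orderkey)
    for l_suppkey, recent_orders in grouped.items()
    for l_orderkey in recent_orders
]
for row in unnested_rows:
    print(row)
```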

The next gap is that there is no way to easily see the l_shipdate associated with the returned l_orderkey values. This query only returns a single column, while typically a top N by group analysis will require the entire row.

Fortunately, DuckDB allows us to refer to the entire contents of a row as if it were just a single column! By referring to the name of the table itself (here, lineitem) instead of the name of a column, the max_by function can retrieve all columns.

FROM lineitem
SELECT 
    l_suppkey,
    unnest(
        max_by(lineitem, l_shipdate, 3)
    ) AS recent_orders
GROUP BY
    l_suppkey;

| l_suppkey | recent_orders |
|---|---|
| 5411 | {'l_orderkey': 2543618, 'l_partkey': 105410, 'l_suppkey': 5411, … |
| 5411 | {'l_orderkey': 580547, 'l_partkey': 130384, 'l_suppkey': 5411, … |
| 5411 | {'l_orderkey': 3908642, 'l_partkey': 132897, 'l_suppkey': 5411, … |
| 90 | {'l_orderkey': 4529697, 'l_partkey': 122553, 'l_suppkey': 90, … |
| 90 | {'l_orderkey': 4473346, 'l_partkey': 160089, 'l_suppkey': 90, … |

Let's make that a bit friendlier looking by splitting the STRUCT out into separate columns to match our original dataset.

The Final Top N by Group Query

Passing one more argument to unnest splits the result into separate columns by running recursively. In this case, that means unnest runs twice: once to convert each LIST into separate rows, and then again to convert each STRUCT into separate columns. The l_suppkey column can also be dropped from the SELECT list, since it is already one of the columns inside each unnested row.

FROM lineitem
SELECT 
    unnest(
        max_by(lineitem, l_shipdate, 3),
        recursive := 1
    ) AS recent_orders
GROUP BY
    l_suppkey;

| l_orderkey | l_partkey | l_suppkey | l_shipinstruct | l_shipmode | l_comment |
|---|---|---|---|---|---|
| 1234726 | 6875 | 6876 | COLLECT COD | FOB | cajole carefully slyly fin |
| 2584193 | 51865 | 6876 | TAKE BACK RETURN | TRUCK | fully regular deposits at the q |
| 2375524 | 26875 | 6876 | DELIVER IN PERSON | AIR | nusual ideas. busily bold deposi |
| 5751559 | 95626 | 8136 | NONE | SHIP | ers nag fluffily against the spe |
| 3103457 | 103115 | 8136 | TAKE BACK RETURN | FOB | y slyly express warthogs– unusual, e |
| 5759105 | 178135 | 8136 | COLLECT COD | TRUCK | es. regular pinto beans haggle. |
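
The two unnesting steps can be pictured in Python as first flattening lists into rows and then spreading each row's fields into columns. The nested dictionary below is a hypothetical, trimmed-down stand-in for the aggregate's output, using a few values from the result above.

```python
# Hypothetical aggregate output: per supplier, a list of whole rows (dicts).
top_rows_by_supplier = {
    6876: [
        {"l_orderkey": 1234726, "l_suppkey": 6876, "l_shipmode": "FOB"},
        {"l_orderkey": 2584193, "l_suppkey": 6876, "l_shipmode": "TRUCK"},
    ],
    8136: [
        {"l_orderkey": 5751559, "l_suppkey": 8136, "l_shipmode": "SHIP"},
    ],
}

# Step 1: each list becomes separate rows (one struct per row).
structs = [row for rows in top_rows_by_supplier.values() for row in rows]

# Step 2: each struct becomes separate columns.
column_names = list(structs[0])
table = [tuple(row[name] for name in column_names) for row in structs]

print(column_names)
for record in table:
    print(record)
```

Because l_suppkey is a field of every struct, it shows up as a regular column after the second step without being selected separately.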

This approach can also be useful for the common task of de-duplicating by finding the latest value within a group. One pattern is to find the current state of a dataset by returning the most recent event in an events table. Simply use an N of 1!
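
The de-duplication pattern with an N of 1 amounts to keeping, for each key, only the row with the greatest timestamp seen so far. A minimal Python sketch follows, assuming a hypothetical events list of (entity, event time, status) tuples.

```python
# Hypothetical events table: (entity id, event time, status).
events = [
    ("order-1", "2024-01-01T09:00", "created"),
    ("order-2", "2024-01-01T09:05", "created"),
    ("order-1", "2024-01-02T14:30", "shipped"),
    ("order-1", "2024-01-01T18:00", "paid"),
]

# Single pass: remember only the most recent event per entity.
current_state = {}
for entity, event_time, status in events:
    kept = current_state.get(entity)
    if kept is None or event_time > kept[0]:
        current_state[entity] = (event_time, status)

print(current_state)
```

The events do not need to arrive in time order, since each one is compared against the latest event kept for its entity.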

We now have a way to use an aggregate function to calculate the top N rows per group! So, how much more efficient is it?

Performance Comparisons

We will compare the QUALIFY approach with the max_by approach for solving the top N by group problem. We have discussed both queries, but for reference they are repeated below.

QUALIFY query:

FROM lineitem
SELECT
    *,
    row_number() OVER
        (PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
        AS my_ranking
QUALIFY
    my_ranking <= 3;

max_by query:

FROM lineitem
SELECT 
    unnest(
        max_by(lineitem, l_shipdate, 3),
        recursive := 1
    )
GROUP BY 
    l_suppkey;

While the main query is running, we will also kick off a background thread to periodically measure DuckDB's memory use. This uses the built-in table function duckdb_memory(), which reports memory usage as well as temporary disk usage. The small Python script used for benchmarking is included below the results. The machine used for benchmarking was an M1 MacBook Pro with 16 GB RAM.

| SF | max_memory | Metric | QUALIFY | max_by | Improvement |
|---|---|---|---|---|---|
| 1 | Default | Total time | 0.58 s | 0.24 s | 2.4× |
| 5 | Default | Total time | 6.15 s | 1.26 s | 4.9× |
| 10 | 36 GB | Total time | 36.8 s | 25.4 s | 1.4× |
| 1 | Default | Memory usage | 1.7 GB | 0.2 GB | 8.5× |
| 5 | Default | Memory usage | 7.9 GB | 1.5 GB | 5.3× |
| 10 | 36 GB | Memory usage | 15.7 GB | 17.1 GB | 0.9× |

We can see that in each of these situations, the max_by approach is faster, in some cases nearly 5× faster! However, as the data grows larger, the max_by approach begins to weaken relative to QUALIFY.
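
The Improvement column appears to be the QUALIFY figure divided by the max_by figure, so values above 1 favor max_by and values below 1 favor QUALIFY. A quick check in Python, using the numbers from the results table (the dictionary layout and key names are just for illustration):

```python
# (QUALIFY, max_by) pairs copied from the results table.
results = {
    ("SF1", "time_s"): (0.58, 0.24),
    ("SF5", "time_s"): (6.15, 1.26),
    ("SF10", "time_s"): (36.8, 25.4),
    ("SF1", "memory_gb"): (1.7, 0.2),
    ("SF5", "memory_gb"): (7.9, 1.5),
    ("SF10", "memory_gb"): (15.7, 17.1),
}

# Ratio of the QUALIFY measurement to the max_by measurement, to 1 decimal.
improvement = {
    key: round(qualify / max_by, 1)
    for key, (qualify, max_by) in results.items()
}
for key, ratio in improvement.items():
    print(key, f"{ratio}x")
```

The only ratio below 1 is the SF10 memory measurement, where max_by used slightly more memory than QUALIFY.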

In some cases, the memory use is also significantly lower with max_by. However, the memory use of the max_by approach becomes more significant as scale increases, because the number of distinct l_suppkey values increases linearly with the scale factor. This increased memory use likely explains the performance decrease, as both algorithms approached the maximum amount of RAM on my machine and began to swap to disk.

In order to reduce the memory pressure, let's re-run the scale factor 10 (SF10) benchmark using fewer threads (4 threads and 1 thread). We continue to use a max_memory setting of 36 GB. The prior SF10 results with all 10 threads are included for reference.

| SF | Threads | Metric | QUALIFY | max_by | Improvement |
|---|---|---|---|---|---|
| 10 | 10 | Total time | 36.8 s | 25.4 s | 1.4× |
| 10 | 4 | Total time | 49.0 s | 21.0 s | 2.3× |
| 10 | 1 | Total time | 115.7 s | 12.7 s | 9.1× |
| 10 | 10 | Memory usage | 15.7 GB | 17.1 GB | 0.9× |
| 10 | 4 | Memory usage | 15.9 GB | 17.3 GB | 0.9× |
| 10 | 1 | Memory usage | 14.5 GB | 1.8 GB | 8.1× |

The max_by approach is so computationally efficient that even with 1 thread it is dramatically faster than the QUALIFY approach that uses all 10 threads! Reducing the thread count very effectively lowered the memory use as well (a nearly 10× reduction).

So, when should we use each? As with all database things, it depends! Beyond raw speed, max_by may also offer benefits when memory is constrained, especially when the thread count is tuned to avoid spilling to disk. However, if there are approximately as many groups as there are rows, consider QUALIFY, since we lose some of the memory efficiency of the max_by approach.

Python Benchmarking Script

import duckdb
import pandas as pd  # not referenced directly, but the .df() calls below need pandas installed
from threading import Thread
from time import sleep
from datetime import datetime
from os import remove 

def check_memory(stop_function, filepath, sleep_seconds, results_dict):
    print("Starting background thread")
    background_con = duckdb.connect(filepath)
    max_memory = 0
    max_temporary_storage = 0
    while True:
        if stop_function():
            break
        # Profile the memory
        memory_profile = background_con.sql("""
            FROM duckdb_memory() 
            SELECT 
                tag,
                round(memory_usage_bytes / (1000000), 0)::bigint AS memory_usage_mb,
                round(temporary_storage_bytes / (1000000), 0)::bigint AS temporary_storage_mb;
            """).df()
        print(memory_profile)
        total_memory = background_con.sql("""
            FROM memory_profile 
                SELECT
                sum(memory_usage_mb) AS total_memory_usage_mb,
                sum(temporary_storage_mb) AS total_temporary_storage_mb
            """).fetchall()
        print('Current memory:', total_memory[0][0])
        print('Current temporary_storage:', total_memory[0][1])

        if total_memory[0][0] > max_memory:
            max_memory = total_memory[0][0]
        if total_memory[0][1] > max_temporary_storage:
            max_temporary_storage = total_memory[0][1]
        
        print('Maximum memory:', max_memory)
        print('Maximum temporary_storage:', max_temporary_storage)

        sleep(sleep_seconds)
    
    results_dict["max_memory"] = max_memory
    results_dict["max_temporary_storage"] = max_temporary_storage
    background_con.close()

    return 

def query_and_profile(filepath, sql):
    con = duckdb.connect(filepath)
    con.sql("set max_memory='36GB'")

    results_dict = {}
    stop_threads = False
    background_memory_thread = Thread(target=check_memory,
                                      args=(lambda : stop_threads, filepath, 0.1, results_dict, ))
    background_memory_thread.start()

    print("Starting query:")
    start_time = datetime.now()
    results_df = con.sql(sql).df()
    results_dict["total_time_seconds"] = (datetime.now() - start_time).total_seconds()
    print(results_df.head(10))

    stop_threads = True
    background_memory_thread.join()
    con.close()

    return results_dict

filepath = './arg_max_check_duckdb_memory_v3.duckdb'

con = duckdb.connect(filepath)
print("Begin initial tpch load")
con.sql("""call dbgen(sf=1);""")
con.close()

sql = """
    FROM lineitem 
    SELECT 
        UNNEST(
            max_by(lineitem, l_shipdate, 3),
            recursive := 1
        )
    GROUP BY 
        l_suppkey
;"""

max_by_results = query_and_profile(filepath, sql)

sql = """
    FROM lineitem 
    SELECT 
        *,
        row_number() OVER
            (PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
            AS my_ranking
    QUALIFY 
        my_ranking <= 3
;"""

qualify_results = query_and_profile(filepath, sql)

print('max_by_results:', max_by_results)
print('qualify_results:', qualify_results)

remove(filepath)

Conclusion

DuckDB now offers a convenient way to calculate the top N values with the min and max aggregate functions, as well as their advanced cousins min_by and max_by. They are easy to get started with, and also enable more complex analyses like calculating the top N for all columns or the top N by group. There are also possible performance benefits when compared with a window function approach.

We would love to hear about the creative ways you are able to use this new feature!

Happy analyzing!