Range Joins in DuckDB

Richard Wesley, 2022-05-27

TL;DR: DuckDB has fully parallelised range joins that can efficiently join millions of range predicates.

Range intersection joins are an important operation in areas such as temporal analytics, and occur when two inequality conditions are present in a join predicate. Database implementations often rely on slow O(N^2) algorithms that compare every pair of rows for these operations. Instead, DuckDB leverages its fast sorting logic to implement two highly optimized parallel join operators for these kinds of range predicates, resulting in 20-30x faster queries. With these operators, DuckDB can be used effectively in more time-series-oriented use cases.

Introduction

Joining tables row-wise is one of the fundamental and distinguishing operations of the relational model. A join connects two tables horizontally using some Boolean condition called a predicate. This sounds straightforward, but how fast the join can be performed depends on the expressions in the predicate. This has led to the creation of different join algorithms that are optimised for different predicate types.

In this post, we will explain several join algorithms and their capabilities. In particular, we will describe a newly added "range join" algorithm that makes connecting tables on overlapping time intervals or multiple ordering conditions much faster.

Flight Data

No, this part isn't about ducks, but about air group flight statistics from the Battlestar Galactica reboot. We have a couple of tables we will be using: Pilots, Crafts, Missions and Battles. Some data was lost when the fleet dispersed, but hopefully this is enough to provide some "real life" examples!

The Pilots table contains the pilots and their data that does not change (name, call sign, serial number):

id  callsign  name              serial
1   Apollo    Lee Adama         234567
2   Starbuck  Kara Thrace       462753
3   Boomer    Sharon Valeri     312743
4   Kat       Louanne Katraine  244977
5   Hotdog    Brendan Costanza  304871
6   Husker    William Adama     204971

The Crafts table contains all the various fighting craft (ignoring the "Ship Of Theseus" problem of recycled parts!):

id type tailno
1 Viper N7242C
2 Viper 2794NC
3 Raptor 312
4 Blackbird N9999C

The Missions table contains all the missions flown by pilots. Missions have a begin and end time logged with the flight deck. We will use some common pairings (and an unusual mission at the end where Commander Adama flew his old Viper):

pid  cid  begin                end
2    2    3004-05-04 13:22:12  3004-05-04 15:05:49
1    2    3004-05-04 10:00:00  3004-05-04 18:19:12
3    3    3004-05-04 13:33:52  3004-05-05 19:12:21
6    1    3008-03-20 08:14:37  3008-03-20 10:21:15

The Battles table contains the time window of each battle with the Cylons.

battle                begin                end
Fall of the Colonies  3004-05-04 13:21:45  3004-05-05 02:47:16
Red Moon              3004-05-28 07:55:27  3004-05-28 08:12:19
Tylium Asteroid       3004-06-09 09:00:00  3004-06-09 11:14:29
Resurrection Ship     3004-10-28 22:00:00  3004-10-28 23:47:05

These last two tables (Missions and Battles) are examples of state tables. An object in a state table has a state that runs between two time points. For the battles, the state is just yes/no. For the missions, the state is a pilot/craft combination.

Equality Predicates

The most common type of join involves comparing one or more pairs of expressions for equality, often a primary key and a foreign key. For example, if we want a list of the craft flown by the pilots, we can join the Pilots table to the Crafts table through the Missions table:

SELECT callsign, count(*), tailno
FROM Pilots p, Missions m, Crafts c
WHERE p.id = m.pid
  AND c.id = m.cid
GROUP BY ALL
ORDER BY 2 DESC;

This will give us a table like:

callsign count(*) tailno
Starbuck 127 2794NC
Boomer 55 R1234V
Apollo 3 N7242C
Husker 1 N7242C

Range Predicates

The thing to notice in this example is that the conditions joining the tables are equalities connected with ANDs. But relational joins can be defined using any Boolean predicate – even ones without equality or AND.

One common operation in temporal databases is intersecting two state tables. Suppose we want to find the time intervals when each pilot was engaged in combat so we can compute combat hours for seniority. Vipers are launched quickly, but not before the battle has started, and there can be malfunctions or pilots may be delayed getting to the flight deck.

SELECT callsign, battle,
    greatest(m.begin, b.begin) AS begin,
    least(m.end, b.end) AS end
FROM Pilots p, Missions m, Crafts c, Battles b
WHERE m.begin < b.end
  AND b.begin < m.end
  AND p.id = m.pid
  AND c.id = m.cid;

This join creates a set of records containing the call sign and period in combat for each pilot. It handles the case where a pilot returns for a new craft, excludes patrol flights, and even handles the situation when a patrol flight turns into combat! This is because intersecting state tables this way produces a joint state table - an important temporal database operation. Here are a few rows from the result:

callsign  battle                begin                end
Starbuck  Fall of the Colonies  3004-05-04 13:22:12  3004-05-04 15:05:49
Apollo    Fall of the Colonies  3004-05-04 13:21:45  3004-05-04 18:19:12
Boomer    Fall of the Colonies  3004-05-04 13:33:52  3004-05-05 02:47:16

Apollo was already in flight when the first Cylon attack came, so the query puts his begin time for the battle at the start of the battle, not when he launched for the decommissioning flyby. Starbuck and Boomer were scrambled after the battle started, but Boomer did not return until after the battle was effectively over, so her end time is moved back to the official end of the battle.
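To make the clipping logic concrete, here is a minimal Python sketch of the same intersection over the rows shown above. The helper names (ts, intersect) are made up for this illustration, and the nested loop only demonstrates the predicate and the greatest/least clipping, not how DuckDB evaluates the query.

```python
from datetime import datetime

def ts(text):
    # Hypothetical helper: parse the timestamp format used in the tables
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")

# (callsign, begin, end): the Missions rows with the pilot's call sign filled in
missions = [
    ("Starbuck", ts("3004-05-04 13:22:12"), ts("3004-05-04 15:05:49")),
    ("Apollo", ts("3004-05-04 10:00:00"), ts("3004-05-04 18:19:12")),
    ("Boomer", ts("3004-05-04 13:33:52"), ts("3004-05-05 19:12:21")),
    ("Husker", ts("3008-03-20 08:14:37"), ts("3008-03-20 10:21:15")),
]
battles = [
    ("Fall of the Colonies", ts("3004-05-04 13:21:45"), ts("3004-05-05 02:47:16")),
    ("Red Moon", ts("3004-05-28 07:55:27"), ts("3004-05-28 08:12:19")),
    ("Tylium Asteroid", ts("3004-06-09 09:00:00"), ts("3004-06-09 11:14:29")),
    ("Resurrection Ship", ts("3004-10-28 22:00:00"), ts("3004-10-28 23:47:05")),
]

def intersect(missions, battles):
    rows = []
    for callsign, m_begin, m_end in missions:
        for battle, b_begin, b_end in battles:
            # Two intervals overlap when each one begins before the other ends
            if m_begin < b_end and b_begin < m_end:
                # greatest(begin) and least(end) clip the result to the overlap
                rows.append((callsign, battle, max(m_begin, b_begin), min(m_end, b_end)))
    return rows

combat = intersect(missions, battles)
for row in combat:
    print(row[0], row[1], row[2], row[3])
```

Only the three missions flown during the Fall of the Colonies survive the predicate; the patrol-style flight in 3008 overlaps no battle and drops out.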

What is important here is that the join condition between the pilot/mission/craft relation and the battle table has no equalities in it. This kind of join is traditionally very expensive to compute, but as we will see, there are ways of speeding it up.

Infinite Time

One common problem with populating state tables is how to represent the open edges. For example, the begin time for the first state might not be known, or the current state may not have ended yet.

Often such values are represented by NULLs, but this complicates the intersection query because comparing with NULL yields NULL. This issue can be worked around by using coalesce(end, <large timestamp>), but that adds a computation to every row, most of which don't need it. Another approach is to just use <large timestamp> directly instead of the NULL, which solves the expression computation problem but introduces an arbitrary time value. This value may give strange results when used in computations.

DuckDB provides a third alternative from Postgres that can be used for these situations: infinite time values. Infinite time values will compare as expected, but arithmetic with them will produce NULLs or infinities, indicating that the computation is not well defined.
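As a loose analogy only, Python floats behave in a similar way. The sketch below uses plain numbers as stand-ins for timestamps (an assumption made for illustration; it does not use DuckDB's timestamp type) to show an open-ended interval passing the intersection test, while arithmetic on the infinite edge gives results that are flagged as not meaningful.

```python
import math

def overlaps(a_begin, a_end, b_begin, b_end):
    # Same intersection test as the combat hours query
    return a_begin < b_end and b_begin < a_end

# A mission that has not ended yet, with its open edge stored as +infinity
open_mission = (10.0, math.inf)
battle = (50.0, 60.0)

still_flying = overlaps(*open_mission, *battle)  # infinity compares as expected
duration = open_mission[1] - open_mission[0]     # inf: open-ended, not a real duration
undefined = math.inf - math.inf                  # nan: the computation is not well defined
```

No coalesce-style wrapper is needed in the predicate, which is the point of storing the open edge as an infinite value.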

Common Join Algorithms

To see why these joins can be expensive, let's start by looking at the two most common join algorithms.

Hash Joins

Joins with at least one equality condition ANDed to the rest of the conditions are called equi-joins. They are usually implemented using a hash table like this:

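result = []
hashes = {}
# Build phase: index the build side by its key (assumed unique, like a primary key)
for b in build:
    hashes[b.pk] = b

# Probe phase: look up each probe key and emit a pair only when it matches
for p in probe:
    if p.fk in hashes:
        result.append((p, hashes[p.fk], ))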

The expressions from one side (the build side) are computed and hashed, then the corresponding expressions from the other side (the probe side) are looked up in the hash table and checked for a match.

We can modify this a bit when only some of the ANDed conditions are equalities by checking the other conditions once we find the equalities in the hash table. This modification is a general technique that can be used with any join algorithm that reduces the possible matches. The important point is that the hash table makes the join run time O(N).
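A minimal sketch of that modification, assuming rows are plain tuples and the key extractors and residual predicate are passed in as functions (all names here are hypothetical): the hash table narrows the candidates on the equality, and the remaining conditions are checked only for those candidates.

```python
from collections import defaultdict

def hash_join(build, probe, build_key, probe_key, residual=lambda b, p: True):
    # Build phase: group build rows by key so duplicate keys are also handled
    table = defaultdict(list)
    for b in build:
        table[build_key(b)].append(b)

    # Probe phase: the equality finds candidates, the residual filters them
    result = []
    for p in probe:
        for b in table.get(probe_key(p), []):
            if residual(b, p):
                result.append((p, b))
    return result

pilots = [(1, "Apollo"), (2, "Starbuck"), (3, "Boomer"), (6, "Husker")]
missions = [(2, 2), (1, 2), (3, 3), (6, 1)]  # (pid, cid)

# Equality on the pilot id, plus a non-equality condition on the craft id
joined = hash_join(
    pilots,
    missions,
    build_key=lambda b: b[0],
    probe_key=lambda p: p[0],
    residual=lambda b, p: p[1] != 2,
)
```

The residual check runs once per candidate pair rather than once per pair of rows, which is why the extra conditions add little cost.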

Nested Loop Joins

Since relational joins can be defined using any Boolean predicate (even one without equality or AND), hash joins do not always work. The join algorithm of last resort in these situations is called a Nested Loop Join (or NLJ for short), and consists of just comparing every row from the probe side with every row from the build side:

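result = []
# Compare every probe row with every build row
for p in probe:
    for b in build:
        # compare() evaluates the full join predicate for this pair
        if compare(p, b):
            result.append((p, b, ))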

This is O(M x N) in the number of rows, which can be very slow if the tables are large. Even worse, most practical analytic queries (such as the combat hours example above) will not return anything like this many results, so a lot of effort may be wasted. But without an algorithm that is tuned for a kind of predicate, this is what we would have to use.
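To put a number on the wasted effort, the sketch below counts predicate evaluations for a nested loop join over synthetic data (two lists of 100 integers, chosen only for illustration) with a narrow range predicate.

```python
def nested_loop_join(probe, build, compare):
    comparisons = 0
    result = []
    for p in probe:
        for b in build:
            comparisons += 1  # every pair is tested, matching or not
            if compare(p, b):
                result.append((p, b))
    return result, comparisons

probe = list(range(100))
build = list(range(100))

# Range predicate: b lies strictly between p and p + 2, so only b == p + 1 matches
matches, comparisons = nested_loop_join(probe, build, lambda p, b: p < b and b < p + 2)
print(len(matches), "matches from", comparisons, "comparisons")
```

Here 10,000 comparisons produce 99 result rows, so roughly 99% of the work finds nothing.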

Range Joins

When we have a range comparison (one of <, <=, >, >=) as one of the join conditions, we can take advantage of the ordering it implies by sorting the input relations on some of the join conditions. Sorting is O(N log N), which suggests that this could be faster than an NLJ, and indeed this turns out to be the case.

Piecewise Merge Join

Before the advent of hash joins, databases would often sort the join inputs to find matches. For equi-joins, a repeated binary search would then find the matching values on the build side in O(M log N) time. This is called a Merge Join, and it runs faster than O(M x N), but not as fast as the O(N) time of a hash join. Still, in the case where we have a single range comparison, the binary search lets us find the first match for a probe value. We can then find all the remaining matches by looking after the first one.

If we also sort the probe side, we can even know where to start the search for the next probe value because it will be after where we found the previous value. This is how Piecewise Merge Join (PWMJ) works: We sort the build side so that the values are ordered by the predicate (either ASC or DESC), then sort each probe chunk the same way so we can quickly scan through sets of values to find possible matches. This can be significantly faster than NLJ for these types of queries. If there are more join conditions, we can then check the generated matches to make sure all conditions are met because once again the sorting has significantly reduced the number of checks that have to be made.
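The idea can be sketched in a few lines of Python for a single condition p < b. This is a simplified, single-threaded illustration over plain numbers, not DuckDB's operator: both sides are sorted, and a cursor into the build side only ever moves forward.

```python
def piecewise_merge_join(probe, build):
    """Return all (p, b) pairs with p < b."""
    build_sorted = sorted(build)
    probe_sorted = sorted(probe)
    result = []
    start = 0  # first build position that can still match the current probe value
    for p in probe_sorted:
        # Probe values are ascending, so the cursor never has to move back
        while start < len(build_sorted) and build_sorted[start] <= p:
            start += 1
        # Everything from the cursor to the end satisfies p < b
        result.extend((p, b) for b in build_sorted[start:])
    return result

pairs = piecewise_merge_join([5, 1, 3], [2, 4, 4, 6])
```

After sorting, the only comparisons made are the ones that advance the cursor; every pair that is emitted is known to match from the sort order alone.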

Inequality Join (IEJoin)

For two range conditions (like the combat hours query), there are even faster algorithms available. We have recently added a new join called IEJoin, which sorts on two predicates to really speed things up.

The way that IEJoin works is to first sort both tables on the values for the first condition and merge the two sort keys into a combined table that tracks the two input tables' row numbers. Next, it sorts the positions in the combined table on the second range condition. It can then quickly scan for matches that pass both conditions. And just like for hash joins, we can check any remaining conditions because we have hopefully significantly reduced the number of pairs we have to test.

Walk Through

Because the algorithm is a bit tricky, let's step through a small example. (If you are reading the paper, this is a simplified version of the "Union Arrays" optimisation from §4.3, but I find this version of the algorithm is much easier to understand than the version in §3.1.) We are going to look at Qp from the paper, which is a self join on the table "West":

West t_id time cost cores
s1 404 100 6 4
s2 498 140 11 2
s3 676 80 10 1
s4 742 90 5 4

We are looking for pairs of billing ids where the second id had a shorter time than the first, but a higher cost:

SELECT s1.t_id, s2.t_id AS t_id2
FROM west s1, west s2
WHERE s1.time > s2.time
  AND s1.cost < s2.cost;

There are two pairs that meet these criteria:

t_id t_id2
404 676
742 676

(This is an example of another kind of double range query where we are looking for anomalies.)

First, we sort both input tables on the first condition key (time). (We sort DESC because we want the values to satisfy the join condition (>) from left to right.)

Because they are sorted the same way, we can merge the condition keys from the sorted tables into a new table called L1 after marking each row with the table it came from (using negative row numbers to indicate the right table):

L1 s2 s2 s1 s1 s4 s4 s3 s3
time 140 140 100 100 90 90 80 80
cost 11 11 6 6 5 5 10 10
rid 1 -1 2 -2 3 -3 4 -4

The rid column lets us map rows in L1 back to the original table.

Next, we build a second table L2 with the second condition key (cost) and the row positions (P) of L1 (not the row numbers from the original tables!) We sort L2 on cost (DESC again this time because now we want the join condition to hold from right to left):

L2 s2 s2 s3 s3 s1 s1 s4 s4
cost 11 11 10 10 6 6 5 5
P 0 1 6 7 2 3 4 5

The sorted column of L1 row positions is called the permutation array, and we can use it to find the corresponding position of the time value for a given cost.

At this point we have two tables (L1 and L2), each sorted on one of the join conditions and pointing back to the tables it was derived from. Moreover, the sort orders have been chosen so that the condition holds from left to right (resp. right to left). Since the conditions are transitive, this means that whenever we have a value that satisfies a condition at a point in the table, it also satisfies it for everything to the right (resp. left)!

With this setup, we can scan L2 from left to right looking for rows that match both conditions using two indexes:

  • i iterates across L2 from left to right;
  • off2 tracks i and is used to identify costs that satisfy the join condition compared to i. (Note that for loose inequalities, this could be to the right of i);

We use a bitmap B to track which rows in L1 the L2 scan has already identified as satisfying the cost condition compared to the L2 scan position i.

Because we only want matches between one left and one right row, we can skip matches where the rids have different signs. To leverage this observation, we only process values of i that are in the left hand table (rid[P[i]] is positive), and we only mark bits for rows in the right hand table (rid[P[i]] is negative). In this example, the right side rows are the odd numbered values in P (which are conveniently also the odd values of i), which makes them easy to track in the example.

For the other rows, here is what happens:

i off2 cost[i] cost[off2] P[i] rid[P[i]] B Result
0 0 11 11 0 1 00000000 []
2 0..2 10 11..10 6 4 01000000 []
4 2..4 6 10..6 2 2 01000001 [{s1, s3}]
6 4..6 5 6..5 4 3 01010001 [{s4, s3}]

Whenever we find costs that satisfy the condition to the left of the scan location (between off2 and i), we use P[off2] to mark the bits in B corresponding to those positions in L1 that reference right side rows. This records that the cost condition is satisfied for those rows. Then whenever we have a position P[i] in L1, we can scan B to the right to find values that also satisfy the cost condition. This works because everything to the right of P[i] in L1 satisfies the time condition thanks to the sort order of L1 and the transitivity of the comparison operations.

In more detail:

  1. When i and off2 are 0, the cost condition < is not satisfied, so nothing happens;
  2. When i is 1, we are looking at a row from the right side of the join, so we skip it and move on;
  3. When i is 2, we are now looking at a row from the left side, so we bring off2 forward until the cost condition fails, marking B where it succeeds at P[1] = [1];
  4. We then scan the time values in L1 right from position P[i=2] = 6 and find no matches in B;
  5. When i is 4, we bring off2 forward again, marking B at P[3] = [7];
  6. We then scan time from position 2 and find matches at [6,7], one of which (7) is from the right side table;
  7. When i is 6, we bring off2 forward again, marking B at P[5] = [3];
  8. We then scan time from position 4 and again find matches at [6,7];
  9. Finally, when i runs off the end, we have no new cost values, so nothing happens;
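The steps above can be sketched as a short, single-threaded Python function. This is only an illustration of the walk-through under a few assumptions: rows are (t_id, time, cost) tuples, the predicate is fixed to the one in Qp, and ties on time are skipped with a plain linear scan rather than any of the optimisations used in the real operator.

```python
def iejoin(left, right):
    """Pairs (l, r) with l.time > r.time and l.cost < r.cost; rows are (t_id, time, cost)."""
    # Sort both inputs on the first condition key (time), descending
    left_sorted = sorted(left, key=lambda row: -row[1])
    right_sorted = sorted(right, key=lambda row: -row[1])

    # L1: merged (time, cost, rid) entries; positive rid = left row, negative rid = right row
    l1 = [(row[1], row[2], n + 1) for n, row in enumerate(left_sorted)]
    l1 += [(row[1], row[2], -(n + 1)) for n, row in enumerate(right_sorted)]
    # Time descending, with left entries ahead of right entries on ties
    l1.sort(key=lambda e: (-e[0], e[2] < 0))
    times = [e[0] for e in l1]
    costs = [e[1] for e in l1]
    rids = [e[2] for e in l1]
    n = len(l1)

    # L2 is represented by the permutation array: L1 positions ordered by cost, descending
    p = sorted(range(n), key=lambda pos: -costs[pos])

    bits = [False] * n  # bitmap B over L1 positions
    off2 = 0
    result = []
    for i in range(n):
        pos = p[i]
        if rids[pos] < 0:
            continue  # only left rows drive the scan
        # Mark right rows whose cost is strictly greater than the current cost
        while off2 < n and costs[p[off2]] > costs[pos]:
            if rids[p[off2]] < 0:
                bits[p[off2]] = True
            off2 += 1
        # Skip entries that tie on time; everything further right has a smaller time
        j = pos + 1
        while j < n and times[j] == times[pos]:
            j += 1
        for k in range(j, n):
            if bits[k]:
                result.append((left_sorted[rids[pos] - 1][0], right_sorted[-rids[k] - 1][0]))
    return result

west = [(404, 100, 6), (498, 140, 11), (676, 80, 10), (742, 90, 5)]
pairs = iejoin(west, west)
```

Running it on the West table produces the pairs (404, 676) and (742, 676), in the same order as the Result column of the trace.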

What makes this fast is that we only have to check a few bits to find the matches. When we do need to perform comparisons, we can use the fast radix comparison code from our sorting code, which doesn't require special templated versions for every data type. This not only reduces the code size and complexity, it "future-proofs" it against new data types.

Further Details

That walk through is a slightly simplified, single threaded version of the actual algorithm. There are a few more details that may be of interest:

  • Scanning large, mostly empty bit maps can be slow, so we use the Bloom filter optimisation from §4.2.
  • The published algorithm assumes that there are no duplicate L1 values in either table. To handle the general case, we use an exponential search to find the first L1 value that satisfies the predicate with respect to the current position and scan right from that point;
  • We also adapted the distributed Algorithm 3 from §5 by joining pairs of the sorted blocks generated by the sort code on separate threads. This allows us to fully parallelise the operator by first using parallel sorting and then by breaking up the join into independent pieces;
  • Breaking up the pieces for parallel execution also allows us to spool join blocks that are not being processed to disk, making the join scalable.
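For readers unfamiliar with exponential search, here is a minimal sketch of the general technique, not DuckDB's code. It assumes an ascending Python list and finds the start of a run of duplicates by galloping left in doubling steps, then finishing with a binary search; the function name is made up for this example.

```python
from bisect import bisect_left

def first_equal_leftward(values, start):
    """Index of the first element equal to values[start], searching left from start."""
    target = values[start]
    step = 1
    lo = start
    # Gallop left in doubling steps while we are still inside the run of duplicates
    while lo - step >= 0 and values[lo - step] == target:
        lo -= step
        step *= 2
    lower = max(lo - step, 0)
    # The first duplicate now lies in [lower, lo]; finish with a binary search
    return bisect_left(values, target, lower, lo + 1)

# A long run of identical keys, like many rows sharing the same checkout date
values = [1] * 3 + [5] * 120_000 + [9] * 2
first = first_equal_leftward(values, 3 + 120_000 - 1)
```

Both phases take a logarithmic number of steps in the length of the run, whereas a linear scan visits every duplicate.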

Special Joins

One of the nice things about IEJoin is that it is very general and implements a number of more specialised join types reasonably efficiently. For example, the state intersection query above is an example of an interval join where we are looking to join on the intersection of two intervals.

Another specialised join that can be accelerated with IEJoin is a band join. This can be used to join values that are "close" to each other:

SELECT r.id, s.id 
FROM r, s
WHERE r.value - s.value BETWEEN a AND b;

This translates into a double inequality join condition:

SELECT r.id, s.id 
FROM r, s
WHERE s.value + a <= r.value AND r.value <= s.value + b;

which is exactly the type of join expression that IEJoin handles.
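The rewritten form is easy to check on a small example. The sketch below is an illustration over plain integers and deliberately uses a simple sort plus binary search instead of IEJoin: it evaluates the double inequality s + a <= r <= s + b by looking up both bounds in the sorted r values.

```python
from bisect import bisect_left, bisect_right

def band_join(r_values, s_values, a, b):
    """Pairs (r, s) with a <= r - s <= b, evaluated as s + a <= r <= s + b."""
    r_sorted = sorted(r_values)
    result = []
    for s in s_values:
        lo = bisect_left(r_sorted, s + a)   # first r with r >= s + a
        hi = bisect_right(r_sorted, s + b)  # one past the last r with r <= s + b
        result.extend((r, s) for r in r_sorted[lo:hi])
    return result

band = band_join([1, 4, 6, 10], [3, 5, 9], a=-1, b=2)
```

The result is the same set of pairs that the BETWEEN form selects, which is what lets an optimiser treat the two queries as interchangeable.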

Performance

So how fast is IEJoin? The improvements are so large that it is difficult to compare it with the previous range join algorithms, because they do not complete in a reasonable amount of time!

Simple Measurements

To give an example, here are the run times for a 100K self join of some employee tax and salary data, where the goal is to find the 1001 pairs of employees where one has a higher salary but the other has a higher tax rate:

SELECT
    r.id,
    s.id
FROM Employees r
JOIN Employees s
    ON r.salary < s.salary
    AND r.tax > s.tax;

Algorithm Time (s)
NLJ 21.440
PWMJ 38.698
IEJoin 0.280

Another example is a self join to find 3772 overlapping events in a 30K event table:

SELECT
    r.id,
    s.id
FROM events r
JOIN events s
    ON r.start <= s.end
    AND r.end >= s.start
    AND r.id <> s.id;

Algorithm Time (s)
NLJ 6.985
PWMJ 4.780
IEJoin 0.226

In both cases we see performance improvements of 20-100x, which is very helpful when you run a lot of queries like these!

Optimisation Measurements

A third example demonstrates the importance of the join pair filtering and exponential search optimisations. The data is a state table of library circulation data from another interval join paper, and the query is a point-in-period temporal query used to generate Figure 4d:

SELECT x, count(*) AS y
FROM books,
    (SELECT x FROM range('2013-01-01'::TIMESTAMP, '2014-01-01'::TIMESTAMP, INTERVAL 1 DAY) tbl(x)) dates
WHERE checkout <= x AND x <= return
GROUP BY ALL
ORDER BY 1;

The result is a count of the number of books checked out at midnight on each day. These are the runtimes on an 18 core iMac Pro:

Improvement Time (s) CPU
Unoptimised > 30m ~100%
Filtering 119.76s 269%
Exponential 11.21s 571%

The query joins a 35M row table with a 365 row table, so most of the data comes from the left hand side. By avoiding setting bits for the matching rows in the left table, we eliminate almost all L1 checks. This dramatically reduced the runtime and improved the CPU utilisation.

The data also has a large number of rows corresponding to books that were checked out at the start of the year, which all have the same checkout date. Searching left linearly in the first block to find the first match for the scan resulted in repeated runs of ~120K comparisons. This caused the runtime to be completely dominated by processing the first block. Reducing the number of comparisons for these rows from an average of ~60K to 16 cut the runtime by a factor of 10 and doubled the CPU utilisation.

Conclusion and Feedback

In this blog post, we explained the new DuckDB range join improvements provided by the new IEJoin operator. This should greatly improve the response time of state table joins and anomaly detection joins. We hope this makes your DuckDB experience even better – and please let us know if you run into any problems! Feel free to reach out on our GitHub page, or our Discord server.