Know your joins in Apache Spark
Updated: Jul 5, 2020
One of the most fundamental operations when working with Spark is the join. Whether you use the older Resilient Distributed Dataset (RDD) API or the newer DataFrame API, you'll very often need to combine multiple datasets, or modify one based on the contents of another.
Spark offers a variety of join types, and in this article we'll look at each of them. Afterwards, we'll look at the strategies Spark uses to actually perform these joins.
Join Types
The differences in how the join types behave fall along two axes: which rows get included, and which columns get included.
First, let's look at the columns. This is quite simple: most joins (inner, left, right and full outer joins, as well as cross joins) produce a result containing all columns from the left dataset as well as all columns from the right dataset. The exceptions are Semi and Anti Joins, which return only the columns from the left dataset. Whether a join is left, right or full determines which rows are kept, not which columns.
Next, the question of which rows get returned. For almost all types of join, we specify one or more join conditions (often called join keys). Each is a boolean expression over columns from both datasets, which evaluates to either true or false.
Typically, this will be an equality check between two columns, but it doesn't have to be. For example, one could specify a condition which matches when a value in a column from the left dataset is strictly greater than the corresponding value in the right dataset. In principle, any expression will work, so long as it returns a boolean value. The join types differ in how that value is used, and in what happens when the expression cannot be evaluated. For equality joins, this is the case when one of the rows being compared has a missing (null) value in the join column; such a comparison is not treated as a match.
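To make the idea of a join condition concrete, here is a minimal plain-Python sketch (not Spark code). It assumes a missing value is represented as None and mimics SQL's behaviour, where comparing against a missing value yields "unknown" rather than true or false. The helper names sql_eq, sql_gt and is_match are hypothetical.

```python
# Plain-Python sketch of join-condition evaluation with SQL-style nulls.
# None stands in for a missing (null) value.

def sql_eq(a, b):
    """Equality that returns None ("unknown") if either side is missing."""
    if a is None or b is None:
        return None
    return a == b

def sql_gt(a, b):
    """Strictly-greater-than with the same missing-value rule."""
    if a is None or b is None:
        return None
    return a > b

def is_match(result):
    """A pair of rows only counts as a match when the condition is exactly True."""
    return result is True

print(is_match(sql_eq(1, 1)))        # True
print(is_match(sql_eq(1, None)))     # False: "unknown" is not a match
print(is_match(sql_eq(None, None)))  # False: two missing values do not match either
print(is_match(sql_gt(5, 3)))        # True: a non-equality condition works too
```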
The most common is the Inner Join. Here, a pair of rows appears in the result only if the rows from both datasets satisfy the join expression; rows with no match on the other side are dropped.
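A minimal sketch of Inner Join semantics in plain Python, using small hypothetical employees and depts datasets (lists of dicts). In Spark's DataFrame API this logic corresponds to join(..., how="inner"); the inner_join helper below only illustrates the result, not how Spark executes it.

```python
def sql_eq(a, b):
    # SQL-style equality: a missing value (None) never matches anything.
    return None if a is None or b is None else a == b

def inner_join(left, right, cond):
    """Keep each (left, right) pair whose condition is exactly True."""
    return [{**l, **r} for l in left for r in right if cond(l, r) is True]

employees = [
    {"name": "Ann", "dept_id": 1},
    {"name": "Bob", "dept_id": 2},
    {"name": "Cid", "dept_id": None},
]
depts = [{"id": 1, "dept": "Sales"}, {"id": 3, "dept": "Ops"}]

result = inner_join(employees, depts, lambda l, r: sql_eq(l["dept_id"], r["id"]))
print(result)  # [{'name': 'Ann', 'dept_id': 1, 'id': 1, 'dept': 'Sales'}]
```

Bob has no matching department and Cid has no department at all, so both are dropped, as is the unmatched "Ops" department.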
The Full Join is quite different. It includes every row that exists in either of the two datasets. Where a row has no match on the other side, the columns from the other dataset are left missing (null) in the result. This type of join is also called an Outer Join or Full Outer Join. Outer Joins can also be one-sided: there are Left Outer and Right Outer Joins.
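The same hypothetical datasets can illustrate a Full Outer Join (how="full_outer" in Spark's DataFrame API). This plain-Python sketch assumes the caller passes the column names of each side so that unmatched rows can be padded with None; it shows the result, not Spark's execution.

```python
def sql_eq(a, b):
    # SQL-style equality: a missing value (None) never matches anything.
    return None if a is None or b is None else a == b

def full_outer_join(left, right, cond, left_cols, right_cols):
    """Matched pairs, plus unmatched rows from either side padded with None."""
    out, matched_right = [], set()
    for l in left:
        found = False
        for i, r in enumerate(right):
            if cond(l, r) is True:
                out.append({**l, **r})
                matched_right.add(i)
                found = True
        if not found:
            # Left row with no partner: fill the right-hand columns with None.
            out.append({**l, **dict.fromkeys(right_cols)})
    for i, r in enumerate(right):
        if i not in matched_right:
            # Right row with no partner: fill the left-hand columns with None.
            out.append({**dict.fromkeys(left_cols), **r})
    return out

employees = [
    {"name": "Ann", "dept_id": 1},
    {"name": "Bob", "dept_id": 2},
    {"name": "Cid", "dept_id": None},
]
depts = [{"id": 1, "dept": "Sales"}, {"id": 3, "dept": "Ops"}]

rows = full_outer_join(employees, depts,
                       lambda l, r: sql_eq(l["dept_id"], r["id"]),
                       left_cols=["name", "dept_id"], right_cols=["id", "dept"])
for row in rows:
    print(row)
```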
In a Left Outer Join, all rows from the left dataset are included, along with the rows from the right dataset which match them. Left rows with no match get missing (null) values in the right dataset's columns, and rows in the right dataset which do not match any rows in the left dataset are discarded.
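A plain-Python sketch of a Left Outer Join (how="left_outer" in Spark's DataFrame API) on the same hypothetical datasets. As before, the right-hand column names are passed in explicitly so unmatched left rows can be padded with None.

```python
def sql_eq(a, b):
    # SQL-style equality: a missing value (None) never matches anything.
    return None if a is None or b is None else a == b

def left_outer_join(left, right, cond, right_cols):
    """Every left row is kept; unmatched ones get None in the right-hand columns."""
    out = []
    for l in left:
        matches = [r for r in right if cond(l, r) is True]
        if matches:
            out.extend({**l, **r} for r in matches)
        else:
            out.append({**l, **dict.fromkeys(right_cols)})
    # Right rows that matched nothing (here, "Ops") never reach the output.
    return out

employees = [
    {"name": "Ann", "dept_id": 1},
    {"name": "Bob", "dept_id": 2},
    {"name": "Cid", "dept_id": None},
]
depts = [{"id": 1, "dept": "Sales"}, {"id": 3, "dept": "Ops"}]

rows = left_outer_join(employees, depts,
                       lambda l, r: sql_eq(l["dept_id"], r["id"]),
                       right_cols=["id", "dept"])
print([(row["name"], row["dept"]) for row in rows])
# [('Ann', 'Sales'), ('Bob', None), ('Cid', None)]
```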
A related type is the Semi Join. A Left Semi Join returns only the rows from the left dataset that have at least one match in the right dataset, and includes only the left dataset's columns. Unlike an Inner Join, each left row appears at most once, however many right rows it matches. Spark does not explicitly support Right Semi Joins, but the same effect can be achieved by using a Left Semi Join and swapping the datasets around.
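A plain-Python sketch of a Left Semi Join (how="left_semi" in Spark's DataFrame API). The hypothetical depts dataset here contains two departments with id 1, to show that a semi join does not duplicate left rows the way an inner join would.

```python
def sql_eq(a, b):
    # SQL-style equality: a missing value (None) never matches anything.
    return None if a is None or b is None else a == b

def left_semi_join(left, right, cond):
    """Left rows with at least one match; only the left-hand columns are returned."""
    # any() stops at the first match, so each left row is emitted at most once.
    return [l for l in left if any(cond(l, r) is True for r in right)]

employees = [
    {"name": "Ann", "dept_id": 1},
    {"name": "Bob", "dept_id": 2},
    {"name": "Cid", "dept_id": None},
]
# Two departments share id 1, so an Inner Join would return Ann twice.
depts = [{"id": 1, "dept": "Sales"}, {"id": 1, "dept": "Support"}, {"id": 3, "dept": "Ops"}]

result = left_semi_join(employees, depts, lambda l, r: sql_eq(l["dept_id"], r["id"]))
print(result)  # [{'name': 'Ann', 'dept_id': 1}]
```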
A further type of join is the Anti Join. Anti Joins are always one-sided, and again we'll use a Left Anti Join as the example. With this type of join, all the rows from the left dataset are included, except those which have a match in the right dataset. As with Semi Joins, only the left dataset's columns are returned, and Spark does not offer a Right Anti Join, but you can simply swap the datasets.
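A Left Anti Join (how="left_anti" in Spark's DataFrame API) is the complement of the semi join sketch: keep the left rows for which no right row matches. This is a plain-Python illustration on the same hypothetical datasets.

```python
def sql_eq(a, b):
    # SQL-style equality: a missing value (None) never matches anything.
    return None if a is None or b is None else a == b

def left_anti_join(left, right, cond):
    """Left rows with no match at all; only the left-hand columns are returned."""
    return [l for l in left if not any(cond(l, r) is True for r in right)]

employees = [
    {"name": "Ann", "dept_id": 1},
    {"name": "Bob", "dept_id": 2},
    {"name": "Cid", "dept_id": None},
]
depts = [{"id": 1, "dept": "Sales"}, {"id": 3, "dept": "Ops"}]

# Cid has no dept_id, so in this sketch it cannot match anything and is kept.
result = left_anti_join(employees, depts, lambda l, r: sql_eq(l["dept_id"], r["id"]))
print(result)  # [{'name': 'Bob', 'dept_id': 2}, {'name': 'Cid', 'dept_id': None}]
```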
There is one more join type: the Cross Join. This returns the Cartesian product of the two datasets, which means that it contains every possible pair of rows from the two datasets. The size of the result is the size of the left dataset multiplied by the size of the right dataset, so Cross Joins can get extremely large very quickly unless one or both datasets are tiny. No join keys are required to perform a Cross Join.
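A plain-Python sketch of the Cross Join using itertools.product (in Spark's DataFrame API the equivalent is the crossJoin method). The colors and sizes datasets are hypothetical.

```python
from itertools import product

def cross_join(left, right):
    """Every possible (left, right) pair; no join condition is involved."""
    return [{**l, **r} for l, r in product(left, right)]

colors = [{"color": c} for c in ("red", "green", "blue")]
sizes = [{"size": s} for s in ("S", "M", "L", "XL")]

pairs = cross_join(colors, sizes)
print(len(pairs))  # 12, i.e. 3 * 4
print(pairs[0])    # {'color': 'red', 'size': 'S'}
```

The multiplication is what makes Cross Joins dangerous: two datasets of 100,000 rows each would already produce 10,000,000,000 rows.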
Join Strategies
All the join types listed above specify the logic of a join. If we were working on a simple, single-machine database system, that would be the end of it. However, due to Spark's distributed nature, there are several different ways it can actually perform these joins, each with its own pros and cons.
The simplest (and also least efficient) strategy is the Broadcast Nested Loop Join. The idea is very simple: compare every pair of records from the two datasets, and evaluate the join expression(s) for each pair. This strategy requires one of the datasets to be broadcast (more on broadcasting later), which can cause nodes to run out of memory if that dataset is too large. Its advantage is that it works for any type of join and any join expression. Unfortunately, it is unsafe (because of the risk of running out of memory) and very inefficient, so Spark only uses it as a last resort when no better option is available.
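The following is a simplified, single-process sketch of the Broadcast Nested Loop idea, shown for an inner join only. It assumes the large dataset is already split into partitions (a list of lists) and models "broadcasting" as every partition reading the complete small dataset; the orders and tiers data are hypothetical. Because every pair is checked, any condition works, including this non-equality one.

```python
def broadcast_nested_loop_join(partitions, broadcast_side, cond):
    """Inner-join sketch: every partition of the large side sees the whole small side."""
    results = []
    for partition in partitions:      # in Spark, roughly one task per partition
        local_copy = broadcast_side   # models the full copy each worker must hold in memory
        for l in partition:
            for r in local_copy:      # compare every pair of rows
                if cond(l, r) is True:
                    results.append({**l, **r})
    return results

orders = [[{"order": 1, "amount": 50}, {"order": 2, "amount": 500}],
          [{"order": 3, "amount": 5000}]]
tiers = [{"tier": "gold", "min_amount": 1000}, {"tier": "silver", "min_amount": 100}]

# Non-equality condition: the order amount is strictly greater than the tier minimum.
rows = broadcast_nested_loop_join(orders, tiers,
                                  lambda l, r: l["amount"] > r["min_amount"])
print([(row["order"], row["tier"]) for row in rows])
# [(2, 'silver'), (3, 'gold'), (3, 'silver')]
```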
A slight improvement over the Broadcast Nested Loop Join is the Cartesian Join, which still compares every pair of rows but does not broadcast either dataset. Its main benefit is therefore that it avoids the risk of running out of memory. However, the Cartesian Join strategy only works for Inner Joins.
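A simplified sketch of the Cartesian Join idea, assuming (as a model of Spark's behaviour) that work is divided into one task per pair of partitions, one from each dataset, so no task ever needs a complete copy of either side. The data and helper name are hypothetical, and only the inner-join case is shown.

```python
from itertools import product

def cartesian_join(left_parts, right_parts, cond):
    """Inner-join sketch: one task per (left partition, right partition) pair."""
    results = []
    for lp, rp in product(left_parts, right_parts):
        # A task only touches these two partitions, never a full copy of either dataset.
        for l in lp:
            for r in rp:
                if cond(l, r) is True:
                    results.append({**l, **r})
    return results

orders = [[{"order": 1, "amount": 50}, {"order": 2, "amount": 500}],
          [{"order": 3, "amount": 5000}]]
tiers = [[{"tier": "gold", "min_amount": 1000}],
         [{"tier": "silver", "min_amount": 100}]]

num_tasks = len(orders) * len(tiers)  # 2 * 2 = 4 partition pairs
rows = cartesian_join(orders, tiers, lambda l, r: l["amount"] > r["min_amount"])
print(num_tasks, [(row["order"], row["tier"]) for row in rows])
# 4 [(2, 'silver'), (3, 'gold'), (3, 'silver')]
```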
If we are joining based on the equality of columns rather than a more exotic join condition, a few more efficient strategies may be available. One of these is the Sort-Merge Join. It works by shuffling both datasets so that rows with the same join key end up in the same partition on the same worker, sorting each partition on the join keys, and then merging the two sorted sides in a single pass. Because both sides are sorted, matching rows can be found without comparing every pair. The Sort-Merge strategy is the most commonly used one in Spark, especially for large datasets, but it does come with a few restrictions: it can only be used if the join keys are sortable and the join expression itself is a simple equality check. Unlike the Cartesian Join, Sort-Merge supports all the join types that have equality keys (inner, outer, semi and anti).
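The three steps of a Sort-Merge Join (shuffle by key, sort each partition, merge) can be sketched in plain Python for the inner-join case. This is a simplified model, not Spark's implementation: Python's built-in hash() stands in for Spark's partitioner, num_partitions is a hypothetical parameter, and the datasets are made up.

```python
def sort_merge_join(left, right, key, num_partitions=2):
    """Inner equi-join sketch: shuffle by key, sort each partition, then merge."""
    def shuffle(rows):
        parts = [[] for _ in range(num_partitions)]
        for row in rows:
            if row[key] is not None:  # a missing key can never satisfy an equality
                parts[hash(row[key]) % num_partitions].append(row)
        return parts

    results = []
    # Equal keys hash to the same partition number on both sides.
    for lp, rp in zip(shuffle(left), shuffle(right)):
        lp.sort(key=lambda row: row[key])
        rp.sort(key=lambda row: row[key])
        i = j = 0
        while i < len(lp) and j < len(rp):
            lk, rk = lp[i][key], rp[j][key]
            if lk < rk:
                i += 1
            elif lk > rk:
                j += 1
            else:
                # Find the run of right rows with this key, pair it with every left row.
                j_end = j
                while j_end < len(rp) and rp[j_end][key] == lk:
                    j_end += 1
                while i < len(lp) and lp[i][key] == lk:
                    results.extend({**lp[i], **r} for r in rp[j:j_end])
                    i += 1
                j = j_end
    return results

people = [{"k": 3, "name": "Cid"}, {"k": 1, "name": "Ann"},
          {"k": 2, "name": "Bob"}, {"k": 1, "name": "Dee"}]
depts = [{"k": 1, "dept": "Sales"}, {"k": 3, "dept": "Ops"}, {"k": 1, "dept": "Support"}]

rows = sort_merge_join(people, depts, key="k")
print(sorted((row["name"], row["dept"]) for row in rows))
```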
If at least one of the datasets we are joining is small (10 MB by default, controlled by the spark.sql.autoBroadcastJoinThreshold setting), Spark can use the Broadcast Hash Join strategy. Broadcasting means that the small dataset is replicated in full to every worker node, rather than being partitioned. This guarantees that each worker can perform its share of the join without having to fetch additional data. The “Hash” part of the name refers to the fact that Spark builds a hash table, keyed on the join key, from the smaller of the two datasets. During the join, each row of the larger dataset looks up its matches in that table instead of scanning the whole smaller dataset. Like Sort-Merge, the Broadcast Hash strategy only works on equality joins; however, it does not require the join keys to be sortable. Broadcast Hash is generally the fastest strategy, but it can only be used if at least one dataset is small enough.
If the join keys are not sortable and both datasets are large, neither Sort-Merge nor Broadcast Hash can be used. Instead, Spark will typically select the Shuffle Hash Join strategy. Conceptually, this is very similar to the Broadcast Hash Join. The difference is that, instead of one dataset being broadcast, both datasets are repartitioned by join key and exchanged between the workers (a “shuffle”), after which each worker builds a hash table from its partition of the smaller dataset. Moving both datasets over the network makes this slower than a Broadcast Hash Join. Like the previous two strategies, Shuffle Hash only works for equality joins.
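Both hash-based strategies can be sketched in plain Python for the inner-join case. In the broadcast version, one hash table is built from the whole small dataset and probed by every partition of the large one; in the shuffle version, both datasets are first repartitioned by key and a separate, smaller hash table is built per partition. This is a simplified model: hash() stands in for Spark's partitioner, and the helper names and data are hypothetical.

```python
from collections import defaultdict

def build_hash_table(rows, key):
    """Group rows by join key; missing keys are skipped because they never match."""
    table = defaultdict(list)
    for row in rows:
        if row[key] is not None:
            table[row[key]].append(row)
    return table

def broadcast_hash_join(big_partitions, small, key):
    table = build_hash_table(small, key)  # built once from the broadcast (small) side
    return [{**row, **match}
            for part in big_partitions    # every partition probes the same table
            for row in part
            for match in table.get(row[key], [])]

def shuffle_hash_join(big, small, key, num_partitions=2):
    def shuffle(rows):
        parts = [[] for _ in range(num_partitions)]
        for row in rows:
            parts[hash(row[key]) % num_partitions].append(row)
        return parts

    results = []
    for big_part, small_part in zip(shuffle(big), shuffle(small)):
        table = build_hash_table(small_part, key)  # one table per partition
        results.extend({**row, **m} for row in big_part for m in table.get(row[key], []))
    return results

orders = [{"cust": 1, "order": "A"}, {"cust": 2, "order": "B"},
          {"cust": 1, "order": "C"}, {"cust": 4, "order": "D"}]
customers = [{"cust": 1, "cname": "Ann"}, {"cust": 2, "cname": "Bob"}]

bhj = broadcast_hash_join([orders[:2], orders[2:]], customers, "cust")
shj = shuffle_hash_join(orders, customers, "cust")
print(sorted((r["order"], r["cname"]) for r in bhj))
print(sorted((r["order"], r["cname"]) for r in shj))
```

Both calls produce the same matches; only the way the work and data are distributed differs.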
The join strategies can be summed up as follows, from most to least efficient (generally speaking):
Broadcast Hash Join (Equality only, at least one small dataset)
Sort-Merge Join (Equality only, sortable keys)
Shuffle Hash Join (Equality only)
Cartesian Join (Inner Join only)
Broadcast Nested Loop Join (Always available, never desirable)
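The summary above can be read as a decision procedure: take the first strategy in the list whose requirements are met. The sketch below encodes exactly that ordering. It is a simplification of the list, not Spark's actual planner, which also weighs join hints, configuration settings and other details. The function and parameter names are hypothetical, and the 10 MB default mirrors the broadcast threshold mentioned earlier.

```python
def choose_strategy(join_type, equality_only, keys_sortable, smaller_side_bytes,
                    broadcast_threshold=10 * 1024 * 1024):
    """Return the first strategy from the summary list whose conditions hold."""
    if equality_only:
        if smaller_side_bytes <= broadcast_threshold:
            return "Broadcast Hash Join"
        if keys_sortable:
            return "Sort-Merge Join"
        return "Shuffle Hash Join"
    if join_type == "inner":
        return "Cartesian Join"
    return "Broadcast Nested Loop Join"  # always available, never desirable

print(choose_strategy("inner", True, True, 1024))          # Broadcast Hash Join
print(choose_strategy("left_outer", True, True, 10**9))    # Sort-Merge Join
print(choose_strategy("full_outer", False, False, 10**9))  # Broadcast Nested Loop Join
```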