Recently I was delivering a Spark course. One of the exercises asked the students to split a Spark DataFrame in two, non-overlapping, parts.
One of the students came up with a creative way to do so.
He started by adding a monotonically increasing ID column to the DataFrame. Spark has a built-in function for this,
monotonically_increasing_id — you can find how to use it in the docs.
His idea was pretty simple: once creating a new column with this increasing ID, he would select a subset of the initial DataFrame and then do an anti-join with the initial one to find the complement1.
However this wasn't working. His initial DataFrame had 10000 lines. The first sub-selection had 3000 lines. The result of the anti-join, however, didn't have 7000 items.
Was his code wrong? At the essence the code was very simple. He would first read a DataFrame
df = ( spark .read.csv('chicken.csv', header=True) .repartition(40) # simulate "big" for illustrative purposes .withColumn('increasing_id', sf.monotonically_increasing_id()) )
followed by a subselection
df_small = df.filter(sf.col('weight') < 100)
and finally the anti-join
df_large_on_increasing_id = df.join( df_small, how='leftanti', on='increasing_id' )
(Disclaimer: this code is simplified;
df_large_on_increasing_id can obviously be obtained by selecting everything with weight greater or equal to 100. Thanks for noticing!)
If we now select everything from
df_large_on_increasing_id where the weight is less than 100, we should get an empty DataFrame. Do we?
df_large_on_increasing_id.filter(sf.col('weight') < 100).show(n=6)
Clearly not! But what is happening then? When creating
df_small, I should only remain with records with a weight smaller than 100. For example
When doing an anti-join with the initial DataFrame, these records should be definitely removed: the
increasing_ids 8589934592, 8589934596, and 8589934601 are present in
df_small, so they should not be present in
df_large_on_increasing_id = df - df_small!
The issue is subtle, but if you are familiar with how Spark really works, you should have already noticed it!
Spark is lazy by default: this means that, when we call
monotonically_increasing_id, Spark is actually not doing anything besides tracking that, when we will actually need the
increasing_id column, it needs to use the
monotonically_increasing_id function to compute it.
But when do we need that column? Well, it turns out we need it pretty late in the process: only when calling
show it will start doing the computation!
So what, you might say! When the computation will be triggered, why would the result be different?
Well, the hint lies in the documentation of the
monotonically_increasing_id function (emphasis mine):
The function is non-deterministic because its result depends on partition IDs.
Interesting: the IDs depend on the partition ID! This means that the function uses the partition IDs of
df_small when doing the computation for
df_small, while it uses the partition IDs of
df when doing the computation for
How can we verify this is true? (Or
True, if you're feeling pythonic)
We can simply cache and then materialize the column before using it!
To do so we first
cache the DataFrame —
cache tells Spark: once this DataFrame has been computed, hold it in memory for as long as you can — and then do a
count tells Spark: now I want to materialize the results (and then tell me how many records they contain!)
df = ( spark .read.csv('chicken.csv', header=True) .repartition(40) # simulate "big" .withColumn('increasing_id', sf.monotonically_increasing_id()) ) # once an action will be triggered, count in this case, # Spark will hold df as it is now in memory df.cache() # this is the action that materializes df df.count()
If we now re-execute the code we had before, we get a very different result!
df_small = df.filter(sf.col('weight') < 100) df_large_on_increasing_id = df.join( df_small, how='leftanti', on='increasing_id' ) df_large_on_increasing_id.filter(sf.col('weight') < 100).show()
The DataFrame is empty, as we expected!!
This was a good fix, but does it work well in production?
The answer lies in what I wrote before: Spark will keep the DataFrame in memory only for as long as it can! If memory is, for whatever reason, tight, it will evict the DataFrame, recomputing it, and hence giving you back the wrong results!
If you have Andrew Snare as a colleague, you probably heard it before: using
cache to ensure correctness in Spark is a dangerous anti-pattern2.
The students were quick to ask if there is any way around this doom. The answer is simple: write the DataFrame back to disk after you've added the column. As wasteful as it might seem3 it's the only way to ensure correctness without changing the logic (i.e. doing the anti-join with the monotonically increasing ID).4
Interested in Data Science with Spark Training?
The course I wrote about at the start of the post is one of our finest. Read more on its dedicated page.
If you want more rambling throughout the month, follow me on Twitter: I'm gglanzani there!
- The details are a bit more intricate. An anti-join is, basically, a subtraction. If you write
df1.anti-join(df2)you are effectively doing
df1 - df2, i.e. give me all the records in
df1that do not appear in
- It can still be useful for performance, but don't count on it for correctness. ↩
- Narrator voice: it is. ↩
- This is the right place to remember everyone that there are other ways to split a DataFrame, and that the exercise didn't require using a monotonically increasing ID to be done. However it was one of the highlights of the day for the students: gather all the theoretical knowledge they absorbed until then to fix what was seemingly a bug in Spark but was actually a bug in their code. ↩