What is Data Skew?

Data Skew is a condition in which a table’s data is unevenly distributed among partitions across the nodes in the cluster. 

During Aggregation and/or Join operations, if data is unevenly distributed across partitions; one or more tasks are stuck in processing the heavily skewed partition, resulting in severely degraded performance of queries, often the entire job fails due to massive shuffle. Joins between big tables require shuffling of data and skew can lead to an extreme imbalance of work in the cluster. 

Joins and Aggregations are both shuffle operations, so the symptoms and fixes of Data Skew in both are the same.

Symptom of Data Skew

  • A query appears to be stuck, finishing all but few (usually, the last 2 or 3 tasks out of default 200) tasks.
  • A join stage seems to be taking a long time in Spark UI. This is typically one last task of the many tasks.
  • Stages before and after join seem to be operating normally.
  • Logs show “Caused by: org.apache.spark.shuffle.FetchFailedException: Too large frame:”
  • On the Spark Web UI, you see something like:  

         

  • Note: 47 h in Duration can be any other value.

Potential Fixes of Data Skew

  • Always ensure the query is selecting only the relevant columns working only on the data that you need for the join.
  • Experiment with different join orderings, especially when the joins filter out a large amount of data.
  • Partition or repartition() a dataset prior to joining for reducing data movement across the cluster, especially when the same dataset is used in multiple join operations. It’s worth experimenting with different pre-join partitioning and repartitioning. Note: All joins come at the cost of a shuffle, this experimenting isn’t free of cost but can be well worth in avoiding severely degraded performance later on. 
  • Ensure that the null values are handled correctly (using null, and not ‘EMPTY’ or empty space like ” “).  Given below are the code examples to handle the null values in the joining columns.

Code Example of Data Skew

Consider the following 3 tables, i) orders, ii) customers and iii) delivery. orders table has 300 million rows where cust_id is null and 300 million rows where delivery_id is null.

Since orders table has maximum data, it is the driving table, and should be the left-most table in any join operation. 

Table Name

Row Count

Table Size

count(distinct cust_id)

cust_id is null

cust-id is not null

           

orders

500 million

50 GB

150 million

300 million

200 million

           
     

count(distinct delivery_id)

delivery_id is null

delivery-id is not null

           
 

500 million

50 GB

250 million

300 million

150 million

           
           
     

count(distinct cust_id)

cust_id is null

cust-id is not null

           

customers

150 million

15 GB

150 million

0

150 million

           
           
     

count(distinct delivery_id)

delivery_id is null

delivery-id is not null

           

delivery

250 million

25 GB

250 million

0

250 million

Sample Query joining 3 tables that results in Data Skew

select * from orders o

left join customers c on o.cust_id = c.cust_id

left join delivery d on o.dlivery_id = d.delivery_id;

In this query, marked in red are the problem areas. The bolded red is the root-cause of the Data Skew. orders table’s cust_id and delivery_id have a lot of null values and are used as join-columns with customers table and delivery table. 

Skewed Data

Since the driving table (orders) has null values in cust_id and delivery_id and we can’t filter null records before joining due to some business requirement, we need all records from the driving table. When this query is run in a Spark, you will notice the stage performing this join progresses to 199 tasks quite fast and then gets stuck on the last task, eventually the query aborts after several hours and the job fails.

Reason for Data Skew

Spark hashes the join columns and sorts it. Then Spark tries to keep records with same hashes on the same executor, so all the null values from the orders table will go to one executor. Thus Spark gets into a continuous loop of shuffling and garbage collection with no success.

Fix the Data Skew

Split orders table into two parts. The first part will contain all rows that don’t have null values for the cust_id and the second part contains all rows with null values in cust_id. Perform the join between orders with no null values in cust_id and customers table, then perform a union all of the two parts. Repeat the process for delivery_id.

The queries for splitting the orders table are:-

Query1:

create table orders_cust_id_not_null as 

select * from orders where cust_id is not null;

Query2:

create table orders_cust_id_null as 

select * from orders where cust_id is null;

Rewrite original Query:

create table orders_cust as

(select o1.cust_id from orders_cust_id_not_null o1

left join customer c on o1.cust_id = c.cust_id

union all

select o2.cust_id from orders_cust_id_null o2);

Repeat the process on delivery table

Query 3:

create table orders_delivery_id_not_null as 

select * from orders_cust where deliver_id is not null;

Query 4:

create table orders_delivery_id_null as 

select * from orders_cust where deliver_id is null;

Rewrite Original Query

create table  orders_delivery as

(select o1.delivery_id from orders_delivery_id_not_null o1

left join delivery d on o1.delivery_id = d.delivery_id

union all

select o2.delivery_id from orders_delivery_id_null o2);

Spark Code: Consider the 3 tables in csv format, given below.

orders table

 

customers table

 

delivery table

orderd_id

cust_id

delivery_id

order_dt

order_amt

 

cust_id

cust_name

cust_city

cust_state

 

delivery_id

delivery_dt

delivered_by

received_by

1A

1

1001

1/1/18

100

 

1

Adam

San Francisco

CA

 

1001

1/10/18

Jon

YA

2A

 

1002

2/2/18

200

 

2

Bob

Palo Alto

CA

 

1002

2/10/18

Joe

MA

3A

2

1003

1/1/18

260

 

30

Charly

New York

NY

 

1003

1/10/81

Jack

HA

4A

3

 

3/3/18

300

 

4

Dave

San Antonio

TX

 

1004

3/10/18

Jared

SA

5A

   

4/4/18

150

 

5

Ed

Phoenix

AZ

 

1005

10-Apr

Jack

WA

6A

4

1004

5/5/18

400

 

6

Frank

Houston

TX

 

1006

5/10/18

Javed

SA

7A

 

1005

6/6/18

500

 

7

Gary

Madison

WI

 

1007

6/10/18

Juzar

SA

orders table does have null values in cust_id for order_id 2A, 5A and 7A and null values in delivery_id for order_id 4A and 5A. These null values are the reason why these columns are prone to data skew.

We will read these tables as csv files into DataFrames, and evaluate how to resolve the data skew.

To perform join in Spark, you can directly do so on DataFrame, or use alias of the DataFrame for easier access to the column names. Below are examples of both the techniques.

The join in above code snippet is the root cause of the data skew, as it is joining on the orders.cust_id and orders.delivery_id columns, which have a lot of null values. Now, let’s look into how to resolve

data skew issue in the multi table join where null values are present.

Resolving Data Skew

Task 1: Split orders data_frame into two data_frames, one where cust_id is not null, and the other where cust_id is null.

Task 2: Join o_c_not_null DataFrame with customers DataFrame on cust_id. Note: This join will be much faster, as only the non-null cust_ids are participating in the join.

Problem: The cust_id column appears twice in the joined data_frame, as the column name is same in both the tables. If we try to access cust_id from the result data_frame, it will give ‘AnalysisException’ on cust_id due to the ambiguity.

Sub_Task 2.1: Remove duplicate columns from the resulting data_frame. This is a good opportunity to introduce a User Defined Function, so it’s generic on any two data_frames which may have duplicate columns.

Now we have the data_frame without the duplicate columns of cust_id.

Task 3: Union the resulting data_frame with o_c_null data_frame to get the full orders_customers data_frame. 

In order to perform the union of the two data_frames, the pre-requisite is that both the data_frames must have the same schema. In this case, we need to union o_c_not_null_cust data_frame which is a joined product ( with customers data_frame ) and the o_c_null data_frame which is the sub-set of the orders data_frame. To do this, we have to customize the union, first ensuring the schemas are same in both the data_frames. This can be done with a generic User Defined Function.

Now, that we have the ord_cust data_frame, we need to perform the join with delivery table. We will repeat the same steps, using the ord_cust data_frame as the driving table.

Task 4: Split ord_cust DataFrame into two DataFrames, one with delivery_id not null and the other with deliver_id null.

Task 5: Join the o_d_not_null DataFrame with delivery DataFrame. This join will be much faster, sans all the null values in delivery_id column.

Task 6: Drop duplicate column from the resulting data_frame and union it with o_d_null data_frame re-using both the UDFs created earlier. The result of the union is the final data_frame, with all the 3 tables joined. 

In this way, you can avoid tons of shuffle between the nodes in the cluster, prevent memory pressure on the driver and executor nodes, safe-guard against the data skews when null values are present, leading to a performant Spark ETL, potentially saving $$$$

raw-code: Below is the raw code from PySpark’s ipython notebook.

orders = spark.read.csv("/Users/usuf/Documents/Customers/AmFm/orders.csv", header=True, inferSchema=True)
orders.show()
+---------+-------+-----------+--------+---------+
|orderd_id|cust_id|delivery_id|order_dt|order_amt|
+---------+-------+-----------+--------+---------+
|       1A|      1|       1001|  1/1/18|      100|
|       2A|   null|       1002|  2/2/18|      200|
|       3A|      2|       1003|  1/1/18|      260|
|       4A|      3|       null|  3/3/18|      300|
|       5A|   null|       null|  4/4/18|      150|
|       6A|      4|       1004|  5/5/18|      400|
|       7A|   null|       1005|  6/6/18|      500|
+---------+-------+-----------+--------+---------+
 customers = spark.read.csv("/Users/usuf/Documents/Customers/AmFm/customers.csv", header=True, inferSchema=True)
customers.show()
+-------+---------+-------------+----------+
|cust_id|cust_name| cust_city|cust_state|
+-------+---------+-------------+----------+
| 1| Adam|San Francisco| CA|
| 2| Bob| Palo Alto| CA|
| 30| Charly| New York| NY|
| 4| Dave| San Antonio| TX|
| 5| Ed| Phoenix| AZ|
| 6| Frank| Houston| TX|
| 7| Gary| Madison| WI|
+-------+---------+-------------+----------+

delivery = spark.read.csv("/Users/usuf/Documents/Customers/AmFm/delivery.csv", header=True, inferSchema=True)
delivery.show()
+-----------+-----------+------------+-----------+
|delivery_id|delivery_dt|delivered_by|received_by|
+-----------+-----------+------------+-----------+
| 1001| 1/10/18| Jon| YA|
| 1002| 2/10/18| Joe| MA|
| 1003| 1/10/81| Jack| HA|
| 1004| 3/10/18| Jared| SA|
| 1005| 10-Apr| Jack| WA|
| 1006| 5/10/18| Javed| SA|
| 1007| 6/10/18| Juzar| SA|
+-----------+-----------+------------+-----------+

a_ord = orders.alias('a_ord')
a_cust = customers.alias('a_cust')
a_del = delivery.alias('a_del')
ocd = a_ord.join(a_cust, a_ord.cust_id == a_cust.cust_id, how='left_outer') \
.join(a_del, a_ord.delivery_id == a_del.delivery_id, how='left_outer')
ocd.show()
+---------+-------+-----------+--------+---------+-------+---------+-------------+----------+-----------+-----------+------------+-----------+
|orderd_id|cust_id|delivery_id|order_dt|order_amt|cust_id|cust_name| cust_city|cust_state|delivery_id|delivery_dt|delivered_by|received_by|
+---------+-------+-----------+--------+---------+-------+---------+-------------+----------+-----------+-----------+------------+-----------+
| 1A| 1| 1001| 1/1/18| 100| 1| Adam|San Francisco| CA| 1001| 1/10/18| Jon| YA|
| 2A| null| 1002| 2/2/18| 200| null| null| null| null| 1002| 2/10/18| Joe| MA|
| 3A| 2| 1003| 1/1/18| 260| 2| Bob| Palo Alto| CA| 1003| 1/10/81| Jack| HA|
| 4A| 3| null| 3/3/18| 300| null| null| null| null| null| null| null| null|
| 5A| null| null| 4/4/18| 150| null| null| null| null| null| null| null| null|
| 6A| 4| 1004| 5/5/18| 400| 4| Dave| San Antonio| TX| 1004| 3/10/18| Jared| SA|
| 7A| null| 1005| 6/6/18| 500| null| null| null| null| 1005| 10-Apr| Jack| WA|
+---------+-------+-----------+--------+---------+-------+---------+-------------+----------+-----------+-----------+------------+-----------+

from pyspark.sql.functions import col
o_c_not_null = orders.filter(col("cust_id").isNotNull())
o_c_not_null.show()
+---------+-------+-----------+--------+---------+
|orderd_id|cust_id|delivery_id|order_dt|order_amt|
+---------+-------+-----------+--------+---------+
| 1A| 1| 1001| 1/1/18| 100|
| 3A| 2| 1003| 1/1/18| 260|
| 4A| 3| null| 3/3/18| 300|
| 6A| 4| 1004| 5/5/18| 400|
+---------+-------+-----------+--------+---------+

o_c_null = orders.filter(col("cust_id").isNull())
o_c_null.show()
+---------+-------+-----------+--------+---------+
|orderd_id|cust_id|delivery_id|order_dt|order_amt|
+---------+-------+-----------+--------+---------+
| 2A| null| 1002| 2/2/18| 200|
| 5A| null| null| 4/4/18| 150|
| 7A| null| 1005| 6/6/18| 500|
+---------+-------+-----------+--------+---------+

o_c_not_null_cust = o_c_not_null.join(customers, o_c_not_null['cust_id'] == customers['cust_id'], how='left_outer')
o_c_not_null_cust.show()

#select o1.cust_id from orders_cust_id_not_null o1
#left join customer c on o1.cust_id = c.cust_id
+---------+-------+-----------+--------+---------+-------+---------+-------------+----------+
|orderd_id|cust_id|delivery_id|order_dt|order_amt|cust_id|cust_name| cust_city|cust_state|
+---------+-------+-----------+--------+---------+-------+---------+-------------+----------+
| 1A| 1| 1001| 1/1/18| 100| 1| Adam|San Francisco| CA|
| 3A| 2| 1003| 1/1/18| 260| 2| Bob| Palo Alto| CA|
| 4A| 3| null| 3/3/18| 300| null| null| null| null|
| 6A| 4| 1004| 5/5/18| 400| 4| Dave| San Antonio| TX|
+---------+-------+-----------+--------+---------+-------+---------+-------------+----------+

data = [
Row(zip_code=78258, dma='TX'),
Row(zip_code=78149, dma='TX'),
Row(zip_code=53704, dma='WI'),
Row(zip_code=94538, dma='CA')
]

firstDF = spark.createDataFrame(data)

data = [
Row(zip_code='782', name='TX'),
Row(zip_code='781', name='TX'),
Row(zip_code='537', name='WI'),
Row(zip_code='945', name='CA')
]

secondDF = spark.createDataFrame(data)

customUnion(firstDF,secondDF).show()
+----+----+--------+
| dma|name|zip_code|
+----+----+--------+
| TX|null| 78258|
| TX|null| 78149|
| WI|null| 53704|
| CA|null| 94538|
|null| TX| 782|
|null| TX| 781|
|null| WI| 537|
|null| CA| 945|
+----+----+--------+

from pyspark.sql.functions import lit
from pyspark.sql import Row

def customUnion(df1, df2):
cols1 = df1.columns
cols2 = df2.columns
#total_cols = sorted(cols1 + list(set(cols2) - set(cols1))) #if columns need to be in sorted order
total_cols = cols1 + list(set(cols2) - set(cols1))
def expr(mycols, allcols):
def processCols(colname):
if colname in mycols:
return colname
else:
return lit(None).alias(colname)
cols = map(processCols, allcols)
return list(cols)
appended = df1.select(expr(cols1, total_cols)).union(df2.select(expr(cols2, total_cols)))
return appended
def remove_duplicate_cols(df, df1, df2):
repeated_columns = [c for c in df1.columns if c in df2.columns]
for col in repeated_columns:
df = df.drop(df2[col])
return df
o_c_not_null_cust = remove_duplicate_cols(o_c_not_null_cust, o_c_not_null, customers)
o_c_not_null_cust.show()
+---------+-------+-----------+--------+---------+---------+-------------+----------+
|orderd_id|cust_id|delivery_id|order_dt|order_amt|cust_name| cust_city|cust_state|
+---------+-------+-----------+--------+---------+---------+-------------+----------+
| 1A| 1| 1001| 1/1/18| 100| Adam|San Francisco| CA|
| 3A| 2| 1003| 1/1/18| 260| Bob| Palo Alto| CA|
| 4A| 3| null| 3/3/18| 300| null| null| null|
| 6A| 4| 1004| 5/5/18| 400| Dave| San Antonio| TX|
+---------+-------+-----------+--------+---------+---------+-------------+----------+

ord_cust = customUnion(o_c_not_null_cust, o_c_null).sort(['orderd_id'])
ord_cust.show()
+---------+-------+-----------+--------+---------+---------+-------------+----------+
|orderd_id|cust_id|delivery_id|order_dt|order_amt|cust_name| cust_city|cust_state|
+---------+-------+-----------+--------+---------+---------+-------------+----------+
| 1A| 1| 1001| 1/1/18| 100| Adam|San Francisco| CA|
| 2A| null| 1002| 2/2/18| 200| null| null| null|
| 3A| 2| 1003| 1/1/18| 260| Bob| Palo Alto| CA|
| 4A| 3| null| 3/3/18| 300| null| null| null|
| 5A| null| null| 4/4/18| 150| null| null| null|
| 6A| 4| 1004| 5/5/18| 400| Dave| San Antonio| TX|
| 7A| null| 1005| 6/6/18| 500| null| null| null|
+---------+-------+-----------+--------+---------+---------+-------------+----------+

o_d_not_null = ord_cust.filter(col("delivery_id").isNotNull())
o_d_not_null.show()
+---------+-------+-----------+--------+---------+---------+-------------+----------+
|orderd_id|cust_id|delivery_id|order_dt|order_amt|cust_name| cust_city|cust_state|
+---------+-------+-----------+--------+---------+---------+-------------+----------+
| 1A| 1| 1001| 1/1/18| 100| Adam|San Francisco| CA|
| 2A| null| 1002| 2/2/18| 200| null| null| null|
| 3A| 2| 1003| 1/1/18| 260| Bob| Palo Alto| CA|
| 6A| 4| 1004| 5/5/18| 400| Dave| San Antonio| TX|
| 7A| null| 1005| 6/6/18| 500| null| null| null|
+---------+-------+-----------+--------+---------+---------+-------------+----------+

o_d_null = ord_cust.filter(col("delivery_id").isNull())
o_d_null.show()
+---------+-------+-----------+--------+---------+---------+---------+----------+
|orderd_id|cust_id|delivery_id|order_dt|order_amt|cust_name|cust_city|cust_state|
+---------+-------+-----------+--------+---------+---------+---------+----------+
| 4A| 3| null| 3/3/18| 300| null| null| null|
| 5A| null| null| 4/4/18| 150| null| null| null|
+---------+-------+-----------+--------+---------+---------+---------+----------+

o_d_not_null_del = o_d_not_null.join(delivery, o_d_not_null['delivery_id'] == delivery['delivery_id'], how='left_outer')
o_d_not_null_del.show()
+---------+-------+-----------+--------+---------+---------+-------------+----------+-----------+-----------+------------+-----------+
|orderd_id|cust_id|delivery_id|order_dt|order_amt|cust_name| cust_city|cust_state|delivery_id|delivery_dt|delivered_by|received_by|
+---------+-------+-----------+--------+---------+---------+-------------+----------+-----------+-----------+------------+-----------+
| 1A| 1| 1001| 1/1/18| 100| Adam|San Francisco| CA| 1001| 1/10/18| Jon| YA|
| 2A| null| 1002| 2/2/18| 200| null| null| null| 1002| 2/10/18| Joe| MA|
| 3A| 2| 1003| 1/1/18| 260| Bob| Palo Alto| CA| 1003| 1/10/81| Jack| HA|
| 6A| 4| 1004| 5/5/18| 400| Dave| San Antonio| TX| 1004| 3/10/18| Jared| SA|
| 7A| null| 1005| 6/6/18| 500| null| null| null| 1005| 10-Apr| Jack| WA|
+---------+-------+-----------+--------+---------+---------+-------------+----------+-----------+-----------+------------+-----------+

o_d_not_null_del = remove_duplicate_cols(o_d_not_null_del, o_d_not_null, delivery)
o_d_not_null_del.show()
+---------+-------+-----------+--------+---------+---------+-------------+----------+-----------+------------+-----------+
|orderd_id|cust_id|delivery_id|order_dt|order_amt|cust_name| cust_city|cust_state|delivery_dt|delivered_by|received_by|
+---------+-------+-----------+--------+---------+---------+-------------+----------+-----------+------------+-----------+
| 1A| 1| 1001| 1/1/18| 100| Adam|San Francisco| CA| 1/10/18| Jon| YA|
| 2A| null| 1002| 2/2/18| 200| null| null| null| 2/10/18| Joe| MA|
| 3A| 2| 1003| 1/1/18| 260| Bob| Palo Alto| CA| 1/10/81| Jack| HA|
| 6A| 4| 1004| 5/5/18| 400| Dave| San Antonio| TX| 3/10/18| Jared| SA|
| 7A| null| 1005| 6/6/18| 500| null| null| null| 10-Apr| Jack| WA|
+---------+-------+-----------+--------+---------+---------+-------------+----------+-----------+------------+-----------+

ord_cust_del = customUnion(o_d_not_null_del, o_d_null).sort(['orderd_id'])
ord_cust_del.show()
+---------+-------+-----------+--------+---------+---------+-------------+----------+-----------+------------+-----------+
|orderd_id|cust_id|delivery_id|order_dt|order_amt|cust_name| cust_city|cust_state|delivery_dt|delivered_by|received_by|
+---------+-------+-----------+--------+---------+---------+-------------+----------+-----------+------------+-----------+
| 1A| 1| 1001| 1/1/18| 100| Adam|San Francisco| CA| 1/10/18| Jon| YA|
| 2A| null| 1002| 2/2/18| 200| null| null| null| 2/10/18| Joe| MA|
| 3A| 2| 1003| 1/1/18| 260| Bob| Palo Alto| CA| 1/10/81| Jack| HA|
| 4A| 3| null| 3/3/18| 300| null| null| null| null| null| null|
| 5A| null| null| 4/4/18| 150| null| null| null| null| null| null|
| 6A| 4| 1004| 5/5/18| 400| Dave| San Antonio| TX| 3/10/18| Jared| SA|
| 7A| null| 1005| 6/6/18| 500| null| null| null| 10-Apr| Jack| WA|
+---------+-------+-----------+--------+---------+---------+-------------+----------+