PySpark: Repartition() vs Coalesce() functions

PySpark provides the repartition() and coalesce() functions to manage how the data of Resilient Distributed Datasets (RDDs) and DataFrames is distributed across partitions.

These two functions serve different purposes. repartition() is used to either increase or decrease the number of partitions, while coalesce() is specifically used to decrease the number of partitions for RDDs and DataFrames.

Beyond their primary purpose, the way these functions redistribute partitions also differs:

Repartition(): This function performs a full shuffle of the data across all partitions; in other words, it creates a new partitioning scheme from scratch. Because of this behavior, repartition() can be quite costly, especially on large datasets, as it involves moving data around extensively.

Coalesce(): Unlike repartition(), coalesce() merges existing partitions, moving data only from the partitions being removed into the remaining ones. It does not perform a full shuffle of all the data, which makes it a more cost-effective operation in many cases. However, its effectiveness depends on how far the number of partitions is reduced. It is typically used after heavy filtering of a DataFrame to optimize subsequent operations.

In summary, PySpark's repartition() and coalesce() functions serve different purposes when it comes to managing partitions in RDDs and DataFrames, and their performance characteristics should be considered when choosing between them.

Let us now see examples of how to use these functions.

Using repartition():

Let us create a dataframe:
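The snippet below assumes a notebook environment (such as Databricks) where a SparkSession is already available as spark; display() is the Databricks helper for rendering a DataFrame:

data = [(1,"English"),(2,"Hindi"),(3,"Urdu"),(4,"Tamil"),(5,"Kannada"),(6,"Telugu"),(7,"Behari"),(8,"Marati"),(9,"Gujarati"),(10,"Panjabi")]
schema = ['Id', "Language"]
df = spark.createDataFrame(data, schema)
display(df)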

Check how many partitions the data is divided into (the exact number depends on your cluster configuration and how the data is distributed).
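getNumPartitions() on the underlying RDD reports the current partition count:

df.rdd.getNumPartitions()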

Check how many records are in each partition and identify which record is located in which partition.
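spark_partition_id() returns the ID of the partition a row belongs to, so adding it as a column and grouping on it shows how the rows are spread across partitions:

from pyspark.sql.functions import spark_partition_id

# Add the partition ID of each row
df = df.withColumn("Initial Partition ID", spark_partition_id())
# Display the number of rows in each partition
df.groupBy("Initial Partition ID").count().sort("Initial Partition ID").show()
df.show()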

Verify the data after it has been repartitioned into 6 partitions.
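Here the DataFrame is repartitioned into 6 partitions and each row is tagged with its new partition ID:

df = df.repartition(6)
df = df.withColumn("Repartitioned Partition ID", spark_partition_id())
df.groupBy("Repartitioned Partition ID").count().sort("Repartitioned Partition ID").show()
df.show()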

Based on the output, it is evident that the data in the partitions has been reshuffled.

Let's examine how this partitioned data is stored in files if we write it to the storage location.
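For the write, the DataFrame is recreated without the helper partition-ID columns and repartitioned into 6 partitions; the DBFS path below is the one used in this demonstration, so adjust it to your own storage location:

data = [(1,"English"),(2,"Hindi"),(3,"Urdu"),(4,"Tamil"),(5,"Kannada"),(6,"Telugu"),(7,"Behari"),(8,"Marati"),(9,"Gujarati"),(10,"Panjabi")]
schema = ['Id', "Language"]
df = spark.createDataFrame(data, schema)
df = df.repartition(6)
df.write.mode("overwrite").csv("dbfs:/FileStore/202309/Repartition/", header=True, sep='\t')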

Since we redistributed the data into 6 partitions, the DataFrame writer has generated 6 separate part files, each containing data for a specific partition.

Using coalesce():

Let's use the same DataFrame for this demonstration as well.
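The DataFrame is recreated from the same data, again assuming the spark session from the previous example:

data = [(1,"English"),(2,"Hindi"),(3,"Urdu"),(4,"Tamil"),(5,"Kannada"),(6,"Telugu"),(7,"Behari"),(8,"Marati"),(9,"Gujarati"),(10,"Panjabi")]
schema = ['Id', "Language"]
df = spark.createDataFrame(data, schema)
display(df)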

Check how many partitions the data is divided into (the exact number depends on your cluster configuration and how the data is distributed).
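As before, the partition count can be checked on the underlying RDD:

df.rdd.getNumPartitions()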

Check how many records are in each partition and identify which record is located in which partition.
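The same spark_partition_id() technique shows the initial distribution of rows:

from pyspark.sql.functions import spark_partition_id

# Add the partition ID of each row
df = df.withColumn("Initial Partition ID", spark_partition_id())
# Display the number of rows in each partition
df.groupBy("Initial Partition ID").count().sort("Initial Partition ID").show()
df.show()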

Let's examine the data and its partitioning after reducing the number of partitions with the coalesce() function.
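coalesce(6) merges the existing partitions down to 6 (assuming the DataFrame started with more than 6 partitions; otherwise the partition count is left unchanged), and each row is again tagged with its new partition ID:

df = df.coalesce(6)
df = df.withColumn("Repartitioned Partition ID", spark_partition_id())
df.groupBy("Repartitioned Partition ID").count().sort("Repartitioned Partition ID").show()
df.show()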

Based on the output, it is evident that the data in partitions 0 to 4 has not been reshuffled, and the data from the removed partition has been appended to the last partition.

Let's observe how this partitioned data is stored in files when we write it to the storage location.
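Again the DataFrame is recreated, coalesced into 6 partitions, and written out; the DBFS path is the one used in this demonstration:

data = [(1,"English"),(2,"Hindi"),(3,"Urdu"),(4,"Tamil"),(5,"Kannada"),(6,"Telugu"),(7,"Behari"),(8,"Marati"),(9,"Gujarati"),(10,"Panjabi")]
schema = ['Id', "Language"]
df = spark.createDataFrame(data, schema)
df = df.coalesce(6)
df.write.mode("overwrite").csv("dbfs:/FileStore/202309/Coalesce/", header=True, sep='\t')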

Since we coalesced the data into 6 partitions, the DataFrame writer has generated 6 part files, each containing data for its respective partition.

Conclusion:

In this article, we have learned about the repartition() and coalesce() methods available in PySpark to increase or decrease the default DataFrame partitions, and we have also seen the differences between them.


Please find below the complete code used in this article.
 
## Using repartition()

data = [(1,"English"),(2,"Hindi"),(3,"Urdu"),(4,"Tamil"),(5,"Kannada"),(6,"Telugu"),(7,"Behari"),(8,"Marati"),(9,"Gujarati"),(10,"Panjabi")]
schema = ['Id', "Language"]
df = spark.createDataFrame(data, schema)
display(df)

df.rdd.getNumPartitions()

from pyspark.sql.functions import spark_partition_id

# Add the partition ID of each row
df = df.withColumn("Initial Partition ID", spark_partition_id())
# Display the number of rows in each partition
df.groupBy("Initial Partition ID").count().sort("Initial Partition ID").show()
df.show()

df = df.repartition(6)
df = df.withColumn("Repartitioned Partition ID", spark_partition_id())
df.groupBy("Repartitioned Partition ID").count().sort("Repartitioned Partition ID").show()
df.show()

data = [(1,"English"),(2,"Hindi"),(3,"Urdu"),(4,"Tamil"),(5,"Kannada"),(6,"Telugu"),(7,"Behari"),(8,"Marati"),(9,"Gujarati"),(10,"Panjabi")]
schema = ['Id', "Language"]
df = spark.createDataFrame(data, schema)
df = df.repartition(6)
df.write.mode("overwrite").csv("dbfs:/FileStore/202309/Repartition/", header=True, sep='\t')

## Using coalesce()

data = [(1,"English"),(2,"Hindi"),(3,"Urdu"),(4,"Tamil"),(5,"Kannada"),(6,"Telugu"),(7,"Behari"),(8,"Marati"),(9,"Gujarati"),(10,"Panjabi")]
schema = ['Id', "Language"]
df = spark.createDataFrame(data, schema)
display(df)

df.rdd.getNumPartitions()

from pyspark.sql.functions import spark_partition_id

# Add the partition ID of each row
df = df.withColumn("Initial Partition ID", spark_partition_id())
# Display the number of rows in each partition
df.groupBy("Initial Partition ID").count().sort("Initial Partition ID").show()
df.show()

df = df.coalesce(6)
df = df.withColumn("Repartitioned Partition ID", spark_partition_id())
df.groupBy("Repartitioned Partition ID").count().sort("Repartitioned Partition ID").show()
df.show()

data = [(1,"English"),(2,"Hindi"),(3,"Urdu"),(4,"Tamil"),(5,"Kannada"),(6,"Telugu"),(7,"Behari"),(8,"Marati"),(9,"Gujarati"),(10,"Panjabi")]
schema = ['Id', "Language"]
df = spark.createDataFrame(data, schema)
df = df.coalesce(6)
df.write.mode("overwrite").csv("dbfs:/FileStore/202309/Coalesce/", header=True, sep='\t')

