PySpark: Read and Write Parquet files to DataFrame

In this blog post, we will delve into the world of Parquet files, exploring their characteristics and learning how to read and write them. Before we dive into the examples, let's gain an understanding of Parquet files:
  • Designed for Efficiency: The Parquet file format is specifically crafted to accelerate the processing of large and complex datasets.
  • Faster Data Processing: Parquet's exceptional processing speed is attributed to its storage of data in a columnar format and its built-in data compression capabilities.
  • Schema Preservation: Unlike some other data sources like CSV, Parquet files do not require you to explicitly specify options such as the header, infer schema, or separator. Parquet files inherently preserve schema information.
  • Automatic Nullable Columns: When Spark writes Parquet data, all columns are automatically converted to be nullable for compatibility reasons.
  • Data Optimization: Parquet files are designed to skip irrelevant data while reading, which significantly enhances data retrieval and aggregation performance.
PySpark offers two primary methods for reading Parquet files, namely spark.read.parquet() and spark.read.format("parquet").load(), both of which belong to the DataFrameReader class. Similarly, for writing DataFrame data to Parquet files, PySpark provides two methods: DataFrame.write.parquet() and DataFrame.write.format("parquet").save(), which are part of the DataFrameWriter class.

Let us see this with examples.

Reading Parquet files:

Using parquet() Method:
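The example below reads a single Parquet file from DBFS into a DataFrame with spark.read.parquet(). The file path is the one used in the complete code listing at the end of this article, so replace it with your own location.

# Read a single Parquet file from DBFS into a DataFrame
df = spark.read.parquet("dbfs:/FileStore/ParquetDataSet/part-00000-tid-169821129090029059-e491a0b5-6229-42c2-8555-5e7f549d2220-3-1-c000.snappy.parquet")

# display() is a Databricks notebook helper; use df.show() outside Databricks
display(df)

You can also pass a folder path to parquet() to read every Parquet file under that folder.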

Using format().load() Method:
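The same file can be read through the generic reader by specifying "parquet" as the format and passing the path to load():

# Read the same Parquet file using the generic DataFrameReader API
df = spark.read.format("parquet").load("dbfs:/FileStore/ParquetDataSet/part-00000-tid-169821129090029059-e491a0b5-6229-42c2-8555-5e7f549d2220-3-1-c000.snappy.parquet")
display(df)

Both methods return the same DataFrame; format().load() is simply the generic form of the reader API.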

Similar to Avro and Thrift, the Parquet format also supports schema evolution. This means that during the initial stages of development, developers may create Parquet files with a limited set of columns. As development progresses, more columns may be added, and data is continuously written to these Parquet files. Consequently, the schema between the initial and later files may differ, requiring consideration of data from all Parquet files.

To handle this schema evolution, the Parquet reader provides an option called 'mergeSchema'. By default it is set to 'false', which means Spark picks the schema from one sample file and applies that schema to all the files. To read and merge all available schemas, filling in null values where a column is missing from some files, set the 'mergeSchema' option to 'true'.

It's important to note that schema merging is a relatively expensive operation, which is why it is disabled by default.
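Below is a minimal sketch of reading with schema merging enabled. The folder path is illustrative and is assumed to contain Parquet files written with different (evolving) schemas; in the complete code at the end of the article the option is applied to a single file.

# Merge the schemas of all Parquet files under the path;
# columns missing from some files are filled with nulls
df = spark.read.format("parquet") \
    .option("mergeSchema", "true") \
    .load("dbfs:/FileStore/ParquetDataSet/")
display(df)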

Writing DataFrame Data to Parquet files:

Using parquet() Method:
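A minimal example that writes the DataFrame to a DBFS folder with the parquet() method; the target path matches the complete code listing at the end.

# Write the DataFrame to the target folder in Parquet format
df.write.parquet("dbfs:/FileStore/ParquestDataWrite/")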

Using format().save() Method:
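The equivalent write using the generic DataFrameWriter API:

# Write the DataFrame in Parquet format using format().save()
df.write.format("parquet").save("dbfs:/FileStore/ParquetDataWritebyFormat/")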

Mode Property:

When you attempt to write data to a location that already contains files, Spark raises a 'path already exists' error because the default mode is 'error'. The example below illustrates this.
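A minimal sketch of what triggers the error: running the same save() twice against the same path fails on the second call, because the default save mode is 'error' (also called 'errorifexists').

# First write succeeds because the target folder does not exist yet
df.write.format("parquet").save("dbfs:/FileStore/ParquetDataWritebyFormat/")

# Second write to the same path fails with an AnalysisException
# saying the path already exists, since the default mode is 'error'
df.write.format("parquet").save("dbfs:/FileStore/ParquetDataWritebyFormat/")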

The DataFrameWriter's write methods support a mode setting, which specifies how Spark should handle a target location that already contains files or folders. These are the typical options supported by mode():

  • Append: Adds the new data to the data already present at the location.
  • Overwrite: Replaces the existing data with the new data.
  • Ignore: The write is silently skipped if the location already contains files; no error is raised.
  • Error (Default): Also known as 'errorifexists'; throws a 'path already exists' error if the location is not empty.
Here I will show the append option, as in the example below; the rest are left for you to practice.
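Running the append write against the folder from the previous example adds the new files alongside the existing ones instead of failing:

# Append the new data to the existing Parquet folder
df.write.mode("append").format("parquet").save("dbfs:/FileStore/ParquetDataWritebyFormat/")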

Conclusion:

In this article, we've explored the process of reading Parquet files and writing DataFrame data to Parquet files. Additionally, we've covered some commonly used options in these operations.

The complete code used in this article is given below.
  
# Read a single Parquet file into a DataFrame using the parquet() method
df = spark.read.parquet("dbfs:/FileStore/ParquetDataSet/part-00000-tid-169821129090029059-e491a0b5-6229-42c2-8555-5e7f549d2220-3-1-c000.snappy.parquet")
display(df)

# Read the same file using the generic format("parquet").load() method
df = spark.read.format("parquet").load("dbfs:/FileStore/ParquetDataSet/part-00000-tid-169821129090029059-e491a0b5-6229-42c2-8555-5e7f549d2220-3-1-c000.snappy.parquet")
display(df)

# Read with schema merging enabled
df = spark.read.format("parquet").option("mergeSchema", "true").load("dbfs:/FileStore/ParquetDataSet/part-00000-tid-169821129090029059-e491a0b5-6229-42c2-8555-5e7f549d2220-3-1-c000.snappy.parquet")
display(df)

# Write the DataFrame to Parquet using the parquet() method
df.write.parquet("dbfs:/FileStore/ParquestDataWrite/")

# Write the DataFrame to Parquet using format("parquet").save()
df.write.format("parquet").save("dbfs:/FileStore/ParquetDataWritebyFormat/")

# Append to the existing Parquet folder instead of failing
df.write.mode("append").format("parquet").save("dbfs:/FileStore/ParquetDataWritebyFormat/")