URL: https://docs.databricks.com/aws/en/query/formats/parquet

Read and write Parquet files | Databricks on AWS


Apache Parquet is a columnar file format optimized for analytical workloads. It allows query engines to read only the columns needed and skip irrelevant row groups. Parquet is the underlying storage format for Delta Lake, making it the most common format for data stored in Databricks. Databricks supports Parquet for both reading and writing with Apache Spark, including schema specification, partitioning, and write compression.
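
To illustrate the idea of reading only needed columns and skipping row groups, the following pure-Python sketch models row groups that carry min/max statistics for one column. It is a conceptual toy, not how Spark or any Parquet library is implemented; RowGroup, scan_rating_at_least, and the sample data are hypothetical names invented for this illustration.

```python
from dataclasses import dataclass


@dataclass
class RowGroup:
    """Toy stand-in for a Parquet row group: two columns plus min/max stats."""
    ratings: list
    comments: list

    def __post_init__(self):
        # Statistics are computed once, at "write" time, like file metadata.
        self.min_rating = min(self.ratings)
        self.max_rating = max(self.ratings)


def scan_rating_at_least(row_groups, threshold):
    """Return (matching ratings, number of row groups actually read)."""
    matches, groups_read = [], 0
    for group in row_groups:
        # Statistics check: if the largest rating is below the threshold,
        # no row in this group can match, so the group is skipped.
        if group.max_rating < threshold:
            continue
        groups_read += 1
        # Column pruning: only the ratings column is touched here.
        matches.extend(r for r in group.ratings if r >= threshold)
    return matches, groups_read


groups = [
    RowGroup(ratings=[1, 2, 2], comments=["poor", "meh", "noisy"]),
    RowGroup(ratings=[4, 5, 3], comments=["good", "great", "fine"]),
    RowGroup(ratings=[2, 1, 3], comments=["cold", "dirty", "ok"]),
]
matches, groups_read = scan_rating_at_least(groups, threshold=4)
print(matches, groups_read)
```

In this toy data only the second row group can contain a rating of 4 or more, so the other two are never scanned.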

Prerequisites

Databricks does not require additional configuration to use Parquet files. However, to stream Parquet files, you need Auto Loader.
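
A minimal sketch of streaming Parquet files with Auto Loader, assuming a Databricks notebook where spark is already defined. The cloudFiles source and its cloudFiles.format and cloudFiles.schemaLocation options belong to Auto Loader; the function name and both path arguments are hypothetical placeholders.

```python
def stream_parquet_with_auto_loader(spark, source_path, schema_location):
    """Return a streaming DataFrame that incrementally picks up new Parquet files.

    spark is the active SparkSession (predefined in Databricks notebooks).
    source_path and schema_location are placeholder paths that you supply.
    """
    return (
        spark.readStream
        .format("cloudFiles")                                  # Auto Loader source
        .option("cloudFiles.format", "parquet")                # incoming files are Parquet
        .option("cloudFiles.schemaLocation", schema_location)  # where the inferred schema is tracked
        .load(source_path)
    )
```

The returned DataFrame is lazy: nothing is read until a streaming write is started on it.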

Options

Use the .option() and .options() methods of DataFrameReader and DataFrameWriter to configure Parquet data sources. For a complete list of supported options, see DataFrameReader Parquet options and DataFrameWriter Parquet options.
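
As a sketch of how these methods are used, the following helpers set one write option and one read option. compression and mergeSchema are options of the Spark Parquet data source; the function names, the zstd default, and the path arguments are illustrative assumptions, and spark is assumed to be the notebook's session.

```python
def write_compressed_parquet(df, path, codec="zstd"):
    """Write df as Parquet using the given compression codec."""
    (
        df.write
        .format("parquet")
        .option("compression", codec)  # for example "snappy", "gzip", or "zstd"
        .save(path)
    )


def read_parquet_merging_schemas(spark, path):
    """Read Parquet files, merging the schemas found across the files."""
    return (
        spark.read
        .format("parquet")
        .options(mergeSchema="true")  # .options() accepts several options as keyword arguments
        .load(path)
    )
```

.option() sets a single key and value, while .options() is convenient when several settings are passed together.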

Usage

The following examples use the Wanderbricks sample dataset to demonstrate reading and writing Parquet files using the Spark DataFrame API and SQL.

Read Parquet files using SQL

Use read_files to query Parquet files directly from cloud storage using SQL without creating a table.

SQL
SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_parquet',
  format => 'parquet'
)

Read and write Parquet files

The following examples write the Wanderbricks reviews to Parquet format, read them back into a DataFrame, and demonstrate overwrite mode.

Python
# Write wanderbricks reviews to Parquet format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("parquet").save("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")

# Read a Parquet file into a DataFrame
df = spark.read.format("parquet").load("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
display(df)

# Write with overwrite mode
df.write.format("parquet").mode("overwrite").save("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
Scala
// Write wanderbricks reviews to Parquet format
val reviews = spark.read.table("samples.wanderbricks.reviews")
reviews.write.format("parquet").save("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")

// Read a Parquet file into a DataFrame
val df = spark.read.format("parquet").load("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
df.show()

// Write with overwrite mode
df.write.format("parquet").mode("overwrite").save("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
SQL
-- Write wanderbricks reviews to Parquet format
CREATE TABLE reviews_parquet
USING PARQUET
AS SELECT * FROM samples.wanderbricks.reviews;

SELECT * FROM reviews_parquet;

Specify a schema

Specify a schema when reading Parquet files to avoid the overhead of schema inference. For example, define a schema with review_id, rating, and comment fields and read reviews_parquet into a DataFrame.

Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("review_id", StringType(), True),
    StructField("rating", IntegerType(), True),
    StructField("comment", StringType(), True)
])

df = spark.read.format("parquet").schema(schema).load("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
df.printSchema()
df.show()
Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val schema = StructType(Array(
  StructField("review_id", StringType, nullable = true),
  StructField("rating", IntegerType, nullable = true),
  StructField("comment", StringType, nullable = true)
))

val df = spark.read.format("parquet").schema(schema).load("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
df.printSchema()
df.show()
SQL
-- Create a table with an explicit schema from Parquet files
CREATE TABLE reviews_parquet (
  review_id STRING,
  rating INT,
  comment STRING
)
USING PARQUET
OPTIONS (path "/Volumes/<catalog>/<schema>/<volume>/reviews_parquet");

SELECT * FROM reviews_parquet;
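
As an alternative sketch, the same three fields can be passed to the reader as a DDL-formatted string, which DataFrameReader.schema() accepts in place of a StructType. The constant and function names here are hypothetical, and spark is assumed to be the notebook's session.

```python
# Same fields as the StructType example, written as a DDL string.
REVIEWS_SCHEMA_DDL = "review_id STRING, rating INT, comment STRING"


def read_reviews_with_ddl_schema(spark, path):
    """Read Parquet files with an explicit schema given as a DDL string."""
    return (
        spark.read
        .format("parquet")
        .schema(REVIEWS_SCHEMA_DDL)  # no schema inference pass is needed
        .load(path)
    )
```

The DDL form keeps short schemas on one line; a StructType is usually easier to maintain for nested or programmatically built schemas.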

Write partitioned Parquet files

Write partitioned Parquet files for optimized query performance on large datasets. For example, read samples.wanderbricks.bookings and write it to bookings_parquet_partitioned partitioned by year and month derived from the check_in column.

Python
from pyspark.sql.functions import year, month

df = spark.read.table("samples.wanderbricks.bookings")
df_with_parts = df.withColumn("year", year("check_in")).withColumn("month", month("check_in"))
df_with_parts.write.format("parquet").partitionBy("year", "month").save("/Volumes/<catalog>/<schema>/<volume>/bookings_parquet_partitioned")
Scala
import org.apache.spark.sql.functions.{col, year, month}

val bookings = spark.read.table("samples.wanderbricks.bookings")
val bookingsWithParts = bookings.withColumn("year", year(col("check_in"))).withColumn("month", month(col("check_in")))
bookingsWithParts.write.format("parquet").partitionBy("year", "month").save("/Volumes/<catalog>/<schema>/<volume>/bookings_parquet_partitioned")
SQL
-- Write partitioned Parquet files by year and month
CREATE TABLE bookings_parquet_partitioned
USING PARQUET
PARTITIONED BY (year, month)
AS SELECT *, year(check_in) AS year, month(check_in) AS month
FROM samples.wanderbricks.bookings;
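
To show why partitioning helps, the sketch below reads the partitioned output back and filters on the partition columns, which lets Spark skip directories that cannot match. It also spells out the Hive-style directory naming (column=value) that partitionBy produces. Both function names are hypothetical, and spark is assumed to be the notebook's session.

```python
def partition_directory(base_path, year, month):
    """Hive-style directory that holds the rows for one year and month."""
    return f"{base_path}/year={int(year)}/month={int(month)}"


def read_bookings_for_month(spark, path, year, month):
    """Read a partitioned Parquet dataset, keeping a single year and month."""
    return (
        spark.read
        .format("parquet")
        .load(path)
        # Filtering on partition columns allows non-matching directories to be skipped.
        .where(f"year = {int(year)} AND month = {int(month)}")
    )


print(partition_directory("/Volumes/<catalog>/<schema>/<volume>/bookings_parquet_partitioned", 2024, 6))
```

Partitioning on columns with very many distinct values tends to produce many small files, so coarse columns such as year and month are a common choice.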

Additional resources

  • What is Delta Lake in Databricks?: If you need ACID transactions, schema enforcement, or time travel alongside Parquet's columnar performance, Delta Lake is the recommended format for data stored in Databricks.