Environment configuration
In this guide, we will use JupyterLab with Docker and MinIO. Think of Docker as a useful tool that simplifies running applications and MinIO as a flexible storage solution perfect for handling many different types of data. Here's how we'll set things up:
I'm not going to go into each step in depth because there is already a great tutorial for that. I suggest taking a look at it first and then coming back to continue with this one.
Once everything is ready, we will begin by preparing our sample data. Open a new Jupyter notebook to get started.
First, we need to install the s3fs Python package, which is essential for working with MinIO from Python.
!pip install s3fs
After that, we will import the necessary dependencies and modules.
import os
import s3fs
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql.functions as F
from pyspark.sql import Row
import pyspark.sql.types as T
import datetime
import time
We will also configure some environment variables that will be useful when interacting with MinIO.
# Define environment variables
os.environ["MINIO_KEY"] = "minio"
os.environ["MINIO_SECRET"] = "minio123"
os.environ["MINIO_ENDPOINT"] = "http://minio1:9000"
Next, we will create our Spark session with the necessary configuration.
# Create Spark session
spark = SparkSession.builder \
.appName("big_data_file_formats") \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.1026,org.apache.spark:spark-avro_2.12:3.5.0,io.delta:delta-spark_2.12:3.0.0") \
.config("spark.hadoop.fs.s3a.endpoint", os.environ("MINIO_ENDPOINT")) \
.config("spark.hadoop.fs.s3a.access.key", os.environ("MINIO_KEY")) \
.config("spark.hadoop.fs.s3a.secret.key", os.environ("MINIO_SECRET")) \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.enableHiveSupport() \
.getOrCreate()
Let's break this down to understand it better.
- spark.jars.packages: downloads the necessary JAR files from the Maven repository. A Maven repository is a central place used to store build artifacts such as JAR files, libraries, and other dependencies used in Maven-based projects.
- spark.hadoop.fs.s3a.endpoint: the URL of the MinIO endpoint.
- spark.hadoop.fs.s3a.access.key and spark.hadoop.fs.s3a.secret.key: the access key and secret key for MinIO. Note that these are the same as the username and password used to access the MinIO web interface.
- spark.hadoop.fs.s3a.path.style.access: set to true to enable path-style access for the MinIO bucket.
- spark.hadoop.fs.s3a.impl: the implementation class for the S3A file system.
- spark.sql.extensions: registers Delta Lake's SQL commands and configurations within the Spark SQL analyzer.
- spark.sql.catalog.spark_catalog: sets the Spark catalog to the Delta Lake catalog, allowing Delta Lake to handle table management and metadata operations.
Choosing the correct JAR versions is crucial to avoid errors. If you use the same Docker image, the JAR versions listed here should work fine. If you have any setup issues, feel free to leave a comment. I will do my best to help you 🙂
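One quick sanity check (a minimal sketch; the exact package versions you need depend on your own Spark and Scala build) is to print the versions your session is actually running and compare them with the packages requested above:
# The Spark version should match the requested packages (e.g. spark-avro_2.12:3.5.0 targets Spark 3.5.x)
print(pyspark.__version__)
print(spark.version)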
Our next step is to create a large Spark data frame. It will have 10 million rows and ten columns: an id column plus a mix of string and integer columns.
# Generate sample data
num_rows = 10000000
df = spark.range(0, num_rows)

# Add columns
for i in range(1, 10): # Since we already have one column
if i % 2 == 0:
# Integer column
df = df.withColumn(f"int_col_{i}", (F.randn() * 100).cast(T.IntegerType()))
else:
# String column
df = df.withColumn(f"str_col_{i}", (F.rand() * num_rows).cast(T.IntegerType()).cast("string"))
df.count()
Let's take a look at the first entries to see what they look like.
# Show rows from sample data
df.show(10, truncate=False)

+---+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|id |str_col_1|int_col_2|str_col_3|int_col_4|str_col_5|int_col_6|str_col_7|int_col_8|str_col_9|
+---+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|0 |7764018 |128 |1632029 |-15 |5858297 |114 |1025493 |-88 |7376083 |
|1 |2618524 |118 |912383 |235 |6684042 |-115 |9882176 |170 |3220749 |
|2 |6351000 |75 |3515510 |26 |2605886 |89 |3217428 |87 |4045983 |
|3 |4346827 |-70 |2627979 |-23 |9543505 |69 |2421674 |-141 |7049734 |
|4 |9458796 |-106 |6374672 |-142 |5550170 |25 |4842269 |-97 |5265771 |
|5 |9203992 |23 |4818602 |42 |530044 |28 |5560538 |-75 |2307858 |
|6 |8900698 |-130 |2735238 |-135 |1308929 |22 |3279458 |-22 |3412851 |
|7 |6876605 |-35 |6690534 |-41 |273737 |-178 |8789689 |88 |4200849 |
|8 |3274838 |-42 |1270841 |-62 |4592242 |133 |4665549 |-125 |3993964 |
|9 |4904488 |206 |2176042 |58 |1388630 |-63 |9364695 |78 |2657371 |
+---+---------+---------+---------+---------+---------+---------+---------+---------+---------+
only showing top 10 rows
To understand the structure of our data frame, we will use df.printSchema() to see the types of data it contains. After this, we will create four CSV files, one each for the Parquet, Avro, ORC and Delta Lake tests. We create separate copies to avoid any bias in the performance testing: if every format read from a single CSV, Spark could cache and optimize it in the background and skew the comparison.
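For reference, the schema check is a single call:
# Print the column names and data types of the generated data frame
df.printSchema()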
# Write 4 CSVs for comparing performance for every file type
df.write.csv("s3a://mybucket/ten_million_parquet.csv")
df.write.csv("s3a://mybucket/ten_million_avro.csv")
df.write.csv("s3a://mybucket/ten_million_orc.csv")
df.write.csv("s3a://mybucket/ten_million_delta.csv")
Now, we'll create four separate data frames from these CSVs, each for a different file format.
# Read all four CSVs to create dataframes
schema = T.StructType([
T.StructField("id", T.LongType(), nullable=False),
T.StructField("str_col_1", T.StringType(), nullable=True),
T.StructField("int_col_2", T.IntegerType(), nullable=True),
T.StructField("str_col_3", T.StringType(), nullable=True),
T.StructField("int_col_4", T.IntegerType(), nullable=True),
T.StructField("str_col_5", T.StringType(), nullable=True),
T.StructField("int_col_6", T.IntegerType(), nullable=True),
T.StructField("str_col_7", T.StringType(), nullable=True),
T.StructField("int_col_8", T.IntegerType(), nullable=True),
T.StructField("str_col_9", T.StringType(), nullable=True)
])

df_csv_parquet = spark.read.format("csv").option("header",True).schema(schema).load("s3a://mybucket/ten_million_parquet.csv")
df_csv_avro = spark.read.format("csv").option("header",True).schema(schema).load("s3a://mybucket/ten_million_avro.csv")
df_csv_orc = spark.read.format("csv").option("header",True).schema(schema).load("s3a://mybucket/ten_million_orc.csv")
df_csv_delta = spark.read.format("csv").option("header",True).schema(schema).load("s3a://mybucket/ten_million_delta.csv")
And that's it! We are all set to explore these big data file formats.
Working with Parquet
Parquet is a column-oriented file format that pairs very well with Apache Spark, making it the best choice for handling big data. It shines in analytical scenarios, particularly when examining data column by column.
One of its interesting features is the ability to store data in a compressed format, with Snappy compression being the preferred option. This not only saves space but also improves performance.
Another interesting aspect of Parquet is its flexible approach to data schemas. You can start with a basic structure and then gradually expand it by adding more columns as your needs grow. This adaptability makes it very easy to use for evolving data projects.
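As a quick illustration of both points (a minimal sketch; the s3a://mybucket/parquet_demo path is hypothetical), Spark exposes the compression codec as a write option and can reconcile evolving schemas at read time with mergeSchema:
# Write Parquet with an explicit compression codec (snappy is the usual default) to a hypothetical demo path
df_csv_parquet.write.option("compression", "snappy").parquet("s3a://mybucket/parquet_demo")

# If later writes append files with extra columns, mergeSchema reconciles them at read time
df_demo = spark.read.option("mergeSchema", "true").parquet("s3a://mybucket/parquet_demo")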
Now that we have a feel for Parquet, let's put it to the test. We're going to write 10 million records to a Parquet file and keep an eye on how long it takes. Instead of using the %timeit magic, which runs the code multiple times and can be resource-intensive for big data tasks, we will measure it only once.
# Write data as Parquet
start_time = time.time()
df_csv_parquet.write.parquet("s3a://mybucket/ten_million_parquet2.parquet")
end_time = time.time()
print(f"Time taken to write as Parquet: {end_time - start_time} seconds")
For me, this task took 15.14 seconds, but remember, this time may change depending on your computer. For example, on a less powerful PC, it took longer. So don't worry if your timing is different. The important thing here is to compare performance between different file formats.
Next, we will run an aggregation query on our Parquet data.
# Perform aggregation query using Parquet data
start_time = time.time()
df_parquet = spark.read.parquet("s3a://mybucket/ten_million_parquet2.parquet")
df_parquet \
.select("str_col_5","str_col_7","int_col_2") \
.groupBy("str_col_5","str_col_7") \
.count() \
.orderBy("count") \
.limit(1) \
.show(truncate = False)
end_time = time.time()
print(f"Time taken for query: {end_time - start_time} seconds")+---------+---------+-----+
|str_col_5|str_col_7|count|
+---------+---------+-----+
|1 |6429997 |1 |
+---------+---------+-----+
This query finished in 12.33 seconds. Alright, now let's switch gears and explore the ORC file format.
Working with ORC
The ORC file format, another column-oriented competitor, may not be as well-known as Parquet, but it has its own advantages. A standout feature is its ability to compress data even more effectively than Parquet, while using the same Snappy compression algorithm.
It's a hit in the Hive world, thanks to its support for ACID operations on Hive tables. ORC is also tailored to handle large streaming reads efficiently.
Plus, it's as flexible as Parquet when it comes to schemas: you can start with a basic structure and then add more columns as your project grows. This makes ORC a solid choice for changing big data needs.
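As a small sketch of the compression knob (the s3a://mybucket/orc_demo path is hypothetical), the ORC writer accepts the same kind of compression option as Parquet, with snappy and zlib among the supported codecs:
# Write ORC with an explicit compression codec to a hypothetical demo path
df_csv_orc.write.option("compression", "zlib").orc("s3a://mybucket/orc_demo")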
Let's dive deeper into testing ORC write performance.
# Write data as ORC
start_time = time.time()
df_csv_orc.write.orc("s3a://mybucket/ten_million_orc2.orc")
end_time = time.time()
print(f"Time taken to write as ORC: {end_time - start_time} seconds")
It took 12.94 seconds to complete the task. Another point of interest is the size of the data written to the MinIO bucket. In the ten_million_orc2.orc folder, you will find several partition files of roughly constant size. Each ORC partition file is approximately 22.3 MB, and there are 16 files in total.
Comparing this with Parquet, each Parquet partition file is approximately 26.8 MB, also totaling 16 files. This shows that ORC offers better compression than Parquet.
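If you want to check those sizes yourself, the s3fs package we installed at the start can list the objects in MinIO directly (a small sketch that reuses the environment variables defined earlier):
# List the ORC partition files in the bucket and print their sizes in MB
fs = s3fs.S3FileSystem(
    key=os.environ["MINIO_KEY"],
    secret=os.environ["MINIO_SECRET"],
    client_kwargs={"endpoint_url": os.environ["MINIO_ENDPOINT"]},
)
for obj in fs.ls("mybucket/ten_million_orc2.orc", detail=True):
    print(obj["name"], round(obj["size"] / 1024 / 1024, 1), "MB")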
Next, we will test how ORC handles an aggregation query. We are using the same query for all file formats to keep our benchmarking fair.
# Perform aggregation using ORC data
df_orc = spark.read.orc("s3a://mybucket/ten_million_orc2.orc")
start_time = time.time()
df_orc \
.select("str_col_5","str_col_7","int_col_2") \
.groupBy("str_col_5","str_col_7") \
.count() \
.orderBy("count") \
.limit(1) \
.show(truncate = False)
end_time = time.time()
print(f"Time taken for query: {end_time - start_time} seconds")+---------+---------+-----+
|str_col_5|str_col_7|count|
+---------+---------+-----+
|1 |2906292 |1 |
+---------+---------+-----+
The ORC query finished in 13.44 seconds, a little longer than Parquet's time. With ORC checked off our list, let's move on to experimenting with Avro.
Working with Avro
Avro is a row-based file format with its own unique strengths. While it doesn't compress data as efficiently as Parquet or ORC, it makes up for it with faster write speeds.
What really sets Avro apart is its excellent schema evolution capabilities. It handles changes such as added, deleted, or modified fields with ease, making it an ideal choice for scenarios where data structures evolve over time.
Avro is particularly suitable for workloads that involve a large amount of data writing.
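If you want to tune Avro's compression trade-off yourself (a minimal sketch; the s3a://mybucket/avro_demo path is hypothetical), spark-avro accepts a compression option with codecs such as snappy and deflate:
# Write Avro with an explicit compression codec to a hypothetical demo path
df_csv_avro.write.format("avro").option("compression", "deflate").save("s3a://mybucket/avro_demo")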
Now, let's see how Avro does when writing data.
# Write data as Avro
start_time = time.time()
df_csv_avro.write.format("avro").save("s3a://mybucket/ten_million_avro2.avro")
end_time = time.time()
print(f"Time taken to write as Avro: {end_time - start_time} seconds")
It took 12.81 seconds, which is actually faster than Parquet and ORC. Next we'll look at Avro's performance with an aggregation query.
# Perform aggregation using Avro data
df_avro = spark.read.format("avro").load("s3a://mybucket/ten_million_avro2.avro")
start_time = time.time()
df_avro \
.select("str_col_5","str_col_7","int_col_2") \
.groupBy("str_col_5","str_col_7") \
.count() \
.orderBy("count") \
.limit(1) \
.show(truncate = False)
end_time = time.time()
print(f"Time taken for query: {end_time - start_time} seconds")+---------+---------+-----+
|str_col_5|str_col_7|count|
+---------+---------+-----+
|1 |6429997 |1 |
+---------+---------+-----+
This query took approximately 15.42 seconds. So when it comes to queries, Parquet and ORC are ahead in terms of speed. Alright, it's time to explore our final and newest file format: Delta Lake.
Working with Delta Lake
Delta Lake is a new star in the universe of big data file formats, closely related to Parquet in terms of storage size: it's like Parquet but with some additional features.
When writing data, Delta Lake takes a little longer than Parquet, mainly due to its _delta_log
folder, which is key to its advanced capabilities. These capabilities include ACID compliance for trusted transactions, time travel for accessing historical data, and small file compaction to keep everything tidy.
While a relative newcomer to the big data scene, Delta Lake has quickly become a favorite on cloud platforms running Spark, surpassing its use on on-premises systems.
Let's move on to testing Delta Lake's performance, starting with a data writing test.
# Write data as Delta
start_time = time.time()
df_csv_delta.write.format("delta").save("s3a://mybucket/ten_million_delta2.delta")
end_time = time.time()
print(f"Time taken to write as Delta Lake: {end_time - start_time} seconds")
The write operation took 17.78 seconds, which is a little longer than the other file formats we've looked at. Something interesting to note is that in the ten_million_delta2.delta folder, each partition file is actually a Parquet file, similar in size to what we saw with Parquet. In addition, there is the _delta_log folder.
The _delta_log folder plays a critical role in how Delta Lake manages and maintains data integrity and version control. It is a key component that distinguishes Delta Lake from other big data file formats. Here's a simple breakdown of its function:
- Transaction log: The _delta_log folder contains a transaction log that records every change made to the data in the Delta table. This log is a series of JSON files that detail additions, deletions, and modifications to the data. It acts as a complete journal of all data transactions.
- ACID compliance: This log enables ACID (Atomicity, Consistency, Isolation, Durability) compliance. Every transaction in Delta Lake, such as writing new data or modifying existing data, is atomic and consistent, ensuring data integrity and reliability.
- Time travel and audit: The transaction log allows “time travel,” meaning you can easily view and restore previous versions of data. This is extremely useful for data recovery, auditing, and understanding how data has evolved over time.
- Schema enforcement and evolution: The _delta_log also keeps track of the schema (structure) of the data. It enforces the schema during data writes and allows safe evolution of the schema over time without corrupting the data.
- Concurrency and merge operations: It manages simultaneous reads and writes, ensuring that multiple users can access and modify data at the same time without conflicts. This makes it ideal for complex operations such as merge, update, and delete.
In summary, the _delta_log folder is the brains behind Delta Lake's advanced data management features, offering robust transaction logging, version control, and reliability improvements typically not available in simpler file formats like Parquet or ORC.
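To make this a bit more concrete, here is a small sketch of time travel against the table we just wrote (we have only written once, so version 0 is the only version so far):
# Show the transaction history that _delta_log keeps for this table
spark.sql("DESCRIBE HISTORY delta.`s3a://mybucket/ten_million_delta2.delta`").show(truncate=False)

# Read the table as it looked at a specific version (time travel)
df_v0 = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("s3a://mybucket/ten_million_delta2.delta")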
Now it's time to see how Delta Lake fares with an aggregation query.
# Perform aggregation using Delta data
df_delta = spark.read.format("delta").load("s3a://mybucket/ten_million_delta2.delta")
start_time = time.time()
df_delta \
.select("str_col_5","str_col_7","int_col_2") \
.groupBy("str_col_5","str_col_7") \
.count() \
.orderBy("count") \
.limit(1) \
.show(truncate = False)
end_time = time.time()
print(f"Time taken for query: {end_time - start_time} seconds")+---------+---------+-----+
|str_col_5|str_col_7|count|
+---------+---------+-----+
|1 |2906292 |1 |
+---------+---------+-----+
This query finished in approximately 15.51 seconds. While this is a bit slower compared to Parquet and ORC, it's pretty close. It suggests that Delta Lake's performance in real-world scenarios is quite similar to that of Parquet.
Awesome! We have concluded all our experiments. Let's recap our findings in the next section.
When to use which file format?
We have concluded our testing, so let's put all our findings together. For data writing, Avro ranks first; that is really what it does best in practical scenarios.
When it comes to reading and executing aggregation queries, Parquet leads the pack. However, this does not mean that ORC and Delta Lake fall short. As columnar file formats, they work admirably in most situations.
Here's a quick summary:
- Choose ORC for the best compression, especially if you use Hive and Pig for analytical tasks.
- Working with Spark? Parquet and Delta Lake are your preferred options.
- For scenarios with a lot of data writing, such as landing zone areas, Avro is the best option.
And that's a wrap for this tutorial!