Organizations are constantly seeking powerful solutions to unlock the full potential of their data assets. One such solution is Delta Lake. With its unique combination of reliability, scalability, and performance, Delta Lake has revolutionized the way data lakes are managed and utilized. In this article, we will dive into Delta Lake’s best practices, exploring the strategies and techniques that can take your data management to new heights.
The true power of Delta Lake lies not only in its technology but also in the best practices built around it. To truly harness Delta Lake’s capabilities for your data and to push your organization toward lasting success, it is essential to master these best practices.
Delta Lake: Mastering the Best Practices
Delta Lake is an open-source storage layer that brings reliability and performance to data lakes. It enables you to build scalable and reliable data pipelines using your favorite tools and frameworks.
Delta Lake allows you to easily use a single copy of data for both batch and streaming operations and provides incremental processing at scale.
In Delta Lake, the stored data file is organized into three layers, each containing different types of data:
- Bronze Layer: The bronze layer represents the raw, unprocessed data ingested into the data lake. It includes data in its original form, often sourced from various systems, applications, or external sources. The bronze layer typically retains the data in its original format without any modifications. It serves as the foundation for subsequent processing and analysis.
- Silver Layer: The silver layer sits on top of the bronze layer and involves data refinement and preparation. In this layer, data undergoes transformations, cleansing, validation, and integration to ensure consistency, quality, and reliability. This layer may involve tasks such as removing duplicates, handling missing values, standardizing formats, and resolving inconsistencies. The silver layer is where data starts to become more structured and reliable, making it suitable for analysis and reporting.
- Gold Layer: The gold layer represents the highest level of data refinement and is optimized for analytics and consumption. In this layer, data is further enriched, aggregated, and organized to support specific business use cases. It involves activities such as data modeling, creating derived metrics, building data cubes or aggregates, and applying business rules or calculations. The gold layer provides a curated and well-structured view of the data, ready for advanced analytics, machine learning, and reporting purposes.
Here I am going to share some best practices for using Delta Lake effectively and efficiently.
Best Practice 1: Improve the Query Performance by Choosing the Right Partition Column
Partitioning a Delta table means dividing it into smaller sections based on the values in a specific column. This helps improve how quickly queries are processed and reduces the size of the files involved. The most common column used for partitioning is ‘date‘, but you can also choose other columns that have a few distinct values and are frequently used in filtering queries, such as ‘country‘ or ‘region‘ in a sales transaction table.
It’s essential to avoid partitioning by columns that either have a very large number of unique values or are not selective enough. For example, partitioning by ‘userId‘ or ‘transactionId‘ could result in the creation of millions of small files, which would slow down performance. Similarly, partitioning by ‘productCategory‘ or ‘paymentMethod‘ might create partitions that aren’t selective enough to be useful for filtering.
To make a good decision on which column to choose for partitioning, a useful guideline is to select a column that can generate partitions with a minimum size of 1 GB of data each.
Consider the below example where I have partitioned the table with the country column.
from pyspark.sql import functions as F

df = spark.createDataFrame(
    [
        ("Rajaniesh", "Kaushikk", "India"),
        ("Rajesh", "Chandra", "Russia"),
        ("Bruce", "ekle", "USA"),
        ("Jackiee", "Chan", "China"),
    ]
).toDF("first_name", "last_name", "country")

df.repartition(F.col("country")).write.partitionBy("country").format(
    "delta"
).saveAsTable("country_people")
Best Practice 2: Provide Data Location Hints
If you have a column that is frequently used for filtering queries and it has a large number of different values, you can improve query performance by using a feature called Z-ORDER BY in Delta Lake. When you apply Z-ORDER BY to a column, Delta Lake arranges the data in the files based on the values in that column. This arrangement helps skip unnecessary data during query execution, making the queries faster.
For example, let’s consider a table of customer reviews with columns like ‘product_id‘, ‘rating‘, and ‘review_date‘. If you expect to frequently filter and search for reviews based on the ‘product_id‘ column, and ‘product_id‘ has high cardinality (many distinct values), you can use Z-ORDER BY on the ‘product_id‘ column. This will organize the data in the table files based on the ‘product_id‘ values, making it easier and faster to retrieve reviews for specific products.
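As a sketch of what this could look like (the table name ‘customer_reviews‘ is illustrative, not a real dataset), Z-ordering is applied with the OPTIMIZE command:

```sql
-- Rewrite the table's files so rows with similar product_id values
-- are co-located, letting queries that filter on product_id skip files.
OPTIMIZE customer_reviews
ZORDER BY (product_id)
```

Note that Z-ordering works best on a small number of high-cardinality columns; listing many columns in ZORDER BY dilutes its effectiveness for each of them.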
Best Practice 3: Compact your files to Improve the performance of the File System
When you continuously add data to a Delta table, especially in small batches, it can lead to a buildup of a large number of files over time. This can negatively impact the efficiency of reading from the table and even affect the performance of your file system.
To address this, it’s best to regularly rewrite these many small files into a smaller number of larger files. This process is called compaction. You can use the ‘OPTIMIZE‘ command to perform compaction on a table. It reorganizes how data is stored based on the values in the specified columns and can utilize a technique called Z-order indexing, which helps skip irrelevant data during queries, making them faster.
For example, suppose you have a Delta table called “sales_data” that contains sales transactions. Over time, as new transactions are added in small batches, the table accumulates a large number of small files. Use the OPTIMIZE command to perform the compaction of the table:
OPTIMIZE sales_data WHERE date <= current_timestamp() ZORDER BY (sales_type)
When you run this command, Delta Lake will automatically reorganize the data in the table, rewriting the many small files into a smaller number of larger files. During this process, it takes into account the values in the columns, such as the ‘date‘ filter and the ‘sales_type‘ Z-order column in the example above, and optimizes the data layout accordingly.
It’s important to note that compaction doesn’t automatically remove the old files. To delete these unused files and free up storage space, you need to run the ‘VACUUM’ command. This command deletes files that are no longer referenced or needed by the table, improving the overall efficiency of your Delta table.
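For instance, a minimal sketch of the cleanup step for the sales_data table, using Delta Lake’s default retention interval of 7 days expressed in hours:

```sql
-- Delete data files that are no longer referenced by the table
-- and are older than the retention threshold (default: 7 days).
VACUUM sales_data RETAIN 168 HOURS
```

Be cautious about lowering the retention interval below the default: doing so can break time travel to older versions and disrupt long-running readers that still reference the old files.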
Best Practice 4: Replace the Content or Schema of a Table
Sometimes, you might need to completely replace a Delta table for various reasons, such as:
- If you find that the data in the table is incorrect and needs to be replaced.
- When you want to make significant changes to the table’s structure, like altering column types.
While you can delete the entire directory of a Delta table and create a new table at the same location, it is not recommended due to several reasons:
- Deleting a large directory can be time-consuming, taking hours or even days.
- Deleting the directory means losing all the data in the deleted files, making it difficult to recover if you mistakenly delete the wrong table.
- The deletion process is not atomic, meaning that concurrent queries accessing the table might fail or observe an incomplete table while the deletion is in progress.
A better approach to replace a table is by using the ‘overwrite‘ mode with the ‘overwriteSchema‘ option when writing to a Delta table. This method ensures atomicity, meaning the replacement happens all at once. It replaces the entire table, including both the data and the schema, with the new data you provide. This way, you avoid the drawbacks of deleting the directory and achieve a seamless replacement of the table.
Consider the below example:
# Managed table
dataframe.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("<your-table>")

# External table
dataframe.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .option("path", "<your-table-path>") \
    .saveAsTable("<your-table>")
In the code examples, a DataFrame is written to a Delta table. The ‘format("delta")‘ call specifies the Delta format. Using ‘mode("overwrite")‘ with ‘option("overwriteSchema", "true")‘ allows the entire table, including schema, to be replaced. ‘saveAsTable‘ creates a managed table or an external table depending on whether a path is specified.
Best Practice 5: Think before using the Spark Caching
Spark caching allows you to store intermediate data or query results in memory or on disk. By caching data, Spark can avoid re-computing the same data or results repeatedly, which can significantly improve the performance of subsequent queries or operations.
Spark caching can speed up queries on Delta tables by storing intermediate results in memory or on disk, but it cannot be used to store the results of arbitrary subqueries. So if you have complex queries involving subqueries, Spark caching may not be effective in speeding up their execution. Databricks does not recommend relying on Spark caching because:
- Caching consumes memory or disk resources that could be used for other tasks, impacting overall performance.
- Caching does not reflect changes made by other writers to the underlying data, potentially leading to inconsistencies.
- Different queries using different cache levels or storage formats may produce inconsistent results.
To use caching effectively, follow these guidelines:
- Cache only small and frequently accessed tables or partitions.
- Cache tables or partitions that are not frequently updated by other writers.
- Use the same cache level and storage format for all queries on the same table or partition.
- Refresh or invalidate the cache when the underlying data changes to ensure accuracy.
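In Spark SQL, these guidelines map to explicit cache-management commands; a minimal sketch using the country_people table from the earlier partitioning example:

```sql
-- Cache a small, frequently read table.
CACHE TABLE country_people;

-- After the underlying data changes, refresh the cached entries.
REFRESH TABLE country_people;

-- Release the cache once it is no longer needed.
UNCACHE TABLE country_people;
```

Keeping cache creation, refresh, and release explicit like this makes it easier to honor the guidelines above than relying on ad hoc `.cache()` calls scattered through notebooks.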
Best Practice 6: Configure Delta Lake to Control the File Sizes
To better understand tuning file sizes, we need to consider how Delta tables store data. Delta tables organize data into files, and the size of these files can impact query performance and storage efficiency. By tuning file sizes, you can optimize the balance between these factors.
For larger files, queries can read data more efficiently, resulting in better performance. However, having very large files can also lead to issues when updating or deleting specific records within the file.
On the other hand, smaller files can provide more flexibility for updates and deletions. However, handling many small files can impact query performance and increase storage overhead.
To strike a balance, you can adjust the file sizes by using the techniques described below:
|Option||When to Use What||Example|
|OPTIMIZE Operation||Recommended for tables larger than 1 TB. Consolidates files and enables enhanced data skipping with Z-order indexes.||OPTIMIZE table_name|
|Auto Compaction||Reduces small file problems by combining small files within Delta table partitions. Control the output file size using spark.databricks.delta.autoCompact.maxFileSize configuration.||spark.conf.set("spark.databricks.delta.autoCompact.maxFileSize", "128m")|
|Optimized Writes||Enabled by default for operations like MERGE, UPDATE with subqueries, and DELETE with subqueries. Improves file size during data writes and benefits subsequent reads.||delta.autoOptimize.optimizeWrite = true|
|Setting Target File Size||Manually set the desired size of files in a Delta table using the delta.targetFileSize table property. Affects operations like OPTIMIZE, Z-ordering, auto compaction, and optimized writes.||ALTER TABLE table_name SET TBLPROPERTIES ('delta.targetFileSize'='100mb')|
|Autotune File Size Based on Workload||Recommended for tables targeted by many MERGE or DML operations. Accelerates write-intensive operations.||ALTER TABLE table_name SET TBLPROPERTIES ('delta.tuneFileSizesForRewrites'='true')|
|Autotune File Size Based on Table Size||Automatically tunes the file size of Delta tables based on the table's size. Smaller tables have smaller file sizes, while larger tables have larger file sizes.||Automatically adjusted based on the table's size. For example, a table smaller than 2.56 TB has a target file size of 256 MB, while a table larger than 10 TB has a target file size of 1 GB.|
|Limit Rows Written in a Data File||Specify the maximum number of records to write to a single file using spark.sql.files.maxRecordsPerFile configuration. Useful to avoid errors when the number of rows exceeds Parquet format limits.||spark.conf.set("spark.sql.files.maxRecordsPerFile", "1000000")|
Best Practice 7: Combine Data from Different Tables using Low Shuffle Merge
In Databricks Runtime 9.0 and newer versions, a feature called “Low Shuffle Merge” provides an optimized implementation of the MERGE operation. The MERGE operation is commonly used to synchronize data between a source and target table based on matching keys.
Low Shuffle Merge is designed to improve the performance of the MERGE operation for most common workloads. It achieves this by minimizing data shuffling, which is the process of redistributing data across the cluster during computation. Shuffling can be a resource-intensive operation and can negatively impact query performance.
With Low Shuffle Merge, the data movement or shuffling is reduced, resulting in better performance. Instead of performing a full shuffle of all the data, the optimized implementation minimizes the amount of data movement required. This is particularly beneficial for large-scale datasets and complex merge operations.
Furthermore, Low Shuffle Merge preserves existing data layout optimizations, such as Z-ordering, on the unmodified data. Z-ordering is a technique used in Delta Lake to co-locate data that shares common attributes, which improves query performance by reducing the amount of data that needs to be read during query execution. By preserving Z-ordering on unmodified data during the MERGE operation, the benefits of this optimization are maintained.
Low Shuffle Merge is enabled by default in Databricks Runtime 10.4 and above. In earlier supported Databricks Runtime versions it can be enabled by setting the configuration spark.databricks.delta.merge.enableLowShuffle to true.
spark.databricks.delta.merge.enableLowShuffle = true
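Low Shuffle Merge requires no change to the MERGE statement itself; a standard merge like the sketch below (the daily_updates source table and transaction_id key are illustrative) automatically benefits on supported runtimes:

```sql
MERGE INTO sales_data AS target
USING daily_updates AS source
ON target.transaction_id = source.transaction_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
```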
Best Practice 8: Manage Data Recency using Time Travel and Checkpoints
Delta Lake supports concurrent reads and writes on the same table, which enables real-time data processing and analytics. However, this also introduces some challenges for managing data recency and consistency.
For example, if you have a streaming job that writes data to a Delta table every minute, and another job that reads data from the same table every hour, how do you ensure that the reader sees the latest data? There are two ways:
- Way 1: Using Time Travel
- Way 2: Using Checkpoints
Time travel allows you to query a specific version or timestamp of a Delta table. Consider Best Practice 12 to understand more about Time Travel.
Checkpoints are snapshots of the state of a Delta table at a given point in time. Checkpoints store metadata such as schema, partitioning, and file lists, which can speed up query planning and execution. You can create checkpoints manually using the CREATE CHECKPOINT command, or automatically using configuration options.
For example, you have a Delta table called “sensor_data” that stores real-time sensor readings. You have a streaming job that continuously writes data to this table every minute and another batch job that runs hourly analytics on the same table.
To optimize the batch job’s performance, you can create checkpoints that store the table’s metadata. Checkpoints speed up query planning and execution.
// Manually create a checkpoint
spark.sql("CREATE CHECKPOINT FOR TABLE sensor_data")

// Automatically create a checkpoint using configuration
spark.conf.set("spark.databricks.delta.checkpointLocation", "/delta/checkpoints")

// Use the checkpoint for querying
spark.sql("SELECT * FROM sensor_data")
By creating a checkpoint, the batch job can refer to it for metadata, such as schema and partitioning information, which helps optimize its processing.
However, checkpoints have their limitations:
- They may not reflect the latest committed version of the table, as they are not transactional.
- Checkpoints don’t automatically update when the table changes, necessitating periodic refreshing or recreation.
- Obsolete checkpoints are not automatically deleted, requiring manual management or leveraging enhanced checkpoints.
Best Practice 9: Reduce the Search Space for Matches
To improve the performance of the merge operation, you can narrow down the search space by specifying known constraints in the match condition. By doing so, the merge operation will only search for matches in specific partitions, rather than the entire Delta table.
For example, let’s say you have a table partitioned by country and date, and you want to use merge to update data for the last day in the USA.
events.date = current_date() AND events.country = 'USA'
By adding the above condition, the merge operation will only search for matches within the matching date and country partitions, rather than scanning the entire table. This makes the query faster and reduces the chances of conflicts with other concurrent operations.
In simpler terms, by including specific conditions in the merge operation, you can limit the search to specific partitions and achieve faster and more targeted updates, while also minimizing potential conflicts with other processes.
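Putting it together, a sketch of a full merge with the partition constraints folded into the match condition (the updates source table and event_id key are illustrative):

```sql
MERGE INTO events AS target
USING updates AS source
ON target.event_id = source.event_id
   AND target.date = current_date()
   AND target.country = 'USA'
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
```

Because the date and country predicates reference the partition columns, Delta Lake can prune all other partitions before looking for key matches.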
Best Practice 10: Use Delta Standalone Libraries
The Delta Standalone library is a Java library that allows you to interact with Delta tables without using Spark. It’s designed for single-node environments and enables you to access Delta tables using Java APIs in various third-party tools and applications.
For example, you can use the Delta Standalone library with:
- Apache Hive
- Apache Presto
- Apache Flink
- Apache Kafka Connect
- Apache NiFi
- AWS Glue
- AWS Athena
- AWS Redshift Spectrum
- Azure Synapse Analytics
- Google BigQuery
To use the Delta Standalone library, you need to include it as a dependency in your Java project. Once added, you can utilize its APIs to perform read and write operations on Delta tables from your chosen tool or application.
Delta Standalone is optimized for cases when you want to read and write Delta tables by using a non-Spark engine of your choice. It is a “low-level” library and an open-source library. Developers can use higher-level connectors for their desired engines that use Delta Standalone for all Delta Lake metadata interaction.
Best Practice 11: Use Schema Evolution
Schema evolution allows you to modify the structure of a Delta table without breaking existing queries or pipelines. It enables you to add new columns, rename columns, or change column types without impacting downstream consumers.
To use schema evolution, you need to set the “mergeSchema” option when writing to a Delta table. This automatically merges new columns from the source data into the target schema and converts existing columns to new types if possible.
However, schema evolution also has some limitations:
- It does not work with nested columns or complex types (such as arrays or maps).
- It does not work with partitioned columns or Z-order columns.
- It does not work with streaming sources or sinks.
- It may cause performance degradation due to schema inference and casting.
Therefore, you should use schema evolution carefully and follow these guidelines:
- Only use schema evolution when necessary and avoid frequent schema changes.
- Limit schema evolution to compatible changes, such as adding optional columns or widening numeric types.
- Apply schema evolution only to non-partitioned and non-Z-order columns.
- Use schema evolution only for batch sources and sinks.
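For SQL-based pipelines, schema auto-merge can also be enabled at the session level via Delta Lake’s auto-merge configuration; a hedged sketch (the new_events source table and event_id key are illustrative):

```sql
-- Session-level counterpart of the DataFrame writer's mergeSchema option.
SET spark.databricks.delta.schema.autoMerge.enabled = true;

-- With auto-merge on, columns that exist only in the source
-- can be added to the target table's schema during the MERGE.
MERGE INTO events AS target
USING new_events AS source
ON target.event_id = source.event_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
```

Enabling this setting session-wide is convenient but loosens schema enforcement for every write in the session, so per-write mergeSchema is usually the safer default.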
Best Practice 12: Analyse your Data Using Time Travel
Time travel in Delta Lake allows you to access and analyze data from previous versions of a table. This enables you to:
- Track and review changes made to the data over time and easily revert to a previous version if needed.
- Ensure consistency in experiments, reports, or analyses by accessing the data as it existed at a specific point in time.
- Populate historical data using the same logic and processing rules applied to current data.
To utilize time travel, you can use the “VERSION AS OF” or “TIMESTAMP AS OF” clauses when querying a Delta table. For example, you can retrieve data from version 10 of a table or query the table as it existed on a specific date using the timestamp.
SELECT * FROM events VERSION AS OF 10
Or you can query the table as of yesterday as follows:
SELECT * FROM events TIMESTAMP AS OF date_sub(current_date(), 1)
You can also use time travel with streaming sources and sinks, which allows you to replay or reprocess historical data with different logic or configurations.
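Time travel also pairs with the RESTORE command, which rolls a table back to an earlier version in place; a minimal sketch on the events table from above:

```sql
-- Revert the table to the state it had at version 10.
RESTORE TABLE events TO VERSION AS OF 10
```

Unlike a plain time-travel query, RESTORE commits the rollback as a new table version, so the operation itself remains auditable in the table history.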
In conclusion, Delta Lake offers a comprehensive set of best practices to optimize data workflows and enhance query performance. By selecting the right partition column, providing data location hints, compacting files, and efficiently managing data recency, users can improve query speed and file system performance. Additionally, considerations such as judicious use of Spark caching, configuring file sizes, leveraging Low Shuffle Merge for combining data, employing schema evolution, reducing search space, utilizing Delta Standalone Libraries, and analyzing data using Time Travel contribute to efficient data management and analysis. By following Delta Lake’s best practices, users can harness the full potential of Delta Lake and achieve optimal performance, scalability, and reliability in their data-driven applications.