Last time, we left Bob in quite a situation. He was struggling with his store’s organization; in particular, he could not determine when to Z-order his books again to maintain the effectiveness of his shelf-skipping technique. Not to mention that reordering the entire store over and over is time-consuming, tiring labor (part-1).
Bob’s story is a sketch of a deeper problem that arises when dealing with huge volumes of data.
So now, the time has come to address the elephant in the room: how can we measure the success of an operation as costly and time-consuming as Z-ordering? And how can we track the clustering state of our data as it continuously deteriorates?
Delta Lake is an open-source storage framework that enables building a Lakehouse architecture with compute engines such as Spark. It brings reliability and performance to data lakes by providing ACID transactions and scalable metadata handling, and by unifying streaming and batch data processing on top of existing data lakes, such as S3.
Delta on disk
As shown in [Figure-2], Delta Lake stores data in multiple Parquet files. When data is written to a Delta Lake table, statistics (min and max values per column) are collected and persisted automatically in the Delta transaction log. Changes to the table are stored in the transaction log as ordered atomic units called commits.
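To make this concrete, the sketch below parses the per-file statistics out of a commit file the way they appear in the Delta transaction log (each line of a commit is one JSON action; `add` actions carry a `stats` field). The sample commit line is illustrative, not taken from a real table:

```python
import json

# A fabricated sample line mimicking an "add" action inside
# _delta_log/00000000000000000000.json (illustrative values only).
sample_commit_line = json.dumps({
    "add": {
        "path": "part-00000-abc.snappy.parquet",
        "size": 1024,
        "stats": json.dumps({
            "numRecords": 3,
            "minValues": {"ss_item_sk": 10},
            "maxValues": {"ss_item_sk": 42},
        }),
    }
})

def file_stats(commit_lines):
    """Extract (path, minValues, maxValues) from the 'add' actions of a commit."""
    out = []
    for line in commit_lines:
        action = json.loads(line)
        if "add" in action and action["add"].get("stats"):
            stats = json.loads(action["add"]["stats"])
            out.append((action["add"]["path"],
                        stats["minValues"], stats["maxValues"]))
    return out

print(file_stats([sample_commit_line]))
# one tuple per data file, carrying its per-column min and max values
```

These per-file min/max values are exactly what the skipping and clustering metrics discussed below are built on.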
Skipping based on statistics
Data skipping (available in Delta Lake 1.2.0 and above) is a feature meant to avoid scanning irrelevant data. It uses file-level statistics to perform skipping at file granularity. You do not need to configure data skipping: it is activated automatically whenever applicable for the first 32 columns of the table, unless deliberately disabled.
Delta Lake takes advantage of these statistics (minimum and maximum values for each column) at query time to avoid opening irrelevant files, thereby reducing query time [Figure-3].
For the pruning to be effective, data needs to be clustered so that min-max ranges are narrow and, ideally, non-overlapping. Data skipping is therefore deeply influenced by the data layout.
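The core of file-level pruning can be sketched in a few lines of plain Python (an illustration of the idea, not Delta Lake’s implementation): given per-file min/max statistics for a column, only files whose range can contain the predicate value need to be opened.

```python
# Per-file (min, max) statistics for the filter column; names are made up.
files = {
    "file_1.parquet": (1, 100),
    "file_2.parquet": (101, 200),
    "file_3.parquet": (201, 300),
}

def files_to_read(stats, value):
    """Keep only files whose [min, max] range can contain `value`."""
    return [f for f, (lo, hi) in stats.items() if lo <= value <= hi]

print(files_to_read(files, 150))  # only file_2 needs to be scanned
```

With narrow, non-overlapping ranges like these, a point query touches a single file; with wide, overlapping ranges, no file can be ruled out.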
Z-ordering is a technique used to colocate related information in the same set of files. It maps multi-dimensional points to one-dimensional values in a way that preserves locality [Figure-4]. This feature is available in Delta Lake 2.0 and above.
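The locality-preserving mapping behind Z-ordering is the Morton code: interleaving the bits of each dimension’s value. A minimal sketch (for two integer columns, purely illustrative):

```python
def z_value(x, y, bits=8):
    """Interleave the bits of x and y (a Morton code): points that are close
    in (x, y) space tend to get close z-values, which preserves locality."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)      # x's bit i -> even position
        z |= ((y >> i) & 1) << (2 * i + 1)  # y's bit i -> odd position
    return z

points = [(2, 2), (0, 1), (1, 1), (0, 0), (1, 0)]
# Sorting by z-value groups nearby multi-dimensional points together.
print(sorted(points, key=lambda p: z_value(*p)))
```

Sorting data by this one-dimensional value, then splitting it into files, is what keeps related rows colocated and min/max ranges narrow.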
Z-ordering is a very expensive operation (this could be the subject of an entire separate blog).
But even after Z-ordering your data, there’s no way of knowing how well it is clustered for any given column. In addition, after every insert into the Delta table, the newly inserted data is not ordered according to your previous access patterns, which can dramatically impact query performance. Deletes and updates performed on already existing data also gradually break the Z-ordering.
So, it’s essential to monitor the clustering state of the data and its evolution in order to reach the best possible query performance with the minimum reordering cost (part-1).
To gain more insight into how well our data is clustered, we need to extract a number of clustering metrics that help determine whether the data is well clustered at a given time, and when to re-order it as it continuously changes.
Below is a list of the metrics that we at Databeans found worth implementing as part of our library for computing clustering metrics on Delta tables.
total_file_count: the total number of files composing the Delta table.
Files in which the min and max values of a given ordering column are equal [Figure-5]. These are very useful, since they allow perfect skipping (best-case scenario).
average_overlap: the average number of overlapping files for each file in the Delta table.
Best-case scenario: average_overlap = 0 ⇒ the table is perfectly clustered, with no overlapping files.
Worst-case scenario: average_overlap = (total_file_count - 1) ⇒ all the files of the table overlap.
⇒ The higher the average_overlap, the worse the clustering.
To better illustrate, [Figure-6] shows a simple example of a table consisting of 4 files:
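The overlap counting behind this metric can be sketched in plain Python (an illustration of the definition above, not the library’s internals), using per-file min/max ranges like those in the 4-file example:

```python
def average_overlap(ranges):
    """For each file's [min, max] range, count the other files whose range
    intersects it, then average the counts over all files."""
    n = len(ranges)
    if n == 0:
        return 0.0
    counts = []
    for i, (lo_i, hi_i) in enumerate(ranges):
        c = sum(1 for j, (lo_j, hi_j) in enumerate(ranges)
                if i != j and lo_i <= hi_j and lo_j <= hi_i)
        counts.append(c)
    return sum(counts) / n

# Best case: 4 disjoint ranges -> 0.0 (perfectly clustered)
print(average_overlap([(1, 10), (11, 20), (21, 30), (31, 40)]))
# Worst case: 4 identical ranges -> 3.0, i.e. total_file_count - 1
print(average_overlap([(1, 40), (1, 40), (1, 40), (1, 40)]))
```

The two extremes reproduce the best- and worst-case scenarios described above.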
average_overlap_depth: the average number of files that will be read when an overlap occurs.
Empty table ⇒ average_overlap_depth = 0
Table with no overlapping files ⇒ average_overlap_depth = 1
⇒ The higher the average_overlap_depth, the worse the clustering.
In [Figure-7], we study the evolution of the average_overlap_depth of a table containing 4 files:
– Initially, there are no overlapping files, so the table is perfectly clustered.
⇒ Best case scenario: average_overlap_depth = 1.
– As the clustering gets worse, the average_overlap_depth gets higher.
– In the final stage, all files overlap in their entirety.
⇒ Worst case scenario: average_overlap_depth = number of files.
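One plausible way to compute this metric (an illustrative sketch under our own assumptions, not necessarily the library’s exact algorithm) is: for each file, take the maximum number of files that simultaneously cover some point inside its min/max range, then average over all files.

```python
def overlap_depths(ranges):
    """Per-file depth: the most files that jointly cover any point
    inside that file's [min, max] range."""
    points = sorted({p for lo, hi in ranges for p in (lo, hi)})
    depths = []
    for lo, hi in ranges:
        depth = max(sum(1 for lo_j, hi_j in ranges if lo_j <= p <= hi_j)
                    for p in points if lo <= p <= hi)
        depths.append(depth)
    return depths

def average_overlap_depth(ranges):
    if not ranges:
        return 0.0  # empty table
    d = overlap_depths(ranges)
    return sum(d) / len(d)

# No overlapping files -> 1.0
print(average_overlap_depth([(1, 10), (11, 20), (21, 30), (31, 40)]))
# All 4 files fully overlap -> 4.0 (= number of files)
print(average_overlap_depth([(1, 40), (1, 40), (1, 40), (1, 40)]))
```

The sketch reproduces the three reference points above: 0 for an empty table, 1 when no files overlap, and the file count when all files overlap entirely.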
file_depth_histogram: a histogram detailing the distribution of overlap depth across the table, grouping the table’s files by their overlap depth.
The histogram contains buckets with the following widths:
- 0 to 16, with increments of 1.
- Beyond 16, each bucket is twice the width of the previous one (e.g. 32, 64, 128, …).
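The bucket scheme just described can be sketched as follows (our own reading of the scheme, for illustration: unit buckets up to 16, then upper bounds that double):

```python
def bucket(depth):
    """Map an overlap depth to its histogram bucket label."""
    if depth <= 16:
        return depth        # one bucket per depth value from 0 to 16
    bound = 32
    while depth > bound:    # doubling upper bounds: 32, 64, 128, ...
        bound *= 2
    return bound            # bucket labelled by its upper bound

print([bucket(d) for d in (3, 16, 17, 40, 128)])
```

A healthy table concentrates its files in the low, unit-width buckets; a degraded one pushes them into the doubling buckets on the right.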
Monitoring clustering information
To monitor the clustering state of a Delta table, Databeans presents DeltaClusteringMetrics, a library for extracting clustering metrics from Delta tables.
- forName(“tableName”): targets the Delta table by its name.
- forPath(“path”): targets the Delta table by its path.
- computeForColumn(“columnName”): extracts clustering metrics for a given column.
- computeForColumns(“col1”, “col2”, …): extracts clustering metrics for multiple columns.
- computeForAllColumns(): extracts clustering metrics for all columns of the table.
The output is a Spark DataFrame containing the following columns:
We are going to experiment with the store_sales table of the TPC-DS benchmark [Figure-9], one of its biggest tables, with a size exceeding 386 GB, 8,639,911,075 rows, and initially composed of 256 files.
First, let’s query the store_sales table with a selective filter on the “ss_item_sk” column to establish a baseline. The query took 33.86 seconds [Figure-10].
Consulting the Spark UI, we notice that there is no pruning at the file level: the number of files read equals the total number of files [Figure-11].
Next, let’s extract the clustering metrics for the ss_item_sk column.
Invoking DeltaClusteringMetrics on the “ss_item_sk” column [Figure-12] shows that:
- average_overlap = 255 (total_file_count - 1) ⇒ every file overlaps with all the other files of the table (worst-case scenario)
- average_overlap_depth = 256 (total_file_count) ⇒ every time an overlap occurs, all the files of the table will be read (worst-case scenario)
Furthermore, the file_depth_histogram shows an overlap depth of 256 for all the files of the Delta table [Figure-13].
⇒ In conclusion, there is no ordering whatsoever on the column “ss_item_sk”.
Given this unacceptable clustering state, it’s essential to recluster the data by Z-ordering the store_sales table by the “ss_item_sk” column in order to improve query performance [Figure-14].
The Z-order command took more than 27 minutes (Z-ordering is an expensive operation), and resulted in:
- numFilesAdded: 1437 (total_file_count after Z-ordering)
- numFilesRemoved: 256 (total_file_count before Z-ordering)
To better understand the changes the Z-order operation brought to our data clustering, let’s inspect the new clustering metrics [Figure-15]:
⇒ Both the average_overlap and average_overlap_depth values dropped dramatically for the ss_item_sk column, indicating that the current data layout should favor statistics-based skipping for future queries.
As for the file_depth_histogram, it shows that all the files have an overlap depth value ≃ 3 [Figure-16].
So, when querying the “ss_item_sk” column again after Z-ordering [Figure-17], the query took only 3.90 seconds (almost 9x faster than the same query before Z-ordering).
⇒ Querying data by the Z-ordering predicate indicates a significant performance improvement.
Through the Spark UI, we can see that the query read only 3 files and skipped 1434, confirming that the new data layout favors statistics-based skipping [Figure-18].
⇒ The clustering metrics can help predict the state of our data clustering before even running the query.
Moving on, let’s query the store_sales table by another column, “ss_customer_sk”. This time, the query took 2.82 minutes [Figure-19].
⇒ Compared with our previous experiments, this query is clearly inefficient.
To understand the reasons behind this poor performance, let’s extract the clustering metrics for the column in question [Figure-20].
- average_overlap = 1436 (total_file_count - 1) ⇒ every file overlaps with all the other files of the table (worst-case scenario).
- average_overlap_depth ≃ 1219 ⇒ every time an overlap occurs, 1219 files on average will be read.
As clearly illustrated by the file_depth_histogram in [Figure-21], for the “ss_item_sk” column the overlap depth values of all files are low and concentrated on the left side of the histogram, which indicates a healthy clustering state. For the “ss_customer_sk” column, by contrast, the overlap depth of all files sits at the far right of the histogram and equals the total number of table files, illustrating a very poor clustering state.
To enhance query performance for both columns, we are going to Z-order the table by both the “ss_item_sk” and “ss_customer_sk” columns [Figure-22]:
The new optimize command metrics are:
- numFilesAdded: 2049 (total_file_count after Z-ordering)
- numFilesRemoved: 1437 (total_file_count before Z-ordering)
Accordingly, the clustering metrics for both columns are shown in [Figure-23]:
The average_overlap and average_overlap_depth values for both columns are low compared with the total_file_count. For the ss_customer_sk column, the metrics show a significant improvement in data clustering. However, the clustering for the ss_item_sk column is clearly not as good as it was before adding ss_customer_sk to the Z-order columns.
Correspondingly, the file_depth_histogram [Figure-24] shows that for both columns the majority of the files have an overlap depth value of 64, and only a minority has an overlap depth of 128, which is still far below the total file count (2049).
⇒ When Z-ordering by multiple columns, the clustering state of each individual column degrades (average_overlap and average_overlap_depth values gradually increase). However, compared with the initial state of the table, there is still a huge improvement, indicated both by the clustering metrics (before running the query) and by query performance.
Subsequently, when executing queries on both columns, their performance should be satisfactory:
⇒ Querying the “ss_item_sk” column took 20.95 seconds [Figure-25].
⇒ Querying the “ss_customer_sk” column took 26.89 seconds [Figure-26].
⇒ By extracting the data clustering metrics of a Delta table, you can gain insight into your queries’ performance before executing them. In addition, they can help you determine whether it’s advisable to Z-order the Delta table or not.
Our clustering metrics help track the clustering state of a Delta table and its behavior over time. They provide an overview of the files’ layout on disk. However, they are neither perfectly accurate nor conclusive: the ultimate indicator for clustering diagnosis remains your queries’ actual performance.