Unlocking the Full Power of Apache Spark 3.4 for Databricks Runtime!


You’ve dabbled in the magic of Apache Spark 3.4 with my previous blog “Exploring Apache Spark 3.4 Features for Databricks Runtime“, where we journeyed through 8 game-changing features—from the revolutionary Spark Connect to the nifty tricks of constructing parameterized SQL queries. But guess what? We’ve only scratched the surface!

In this sequel, we’re diving deeper into the treasure trove of Apache Spark 3.4 features that are set to supercharge your Databricks Runtime experience. Whether you’re a seasoned developer or a curious newbie, get ready to unlock a new realm of possibilities that will take your projects to new heights.

Latest Features of Apache Spark 3.4 for Databricks Runtime

Next, I’ll delve into a comprehensive exploration of the features introduced in Apache Spark 3.4, breaking down each one to give you a deeper understanding of their capabilities and advantages.

Feature 9: MELT and UNPIVOT Method

What are MELT and UNPIVOT?

MELT and UNPIVOT are data transformation methods added to the DataFrame API in Spark 3.4 for Databricks Runtime that are used to reshape DataFrames; in fact, melt is simply an alias for unpivot. Imagine your data as a table: these methods rearrange the rows and columns of that table into a shape that is easier to work with.

Why Do We Need Them?

Before MELT and UNPIVOT, changing the shape of data was a bit complicated and could lead to mistakes. These new tools make the process simpler and more efficient. Let’s understand the wide format and long format.

In a wide format, each column represents a different variable and each row represents a different observation. In a long format, each row represents a single variable/value pair, with one column identifying the variable and another holding its value. Unpivoting from wide to long is useful when you need to:

  • Separate dimensions from measures in a DataFrame
  • Aggregate measures across many columns by dimension
  • Turn column names into values of a single column

The MELT and UNPIVOT methods were introduced in Spark 3.4 to address the following problems:

  • The previous ways to unpivot a DataFrame in Spark were the stack() SQL expression or a union of per-column SELECTs. Both were verbose and error-prone.
  • Neither approach was a first-class DataFrame method, and unpivoting many columns meant repeating boilerplate for each one.

The MELT and UNPIVOT methods solve these problems by providing a single, easy-to-use function for unpivoting DataFrames. They can also be used to unpivot multiple columns at once.

What Do They Do?

MELT: Takes your data and stretches it out into a longer table. It keeps some columns the same and breaks down other columns into more rows.

The melt function takes four arguments:

  • ids: The columns to keep as identifier columns.
  • values: The columns to unpivot into rows.
  • variableColumnName: The name of the new column that holds the original column names.
  • valueColumnName: The name of the new column that holds the values.

For example, the following DataFrame has one identifier column, name, and three measure columns, math, science, and english:

| name | math | science | english |
|---|---|---|---|
| Alice | 1 | 2 | 3 |
| Bob | 4 | 5 | 6 |

The following code uses the melt function to convert this DataFrame into a “long” format:

df.melt(ids=["name"], values=["math", "science", "english"],
        variableColumnName="variable", valueColumnName="value")

The output of this code is a new DataFrame with three columns: name, variable, and value:

| name | variable | value |
|---|---|---|
| Alice | math | 1 |
| Alice | science | 2 |
| Alice | english | 3 |
| Bob | math | 4 |
| Bob | science | 5 |
| Bob | english | 6 |
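
Here is a minimal end-to-end sketch of the same transformation (the session setup is only needed outside a notebook; on Databricks Runtime, spark already exists):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("melt example").getOrCreate()

# Wide-format input: one row per student, one column per subject.
wide_df = spark.createDataFrame(
    [("Alice", 1, 2, 3), ("Bob", 4, 5, 6)],
    ["name", "math", "science", "english"],
)

# Reshape to long format: one row per (name, subject) pair.
long_df = wide_df.melt(
    ids=["name"],
    values=["math", "science", "english"],
    variableColumnName="variable",
    valueColumnName="value",
)
long_df.show()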

UNPIVOT: In the DataFrame API, unpivot is the same operation as melt (melt is simply an alias for unpivot), so it takes the same four arguments: ids, values, variableColumnName, and valueColumnName.

Note that unpivot does not aggregate and does not go from long back to wide format; the reverse transformation is done with groupBy together with pivot and an aggregation.

For example, the following long-format DataFrame (long_df in the sketch above) has three columns: name, variable, and value:

| name | variable | value |
|---|---|---|
| Alice | math | 1 |
| Alice | science | 2 |
| Alice | english | 3 |
| Bob | math | 4 |
| Bob | science | 5 |
| Bob | english | 6 |

The following code converts this DataFrame back into a “wide” format using groupBy and pivot (each cell holds a single value, so first is used as the aggregation):

from pyspark.sql import functions as F

long_df.groupBy("name").pivot("variable", ["math", "science", "english"]).agg(F.first("value"))

The output of this code is the original wide DataFrame:

| name | math | science | english |
|---|---|---|---|
| Alice | 1 | 2 | 3 |
| Bob | 4 | 5 | 6 |

Let’s see another example:

val df = Seq((1, 11, 12L), (2, 21, 22L))
  .toDF("id", "int", "long")
df.show()
// output:
// +---+---+----+
// | id|int|long|
// +---+---+----+
// |  1| 11|  12|
// |  2| 21|  22|
// +---+---+----+

df.unpivot(
  Array($"id"),
  Array($"int", $"long"),
  "variable",
  "value")
  .show()
// output:
// +---+--------+-----+
// | id|variable|value|
// +---+--------+-----+
// |  1|     int|   11|
// |  1|    long|   12|
// |  2|     int|   21|
// |  2|    long|   22|
// +---+--------+-----+
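
Spark 3.4 also adds an UNPIVOT clause to SQL itself. Here is a small PySpark sketch of it (the sales_quarterly view and its columns are made up for illustration, and the exact clause options can vary by version):

spark.createDataFrame(
    [(1, 100, 200), (2, 150, 250)], ["id", "q1", "q2"]
).createOrReplaceTempView("sales_quarterly")

spark.sql("""
    SELECT *
    FROM sales_quarterly
    UNPIVOT (
        sales FOR quarter IN (q1, q2)
    )
""").show()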

Feature 10: OFFSET Clause in SQL Queries

The OFFSET clause is a new feature in Spark 3.4 for Databricks Runtime that allows you to skip a certain number of rows before returning results from a SELECT statement. This can be useful for a variety of tasks, such as:

  • Getting the next N rows of an ordered result, after skipping the first M rows.
  • Paginating query results (page P of size N means OFFSET (P - 1) * N with LIMIT N), as shown in the pagination sketch below.
  • Skipping a prefix of rows that you have already processed.

Before Spark 3.4, skipping rows in a SELECT statement typically meant numbering the rows with a row_number() window function and filtering on that number, which was verbose and easy to get wrong.

The OFFSET clause solves this problem by providing a clear and concise way to skip rows in a SELECT statement, with no extra window function or filter required.

The OFFSET clause is used in conjunction with the ORDER BY clause to ensure that the results are deterministic. For example, the following query will return the top 3 rows in the person table, after skipping the first 2 rows:

SELECT name, age FROM person ORDER BY name LIMIT 3 OFFSET 2;
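
As a concrete use, here is a small pagination sketch in PySpark (the people table, its columns, and the page size are made up for illustration):

page_size = 10
page_number = 3  # 1-based

# Fetch page 3: skip the first two pages, then return one page of rows.
spark.sql(f"""
    SELECT name, age
    FROM people
    ORDER BY name
    LIMIT {page_size} OFFSET {(page_number - 1) * page_size}
""").show()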

Feature 11: NumPy Instance in PySpark

Official support for NumPy instances in PySpark was introduced in Spark 3.4. This allows you to create DataFrames from NumPy arrays and to pass NumPy instances as input to SQL expressions and ML routines.

Previously, NumPy arrays and scalars had to be converted to plain Python lists and built-in types before being handed to PySpark, an extra step that was cumbersome and error-prone.

However, if you are using an older version of Spark, you may encounter an error when trying to use NumPy instances, typically a TypeError complaining that the type is not supported or that a schema cannot be inferred for it, for example:

TypeError: Can not infer schema for type: <class 'numpy.ndarray'>

This error occurs because NumPy types are not supported in older versions of Spark. To fix it, upgrade to Spark 3.4 or newer.

The support for NumPy instances in PySpark solves this problem by providing a single, easy-to-use way to work with NumPy arrays in PySpark.

For example, the following code creates a DataFrame from a NumPy array:

import numpy as np
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("NumPy in PySpark").getOrCreate()

# Spark 3.4+ accepts a NumPy ndarray directly.
df = spark.createDataFrame(np.array([[1, 2], [3, 4]]))

df.show()

This code will produce the following output:

+---+---+
| _1| _2|
+---+---+
|  1|  2|
|  3|  4|
+---+---+

As you can see, the DataFrame contains two rows, each with two columns. The first column is named _1, and the second column is named _2. The values in the DataFrame are the same as the values in the NumPy array.
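
If you want meaningful column names instead of _1 and _2, you can rename them afterwards; a minimal sketch:

# Same ndarray as above, with explicit column names applied via toDF().
df = spark.createDataFrame(np.array([[1, 2], [3, 4]])).toDF("a", "b")
df.show()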

You can also use NumPy arrays as input in SQL expressions. For example, the following code uses a NumPy array to filter a DataFrame:

df = spark.read.csv("data.csv", header=True)

# NumPy scalars are accepted as literals, so the array can be unpacked straight into isin().
values = np.array(["value1", "value2"])
filtered_df = df.filter(df["column_name"].isin(*values))

filtered_df.show()

This code will filter the DataFrame to only include rows where the value in the column_name column is equal to one of the values in the NumPy array.
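
NumPy scalars work in expressions too. A small sketch (assuming Spark 3.4+ and an existing SparkSession named spark), using a NumPy integer directly as a literal:

import numpy as np
from pyspark.sql import functions as F

df = spark.range(3)

# The np.int64 value is accepted directly where a literal is expected.
df.select((F.col("id") + F.lit(np.int64(10))).alias("shifted")).show()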

You can also use NumPy arrays for ML. For example, the following code builds a training DataFrame from a NumPy array and trains a linear regression model on it:

import numpy as np
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression

# First column is the label, remaining columns are the features.
data = np.array([[1.0, 2.0], [2.0, 4.0], [3.0, 6.0]])

train_df = spark.createDataFrame(
    [(float(row[0]), Vectors.dense(row[1:])) for row in data],
    ["label", "features"],
)

lr = LinearRegression()
model = lr.fit(train_df)

predictions = model.transform(train_df)
predictions.show()

This code converts the NumPy array into a DataFrame of labels and feature vectors, trains a linear regression model on it, and then uses the fitted model to make predictions on the same data, which are shown on the console.

Feature 12: Use of SQLSTATE for Error Classes

Previously, Spark reported errors as exceptions with free-form messages. These messages varied from one error to the next, which made them hard to interpret consistently and hard to handle programmatically.

Using SQLSTATE values for error classes solves this problem by providing a standardized way to report errors. SQLSTATE is a five-character code that represents the return status of a SQL query or command. It is a standard across the database industry, and it simplifies communication between clients and servers.

The first two characters of the SQLSTATE code identify the class of the error, and the last three characters identify the subclass.

Apache Spark 3.4 for Databricks Runtime updates its error handling to comply with the SQLSTATE standard, so Spark now returns SQLSTATE codes for the significant majority of error cases. This makes it easier for clients to understand the errors that occur in Spark, and easier for developers to debug Spark applications.

For example,

  1. The SQLSTATE value 22003 represents a “numeric value out of range” error. This error is returned when a Spark SQL query or command attempts to use a numeric value that is out of range.
  2. The SQLSTATE value 22012 represents a “division by zero” error. This error is returned when a Spark SQL query or command attempts to divide by zero.

Other common SQLSTATE classes and codes include:

  • 22000: Data exception (the class that 22003 and 22012 belong to).
  • 22001: String data, right truncation.
  • 42000: Syntax error or access rule violation (the class).
  • 42601: Syntax error.
  • 42703: Column not found.
  • 42P01: Table or view not found.
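
For example, a SQL syntax error surfaces the 42601 state. A small sketch (the exact error class name can vary between versions):

from pyspark.errors import ParseException

try:
    spark.sql("SELEC 1")  # malformed SQL
except ParseException as e:
    print(e.getErrorClass())  # e.g. PARSE_SYNTAX_ERROR
    print(e.getSqlState())    # e.g. 42601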

Here is an example of code that uses SQLSTATE:

from pyspark.errors import AnalysisException
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SQLSTATE example").getOrCreate()

df = spark.range(5)

try:
    # Referencing a column that does not exist fails analysis.
    df.select("no_such_column").show()
except AnalysisException as e:
    print(e.getErrorClass())  # e.g. UNRESOLVED_COLUMN.WITH_SUGGESTION
    print(e.getSqlState())    # e.g. 42703

In this code, we first create a SparkSession and a small DataFrame, then try to select a column that does not exist. Spark raises an AnalysisException that carries both an error class and a SQLSTATE code.

The except clause catches the error and prints the error class and the SQLSTATE code.

Feature 13: Memory Profiler for PySpark UDFs (User-Defined Functions)

The memory profiler for PySpark UDFs (User-Defined Functions) was introduced in Spark 3.4 to help users optimize their UDFs and avoid out-of-memory errors.

UDFs are a powerful tool that can be used to extend the functionality of Spark. However, they can also be a source of memory leaks and performance problems. The memory profiler for PySpark UDFs can help users to identify and fix these problems.

The memory profiler works by collecting information about the memory usage of each UDF. This information can be used to identify the lines of code in the UDF that are consuming the most memory. The user can then optimize the UDF to reduce its memory usage.

To enable the memory profiler for PySpark UDFs, you need to set the spark.python.profile.memory configuration property to true when the cluster or SparkSession is created; it is read when the SparkContext starts, so it cannot be toggled on afterwards.

Here is an example of how to enable the memory profiler for PySpark UDFs:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("UDF memory profiler")
    .config("spark.python.profile.memory", "true")
    .getOrCreate()
)

This code enables the memory profiler for every PySpark UDF that runs in the session. On Databricks, you would set the same property in the cluster's Spark configuration instead.

Here is an example of how to use the new memory profiler:

from pyspark.sql.functions import *

@udf("int")
def f(x):
   return x + 1

_ = spark.range(2).select(f('id')).collect()
spark.sparkContext.show_profiles()

The above code creates a PySpark UDF that adds 1 to the input value. It then uses the UDF to create a DataFrame and collects the results. Finally, it calls the show_profiles() method to show the memory profile of the UDF.

The output of the show_profiles() method is a table that shows the memory usage of the UDF line by line. The table includes the following information:

  • Line #: the line number within the UDF source.
  • Mem usage: the total memory in use after executing that line.
  • Increment: the memory added by that line.
  • Occurrences: the number of times that line was executed.

In the above example, the output of the show_profiles() method shows roughly 116.9 MiB of memory in use when the UDF body at line 3 starts executing. The UDF runs twice (Occurrences is 2), and the return statement on line 5 adds no measurable memory (Increment is 0.0 MiB).

============================================================
Profile of UDF<id=11>
============================================================
Filename: <command-1010423834128581>

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
     3    116.9 MiB    116.9 MiB           2   @udf("int")
     4                                         def f(x):
     5    116.9 MiB      0.0 MiB           2       return x + 1

The new memory profiler for PySpark UDFs is a powerful tool that can help you identify and fix memory problems in your PySpark applications. By profiling your UDFs line by line, you can see where memory is being allocated and how it is being used. This information can help you to optimize your UDFs and improve the performance of your Spark applications.

If you are using PySpark UDFs, I recommend enabling the memory profiler; it makes memory problems visible early and helps you avoid out-of-memory errors in production.

Feature 14: Support for Protobuf

Protobuf is a popular binary serialization format that is used for a variety of purposes, including streaming data. In the past, Spark did not have native support for Protobuf, which meant that users had to write their own code to read and write Protobuf records.

This was a significant limitation, especially for streaming use cases. Streaming data is often very large and fast-moving, and it can be difficult to write efficient code to read and write it in Protobuf format.

Protobuf is used in a wide variety of applications, including Google's RPC framework, gRPC. Native Protobuf support was introduced in Spark 3.4 so that protobuf data can be read and written from Spark SQL. This can be useful for a variety of tasks, such as:

  • Reading data from a Kafka topic that is serialized in protobuf format.
  • Writing data to a database that supports protobuf format.
  • Sharing data between Spark and other applications that use protobuf.

To read or write protobuf data in Spark SQL, you use the from_protobuf() and to_protobuf() functions. from_protobuf() turns a binary column containing serialized protobuf messages into a struct column, and to_protobuf() turns a struct column back into serialized protobuf binary.

Here is an example of how to use the from_protobuf() and to_protobuf() functions. It is a sketch under a few assumptions: the separate spark-protobuf package is on the classpath, person.desc is a descriptor file generated with protoc, Person is a message type defined in it with name and age fields, and the data arrives on a Kafka topic named people:

from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf

# Batch-read serialized Person messages from Kafka; the payload is in the
# binary "value" column.
input_df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092")
    .option("subscribe", "people")
    .load()
)

# Deserialize the protobuf payload into a struct column.
people_df = input_df.select(
    from_protobuf("value", "Person", "person.desc").alias("person")
)

people_df.select("person.name", "person.age").show()

# Serialize the struct column back into protobuf binary.
output_df = people_df.select(
    to_protobuf("person", "Person", "person.desc").alias("value")
)

This code reads protobuf-encoded records from a Kafka topic, deserializes them into a struct column with from_protobuf(), works with the individual fields, and then serializes the struct column back into protobuf binary with to_protobuf(), ready to be written back to Kafka or elsewhere.

Feature 15: Python Arbitrary Stateful Processing

Arbitrary stateful processing, another Spark 3.4 feature for Databricks Runtime, is now supported in Structured Streaming with Python. This means that you can now write Python code that accesses and modifies state between micro-batches of data in Structured Streaming. This can be useful for a variety of tasks, such as:

  • Calculating moving averages or other rolling statistics.
  • Tracking the frequency of events.
  • Identifying anomalies in data.

Previously, arbitrary stateful processing in Structured Streaming was only available through the Scala and Java APIs (mapGroupsWithState and flatMapGroupsWithState), so Python users had to switch languages or settle for less flexible workarounds.

The new applyInPandasWithState API closes this gap by letting you write Python functions that read and update per-group state which Spark maintains across micro-batches.

With arbitrary stateful processing, Python users can now write code that can:

  • Access and modify data from previous batches
  • Maintain state across batches
  • Implement complex logic

Here is an example of how to use arbitrary stateful processing in Structured Streaming with Python, via applyInPandasWithState, keeping a running count of rows per key over the built-in rate source:

import pandas as pd
from typing import Iterator, Tuple

from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

def count_per_key(
    key: Tuple[str], batches: Iterator[pd.DataFrame], state: GroupState
) -> Iterator[pd.DataFrame]:
    # Read the running count kept from previous micro-batches, if any.
    (count,) = state.get if state.exists else (0,)
    for pdf in batches:
        count += len(pdf)
    # Persist the updated count for the next micro-batch.
    state.update((count,))
    yield pd.DataFrame({"key": [key[0]], "count": [count]})

stream_df = (
    spark.readStream.format("rate").load()
    # Bucket the rate source into ten keys.
    .selectExpr("CAST(value % 10 AS STRING) AS key")
)

counts = stream_df.groupBy("key").applyInPandasWithState(
    count_per_key,
    outputStructType="key STRING, count LONG",
    stateStructType="count LONG",
    outputMode="update",
    timeoutConf=GroupStateTimeout.NoTimeout,
)

counts.writeStream.outputMode("update").format("console").start()

This code reads the rate stream, buckets the rows into ten keys, and applies count_per_key() to each key with applyInPandasWithState(). For every micro-batch, the function reads the count stored in the group state, adds the number of newly arrived rows, writes the updated count back to the state, and emits the current total, which is written to the console.

Conclusion

In conclusion, Apache Spark 3.4’s integration into Databricks Runtime brings a range of powerful enhancements. MELT and UNPIVOT operations offer versatile data transformation, while the OFFSET clause in SQL queries provides more precise control. NumPy integration in PySpark bridges the gap between frameworks, and SQLSTATE error classes improve error handling.

The memory profiler for PySpark UDFs enhances development efficiency, and support for Protobuf and Python Arbitrary Stateful Processing extends data processing capabilities. These updates collectively empower data professionals with advanced tools, streamlined workflows, and improved performance, enabling more insightful data analysis and informed decision-making.
