Exploring the Latest Features of Apache Spark 3.4 for Databricks Runtime


In the dynamic landscape of big data and analytics, staying at the forefront of technology is essential for organizations aiming to harness the full potential of their data-driven initiatives. Apache Spark, the powerful open-source data processing and analytics framework, continues to evolve with each new release, bringing enhancements and innovations that drive the capabilities of data professionals further.

With the introduction of Apache Spark 3.4, the landscape of Databricks Runtime transforms once again, introducing a host of features that promise to revolutionize the way data is processed, analyzed, and leveraged.

Let’s dive into the cutting-edge offerings that Apache Spark 3.4 brings to the table and explore how these features align with the objectives of data-driven enterprises.

Apache Spark 3.4 for Databricks Runtime

Spark 3.4 introduces many welcome features. To make each one easier to grasp and apply in your own work, I’ve split the coverage into two parts. The features are elaborated on below.

Feature 1: Spark Connect

In previous versions of Spark, client applications had to be tightly coupled to the Spark driver. This made client applications harder to develop and deploy, and it also limited the scalability and flexibility of Spark deployments.

Spark Connect is a new client-server architecture introduced in Apache Spark 3.4 that solves the problem by decoupling Spark client applications and allowing remote connectivity to Spark clusters. The separation between client and server allows Spark and its open ecosystem to be leveraged from anywhere, embedded in any application.

Here are the key features of Spark Connect:

  • Thin API Usable Everywhere: Spark Connect has a lightweight interface. This means it’s not bulky and can easily fit into various platforms. Whether you’re working on an application server, using a coding tool (IDE), writing in a digital notebook, or coding in different languages, you can integrate Spark Connect seamlessly.
  • Universal Communication Language: Instead of using complex codes, Spark Connect communicates using ‘unresolved logical plans’. Think of these as a universal language that both the client (like your computer) and the Spark driver (the main controller of Spark) understand. This ensures smooth communication regardless of the language or platform you’re using.
  • Remote Spark Use: One of the great things about Spark Connect is that you can run Spark tasks remotely. Imagine using an app on your phone that’s powered by a supercomputer in another country. With Spark Connect, you don’t need to have Spark installed on your device (client) to do this.
  • Faster Data Searches: Spark Connect supports query pushdown, which means queries are executed on the Spark cluster rather than on the client, and only the results travel back. This helps keep data searches fast and efficient.

At a high level, Spark Connect works as follows:

The client application uses the Spark Connect API to connect to the Spark cluster. The Spark Connect API then translates the client’s queries into unresolved logical plans, which are sent to the Spark driver. The Spark driver then executes the queries on the Spark cluster and returns the results to the client application.
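
To make this concrete, here is a minimal, hedged sketch of connecting to a remote Spark cluster through Spark Connect from PySpark 3.4. The endpoint is a placeholder; in practice you would point the sc:// URL at your own Spark Connect server or Databricks workspace:

# Requires a PySpark installation with the Spark Connect client packages.
from pyspark.sql import SparkSession

# "sc://localhost:15002" is a placeholder endpoint; replace it with your server address.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# The DataFrame API looks the same as before; the query is translated into an
# unresolved logical plan and executed on the remote cluster.
spark.range(10).filter("id % 2 == 0").show()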

You can use Spark Connect for:

  1. Detecting Fraudulent Transactions in Real-time: Spark Connect’s ability to process data quickly and in real-time makes it suitable for identifying suspicious activities, like credit card misuse or insurance scams, as they happen.
  2. Analyzing Streaming Data in Real-time: With its real-time processing capabilities, Spark Connect can analyze continuous data streams, such as updates from social platforms or stock market movements, providing instant insights.
  3. Creating Dashboards and Visualizations: Spark Connect can process and present data efficiently, making it easier to design visual dashboards highlighting current data patterns and trends.
  4. Training and Deploying Machine Learning Models: Given its speed and efficiency, Spark Connect can quickly train and implement smart prediction tools, like suggestion systems or forecasting models, in real-time.
  5. Integrating Data from Different Systems: Spark Connect’s flexibility allows it to mix and process data from diverse platforms or software, like CRM tools or ERP systems, providing a unified view or analysis.

Feature 2: Addition of TorchDistributor Module to PySpark

The TorchDistributor module is a new module in PySpark that makes it easier to do distributed training with PyTorch on Spark clusters.

Before TorchDistributor, there were two main ways to do distributed training with PyTorch on Spark:

  • Using the PyTorch DistributedDataParallel (DDP) API: This API allows you to distribute the training of a PyTorch model across multiple GPUs or CPUs. However, it can be difficult to use and manage, especially for large models or datasets.
  • Using the Spark MLlib distributed training APIs: These APIs allow you to train a Spark MLlib model on a distributed cluster. However, they are not designed for PyTorch models and can be inefficient for training large models.

TorchDistributor solves these problems by providing a simple and efficient way to do distributed training with PyTorch on Spark. It works by:

  • Initializing the PyTorch distributed environment on each worker node.
  • Distributing the training data across the worker nodes.
  • Launching the PyTorch training job on each worker node.
  • Logging the training progress and metrics to Spark.

TorchDistributor provides a simple API for launching PyTorch training jobs as Spark jobs. It takes care of initializing the environment and the communication channels between the workers, and then it invokes the CLI command torch.distributed.run to run distributed training across the worker nodes.

To use TorchDistributor, you need to:

  1. Import the TorchDistributor module.
  2. Create a TorchDistributor object.
  3. Call the run() method on the TorchDistributor object, passing in the PyTorch training function and any other required parameters.

Consider the code below, which shows how to use the TorchDistributor module to train a PyTorch model.


from pyspark.ml.torch.distributor import TorchDistributor

def train(learning_rate, use_gpu):
  """Trains a PyTorch model using distributed training.

  Args:
    learning_rate: The learning rate for the optimizer.
    use_gpu: Whether to use GPU acceleration.
  Returns:
    The training results.
  """

  import os
  import torch
  import torch.distributed as dist
  from torch.nn.parallel import DistributedDataParallel as DDP
  from torch.utils.data import DistributedSampler, DataLoader

  # Initialize the distributed environment.
  backend = "nccl" if use_gpu else "gloo"
  dist.init_process_group(backend)

  # Get the device that this worker is running on.
  device = int(os.environ["LOCAL_RANK"]) if use_gpu else "cpu"

  # Wrap the model with DDP. create_model() and dataset are placeholders for
  # your own model factory and torch Dataset.
  model = DDP(create_model().to(device), device_ids=[device] if use_gpu else None)
  sampler = DistributedSampler(dataset)
  loader = DataLoader(dataset, sampler=sampler)

  # Run the training loop. run_train_loop() is a placeholder for your own epoch loop.
  output = run_train_loop(model, loader, learning_rate)

  # Clean up the distributed environment.
  dist.destroy_process_group()

  return output

distributor = TorchDistributor(num_processes=2, local_mode=False, use_gpu=True)
distributor.run(train, 1e-3, True)

The train function defines the training logic that runs in each process. The distributor.run call launches the train function across the cluster. The num_processes parameter specifies how many training processes to launch in total, local_mode specifies whether those processes run on the driver node (local mode) or on the worker nodes (distributed mode), and use_gpu specifies whether to use GPU acceleration.
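
For comparison, here is a hedged sketch of running the same train function in local mode, where the processes are launched on the driver node instead of on the workers (useful on a single machine with multiple GPUs):

# Run two training processes on the driver node only.
local_distributor = TorchDistributor(num_processes=2, local_mode=True, use_gpu=True)
local_distributor.run(train, 1e-3, True)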

You can use the TorchDistributor module for tasks such as:

  • To classify images in real-time, such as for self-driving cars or security cameras.
  • To process natural language text in real-time, such as for chatbots or sentiment analysis.
  • To transcribe speech in real-time, such as for voice assistants or dictation software.
  • To analyze video in real-time, such as for surveillance or security applications.
  • To trade financial assets in real-time, such as stocks or commodities.

Feature 3: Support DEFAULT Values for Columns in Tables

In Spark 3.3 and earlier versions, you could not declare a DEFAULT value for a table column. Every INSERT statement therefore had to supply a value for every column, and leaving a column out either caused the insertion to fail or forced you to write explicit NULLs or placeholder values.

The new DEFAULT values for columns feature in Spark 3.4 solves this problem by automatically inserting the default value for any column that is not explicitly specified in the INSERT statement. This makes it easier to insert data into tables, and it also prevents errors from being raised.

The following example demonstrates how DEFAULT values behave.

CREATE TABLE T(a INT, b INT NOT NULL);

-- The implicit default value is NULL
INSERT INTO T VALUES (DEFAULT, 0);
INSERT INTO T(b)  VALUES (1);
SELECT * FROM T;
(NULL, 0)
(NULL, 1)

-- Adding a DEFAULT column to a table that already has rows sets the value for
-- the existing rows (the "exist default") and for new rows (the "current default").
ALTER TABLE T ADD COLUMN c INT DEFAULT 5;
--Default value 5 is inserted
INSERT INTO T VALUES (1, 2, DEFAULT);
SELECT * FROM T;
--Result of select
(NULL, 0, 5)
(NULL, 1, 5)
(1, 2, 5) 

Here are some of the use cases of DEFAULT values in SQL tables:

  1. Simplify INSERT statements, since columns with sensible defaults no longer need to be listed explicitly.
  2. Enforce data integrity and prevent missing-value errors.
  3. Simplify the data model and make it easier to understand and maintain.
  4. Evolve schemas safely: adding a column with a DEFAULT gives existing rows a well-defined value without manual backfilling.
  5. Keep downstream queries simpler, because consumers can rely on a column always being populated.

Feature 4: Addition of TIMESTAMP WITHOUT TIMEZONE Data Type

The TIMESTAMP WITHOUT TIMEZONE Data Type is a new feature in Spark 3.4 that allows you to represent timestamp values without a time zone. This is useful for data that does not need to be associated with a specific time zone, such as log files or sensor data.

Earlier, the main way to represent timestamps in Spark was the TIMESTAMP data type. That type represents an instant in time and is rendered in the session time zone, which means the same stored value can display differently depending on the time zone in use. This can be a problem for data that should not be tied to any particular time zone.
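
The difference is easiest to see side by side. The following is a minimal sketch (assuming an active SparkSession named spark) that stores one fixed instant as TIMESTAMP and one wall-clock value as TIMESTAMP_NTZ, then displays them under two different session time zones:

from datetime import datetime, timezone

df = spark.createDataFrame(
    [(datetime(2023, 8, 23, 12, 0, tzinfo=timezone.utc),   # an instant in time
      datetime(2023, 8, 23, 12, 0))],                      # a wall-clock value
    schema="ts_ltz TIMESTAMP, ts_ntz TIMESTAMP_NTZ")

spark.conf.set("spark.sql.session.timeZone", "UTC")
df.show()   # ts_ltz: 2023-08-23 12:00:00, ts_ntz: 2023-08-23 12:00:00

spark.conf.set("spark.sql.session.timeZone", "America/New_York")
df.show()   # ts_ltz now renders as 08:00:00, while ts_ntz still reads 12:00:00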

To use the TIMESTAMP WITHOUT TIMEZONE data type, you specify the TIMESTAMP_NTZ type when you create a column. For example, the following code creates a DataFrame with a column of type TIMESTAMP_NTZ:

df = spark.createDataFrame([('2023-08-23 12:00:00', 1)],
                          schema=['ts', 'value'])
# Cast the string column to TIMESTAMP_NTZ (timestamp without time zone).
df = df.withColumn('timestamp_ntz', df.ts.cast('timestamp_ntz'))

The timestamp_ntz column in this DataFrame will store timestamps without a time zone.

You can use TIMESTAMP WITHOUT TIMEZONE for the following purposes:

  1. Analyzing log files to identify trends or problems.
  2. Tracking the performance of systems or applications.
  3. Identifying fraudulent transactions.
  4. Performing historical analysis of data.
  5. Analyzing streaming data in real time.

Here is another example of how TIMESTAMP WITHOUT TIMEZONE can be used for log analysis:

CREATE TABLE my_logs (
  timestamp TIMESTAMP_NTZ,
  message STRING
);

INSERT INTO my_logs (timestamp, message) VALUES ('2023-08-23 17:21:21', 'This is a log message');

In this example, the timestamp column is of type TIMESTAMP_NTZ (timestamp without time zone). This means the value is stored as a plain wall-clock date and time, without any time zone information.

  1. Usage: When you use this type, you should be aware that the same timestamp value can represent different actual moments in time, depending on the timezone context in which it’s interpreted. For example, “2023-08-24 12:00:00” could be noon in New York or evening in London, depending on the local timezone of the system or application interpreting the value.
  2. Comparison with the time-zone-aware TIMESTAMP type: In contrast, Spark’s default TIMESTAMP type (also known as TIMESTAMP_LTZ) represents an instant in time that is rendered in the session time zone, so a stored value always refers to a specific, unambiguous point in time.
  3. Best Practices: If you’re working with data that originates from different time zones, or if your data processing spans multiple time zones, it’s generally safer to use the time-zone-aware TIMESTAMP type to avoid ambiguity. However, if you’re certain that all timestamp data will be interpreted in a consistent time zone context, then TIMESTAMP_NTZ can be simpler and more efficient.

Feature 5: Lateral Column reference in SQL SELECT List

The lateral column reference in the SQL SELECT list is a new feature in Spark 3.4 that allows an expression in the SELECT list to refer to an alias defined earlier in the same SELECT list. This is useful when one computed column builds on another, for example deriving a total from an intermediate calculation.

Earlier, the only ways to reuse such an expression were to repeat it, wrap the query in a subquery or common table expression (CTE), or fall back on a correlated subquery. These workarounds make queries longer and harder to read, and they can also lead to performance problems.

The lateral column reference solves these problems by resolving each alias laterally, that is, against the columns and aliases already defined to its left in the same SELECT list. This makes such queries easier to write and understand, and it avoids an extra layer of query nesting.

To use a lateral column reference, you simply refer to an earlier alias by name. For example, the following query defines double_salary and then reuses it in the very next expression:

SELECT
  name,
  salary * 2 AS double_salary,
  double_salary + bonus AS total_compensation
FROM
  employees

Here, double_salary is defined and immediately referenced within the same SELECT list. In Spark 3.3 and earlier, the second expression would have had to repeat salary * 2 or be moved into a subquery or CTE.
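
Below is a small, hedged PySpark sketch (assuming an active SparkSession named spark; the table and column names are illustrative) that creates a toy employees view and runs the query above:

df = spark.createDataFrame(
    [("Alice", 2000, 300), ("Bob", 1500, 100)],
    schema="name STRING, salary INT, bonus INT")
df.createOrReplaceTempView("employees")

spark.sql("""
    SELECT
      name,
      salary * 2 AS double_salary,
      double_salary + bonus AS total_compensation
    FROM employees
""").show()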

Lateral column references in the SQL SELECT list can be very useful in several ways:

  1. Simplification of Queries: It simplifies the SQL queries by allowing you to use column aliases or expressions directly within the same SELECT clause. This can make the query more readable and concise.
  2. Avoids Subqueries: In previous versions, to achieve similar functionality, you might have to use subqueries or CTEs (Common Table Expressions). With lateral column references, you can avoid those, leading to potentially more optimized query execution.
  3. Enhanced Expressiveness: It provides more expressiveness in SQL, allowing more complex computations and transformations within a single SELECT statement. It also makes it easier to explore data in a flexible way, which can help when identifying trends or patterns or when debugging queries.
  4. Optimized Performance: As the need for subqueries or additional layers of query nesting is reduced, there might be potential performance benefits due to more straightforward query execution.
  5. Consistency with Other Databases: Many relational databases support this feature. By adding this to Spark, it brings consistency and makes it easier for developers transitioning from other databases.
  6. Incremental Derivations: You can build derived columns step by step, with each expression reusing the aliases defined before it, instead of repeating long expressions or nesting queries.

Feature 6: Bloom Filter Joins (Enabled by Default)

Bloom filter join is a new feature in Spark 3.4 that can be used to improve the performance of joins between large datasets.

Traditional joins can be very expensive, especially when the datasets are large and the join keys are not evenly distributed. This is because the join has to shuffle and scan the entire dataset on both sides, even when many of the join keys have no match in the other dataset.

This problem is solved by using a Bloom filter to estimate whether a row in one dataset is also present in the other dataset. A Bloom filter is a space-efficient probabilistic data structure that can be used to test whether an element is a member of a set.

In Spark 3.4 this happens automatically: the query optimizer can inject a runtime Bloom filter built from the join keys of the smaller (creation) side of the join and apply it to the larger (application) side, so rows whose join keys definitely have no match are filtered out early, before the expensive shuffle and join.

Because this pruning happens before rows are shuffled, the amount of data that has to be moved and compared can drop significantly, which improves the performance of the join.

There is no special API to call; the optimization is applied by the optimizer itself. The sketch below shows the relevant configuration flag (already enabled by default in Spark 3.4) alongside an ordinary join that can benefit from it. The file paths and column names are illustrative placeholders:

# Runtime Bloom filter joins are on by default in Spark 3.4; the flag is set
# here only to make the behavior explicit.
spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")

orders = spark.read.parquet("/data/orders")        # large fact table
customers = spark.read.parquet("/data/customers")  # smaller, selective side

vip_customers = customers.filter(customers.segment == "VIP")

# Spark can build a Bloom filter over vip_customers.customer_id and use it to
# prune non-matching orders rows before the shuffle.
joined_data = orders.join(vip_customers, "customer_id")

When this query runs, the optimizer may build a Bloom filter over the customer IDs on the smaller, filtered side and use it to discard non-matching order rows early; no change to the join itself is required.

Here are some of the use cases of Bloom Filter Join:

  1. Bloom Filter Join can be used to join large datasets to detect fraudulent transactions in real-time. This can be used to detect fraudulent transactions in credit card transactions, insurance claims, or other financial transactions.
  2. Bloom Filter Join can be used to join large datasets to recommend products or services to users in real-time. This can be used to recommend products to customers on e-commerce websites or to recommend movies or TV shows to users on streaming platforms.
  3. Bloom Filter Join can be used to join large datasets to detect anomalies in data in real-time. This can be used to detect anomalies in sensor data, financial data, or other types of data.
  4. Bloom Filter Join can be used to join large datasets to deduplicate data in real-time. This can be used to deduplicate customer records, product records, or other types of records.
  5. Bloom Filter Join can be used to join large datasets to integrate data from different sources in real-time. This can be used to integrate data from different systems or applications, such as customer relationship management (CRM) systems and enterprise resource planning (ERP) systems.

The Bloom filter join can be a very effective way to improve the performance of joins between large datasets. However, it is important to note that Bloom filters are probabilistic data structures: they can produce false positives, so some rows that have no match may still pass the filter and be processed by the join. They never produce false negatives, so the join results remain correct; the filter simply reduces wasted work.

Feature 7: Easily convert the Entire Source Dataframe to a Schema with Dataset.to(StructType)

Earlier, to conform an entire DataFrame to a target schema, you had to rebuild it with the createDataFrame() method and an explicit schema, or select and cast columns by hand. This was tedious and error-prone, especially if the DataFrame had a large number of columns or nested fields.

Spark 3.4 introduces the Dataset.to(StructType) method, which solves this problem by making it easy to conform an entire DataFrame to a target schema with a single line of code.

The method Dataset.to(StructType) is employed to transform an entire source dataframe into a particular schema. This function operates much like adding data to a table, where the input information is adjusted to fit the table’s structure. However, this method goes further by also handling inner fields within the data. This process encompasses the following:

  1. Rearranging columns and inner fields to align with the designated schema.
  2. Removing columns and inner fields that are unnecessary according to the specified schema.
  3. Changing the data type of columns and inner fields to match the anticipated types.

For example, the following code converts a DataFrame with three columns to a schema with the same column names and types:

df.to(StructType([ 
  StructField("name", StringType(), True), 
  StructField("age", IntegerType(), True), 
  StructField("gender", StringType(), True) 
]))

The to(StructType) method takes a single argument: a StructType built from a list of StructFields. Each StructField specifies the name, type, and nullability of a column in the target schema.

The Dataset.to(StructType) feature is a useful addition to Spark 3.4. It makes it easy to convert an entire DataFrame to a schema, which can be useful for a variety of tasks.

Here is an example of how to use the Dataset.to(StructType) feature in Python:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df = spark.createDataFrame([('John Doe', 30, 'M'), ('Alice', 25, 'F')],
                          schema=['name', 'age', 'gender'])

# Conform the DataFrame to a target schema with the same column names and the desired types.
converted_df = df.to(StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("gender", StringType(), True)
]))

# Print the resulting schema.
converted_df.printSchema()

This code first creates a DataFrame with three columns. It then applies a target schema with the same column names and the desired types using to(), and finally prints the schema of the converted DataFrame to the console.
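
As a further hedged sketch (with illustrative column names), to() also reorders columns and casts types to match the target schema:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Source DataFrame: the columns arrive in a different order, and age is inferred as a bigint.
raw_df = spark.createDataFrame([(30, "John Doe")], schema=["age", "name"])

target_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),   # bigint -> int cast
])

# to() reorders the columns and casts age to an integer.
raw_df.to(target_schema).printSchema()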

The use cases of Dataset.to(StructType) are:

  1. Schema Evolution: When dealing with big data, the schema of the data can evolve over time. You might have new columns added or some columns removed. In such cases, you can use Dataset.to(StructType) to enforce a specific schema on your Dataset. This can be especially useful when reading data from sources that might have inconsistent schemas.
  2. Data Cleaning: If you have a Dataset where some columns might have inconsistent data types (e.g., a column that should be an integer but sometimes has strings), you can use this method to enforce a consistent schema and handle or remove rows that don’t match the schema.
  3. Optimizing Queries: Sometimes, you might want to optimize your Spark queries by changing the schema of your Dataset. By enforcing a specific schema using Dataset.to(StructType), you can ensure that your Dataset has the optimal schema for your queries.
  4. Integration with External Systems: When integrating Spark with external systems (e.g., databases, data warehouses), you might need to ensure that the Dataset you’re writing to the external system matches a specific schema. In such cases, you can use this method to convert your Dataset to the required schema before writing it to the external system.
  5. Type Safety: One of the advantages of using Datasets in Spark is that they provide compile-time type safety. By enforcing a specific schema on your Dataset, you can ensure that your Spark code is type-safe and avoid runtime errors related to schema mismatches.
  6. Migrating from DataFrames to Datasets: If you’re migrating your Spark code from using DataFrames to Datasets, you might need to enforce a specific schema on your DataFrames to convert them to Datasets. In such cases, Dataset.to(StructType) can be used to enforce the required schema and convert the DataFrame to a Dataset.

Feature 8: Ability to Construct Parameterized SQL Queries

A parameterized SQL query is a query that uses named parameters instead of literal values. This means that the values of the parameters are not hard-coded into the query, but are instead passed in at runtime. This makes the query more reusable because the same query can be used with different values for the parameters. It also makes the query more secure, because it prevents attackers from injecting malicious code into the query.

To create a parameterized SQL query, you use the sql() method on the SparkSession object. The sql() method takes two arguments: the SQL text of the query, and a map of parameters. The keys of the map are the names of the parameters, and the values of the map are the values of the parameters.

For example, the following Scala snippet creates a parameterized SQL query that selects all rows from the tbl table where the date column is later than a given start date, limited to a maximum number of rows:

spark.sql( 
  sqlText = "SELECT * FROM tbl WHERE date > :startDate LIMIT :maxRows", 
  args = Map( "startDate" -> LocalDate.of(2022, 12, 1), "maxRows" -> 100))

In this example, the startDate and maxRows parameters are used to control the filtering and limiting of the query results. The values of these parameters are passed in at runtime, so the query can be reused with different values for these parameters.
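
The same pattern is available from PySpark 3.4, where args is passed as a Python dictionary. Here is a hedged sketch using the same illustrative table and parameters as above; note that in the 3.4 Python API the parameter values are supplied as strings that Spark parses as SQL literal expressions:

df = spark.sql(
    "SELECT * FROM tbl WHERE date > :startDate LIMIT :maxRows",
    args={"startDate": "DATE'2022-12-01'", "maxRows": "100"})
df.show()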

Below are some of the situations where you can use parameterized SQL queries:

  1. Dynamic Data Filtering: If you have a Spark DataFrame and you want to filter it based on some user input or another dynamic source, parameterized SQL can be useful. Instead of string concatenation, which can be error-prone and insecure, you can use parameterized queries to safely insert values into your SQL statement.
  2. Preventing SQL Injection: One of the primary reasons to use parameterized SQL queries is to prevent SQL injection attacks. By using parameterized queries, you ensure that user input is always treated as data and not executable code.
  3. Reusable Code: If you have a piece of code that needs to run similar SQL queries with different values, parameterized queries can make your code more modular and reusable. You can define the query once and execute it with different parameters as needed.
  4. Performance Optimization: In some cases, using parameterized queries can lead to performance benefits. The database can cache the execution plan for a parameterized query, which can lead to faster execution times when the same query is run multiple times with different parameters.
  5. Complex Queries with Multiple Conditions: If you’re constructing a complex SQL query with multiple conditions based on various inputs, using parameterized queries can make the code cleaner and more readable.
  6. Integration with External Systems: If you’re integrating Spark with external systems that provide data, using parameterized queries can ensure that the data is inserted into your SQL statements safely and correctly.
  7. Data Validation: When using parameterized queries, the data type of the input is often checked, ensuring that the data being inserted into the query is of the correct type.

Conclusion

In the ever-evolving realm of big data and analytics, Apache Spark 3.4 emerges as a game-changer, offering a plethora of advanced features that cater to the needs of modern data-driven enterprises. With its latest release, Apache Spark not only addresses previous limitations but also introduces innovative capabilities that promise to redefine data processing and analytics. From the groundbreaking Spark Connect architecture that offers remote connectivity to Spark clusters, to the addition of the TorchDistributor Module in PySpark for efficient distributed training, and the introduction of the TIMESTAMP WITHOUT TIMEZONE Data Type for more precise timestamp handling, Spark 3.4 is set to revolutionize the way businesses harness the power of their data. Furthermore, the support for DEFAULT values in tables and the Lateral Column reference in SQL SELECT List further enhances the flexibility and efficiency of data operations. As organizations continue to rely heavily on data-driven insights for decision-making, embracing the advancements of Apache Spark 3.4 will undoubtedly pave the way for more streamlined, efficient, and insightful data analytics processes.
