Maximizing Collaboration and Productivity: Azure DevOps and Databricks Pipelines


Data is the backbone of modern businesses, and processing it efficiently is critical for success. However, as data projects grow in complexity, managing code changes and deployments becomes increasingly difficult. That’s where Continuous Integration and Continuous Delivery (CI/CD) come in. By automating the code deployment process, you can streamline your data pipelines, reduce errors, and improve efficiency. If you’re using Azure DevOps to implement CI/CD on Azure Databricks, you’re in the right place. In this blog, we’ll show you how to set up CI/CD on Azure Databricks using Azure DevOps to improve efficiency, maximize collaboration and productivity, and unlock your team’s full potential to produce better results. Let’s get started!

Table of contents

CI/CD with Databricks and Azure DevOps

An Azure Databricks pipeline typically consists of the following steps, although the exact configuration may vary depending on your requirements:

Continuous integration:

  • Develop code and unit tests using an external IDE or in an Azure Databricks notebook.
  • Manually run tests.
  • Commit code and tests to a git branch.
  • Gather new and updated code and tests.
  • Run automated tests.
  • Build libraries and non-notebook Apache Spark code.
  • Generate a release artifact.

Continuous delivery:

  • Deploy notebooks and libraries.
  • Run automated tests and report results.
  • Programmatically schedule data engineering, analytics, and machine learning workflows for operation.

In this blog, we will primarily focus on Build pipeline and there will be another blog for release pipeline

Develop and Commit your code

When designing a CI/CD pipeline in Azure Databricks, choosing a code commit and branching strategy that won’t impact production code is important. Azure Databricks supports integrations with various Git providers, but in case you need more control, you can use the Databricks CLI to export notebooks and commit them from your local machine. We can also create a CLI script that could be run within a local git repository to sync with the remote repository. This is exactly what the below code does and you can utilize this code as it is.

#Check out the desired branch.	
git checkout <branch>
#Pull new changes from the remote branch.	
git pull
#Export code and notebooks from the Azure  Databricks workspace using the Azure Databricks workspace CLI.	
databricks workspace export_dir --profile <profile> -o <path> ./Workspace

#Prompt the user for a commit message or use the default if one is not provided.	
dt=`date '+%Y-%m-%d %H:%M:%S'` msg_default="DB export on $dt" read -p "Enter the commit comment [$msg_default]: " msg msg=${msg:-$msg_default} echo $msg

#Commit the updated code and notebooks to the local branch.	
git add . git commit -m "<commit-message>"

#Push the changes to the remote branch.	
git push

If you prefer to develop code in a VSCode or IntelliJ rather than in Azure Databricks notebooks, you can use the GitHub integration features built into these IDEs, or else you can use the above script to check in the code.

Now below steps will be used to create Azure DevOps Pipeline

Deploy Azure VM

We will Deploy an Azure virtual machine for the build pipeline. The virtual machine image should match the one on the Azure Databricks cluster as closely as possible. For example, Databricks Runtime 10.4 LTS runs Ubuntu 20.04.4 LTS, which maps to the Ubuntu 20.04 virtual machine image in the Azure Pipeline agent pool. Here is the latest link to Databricks releases which exactly shows the OS. For example, runtime 13.0 works on Ubuntu 22.04.2 LTS as per the release 13 release notes. So always make sure you use the OS version supported by the release.

# Specify the trigger event to start the build pipeline.
# In this case, new code merged into the release branch initiates a new build.
trigger:
- release

# Specify the operating system for the agent that runs on the Azure virtual
# machine for the build pipeline (known as the build agent). The virtual
# machine image should match the one on the Azure Databricks cluster as
# closely as possible. For example, Databricks Runtime 10.4 LTS runs
# Ubuntu 20.04.4 LTS, which maps to the Ubuntu 20.04 virtual machine
# image in the Azure Pipeline agent pool. See
# https://learn.microsoft.com/azure/devops/pipelines/agents/hosted#software
pool:
  vmImage: ubuntu-20.04

Install the required Python version and build tools

Now we have the VM we need to deploy the required version of Python and build tools on the virtual machine for testing and packaging the Python code. Make sure that the correct version of Python matches the version installed on your remote Azure Databricks cluster.

# Install Python. The version of Python must match the version on the
# Azure Databricks cluster. This pipeline assumes that you are using
# Databricks Runtime 10.4 LTS on the cluster.
steps:
- task: UsePythonVersion@0
  displayName: 'Use Python 3.8'
  inputs:
    versionSpec: 3.8

# Install required Python modules and their dependencies. These
# include pytest, which is needed to run unit tests on a cluster,
# and setuptools, which is needed to create a Python wheel. Also
# install the version of Databricks Connect that is compatible
# with Databricks Runtime 10.4 LTS on the cluster.
- script: |
    pip install pytest requests setuptools wheel
    pip install -U databricks-connect==10.4.*
  displayName: 'Load Python dependencies'

Now let’s understand what are these libraries and why we need to install them:

  • pytest is a Python testing framework that allows you to write and run automated tests for your Python code. It provides various features such as test discovery, fixtures, parametrization, and plugins that make it easier to write and manage tests.
  • requests is a popular Python library for making HTTP requests. It provides a simple and elegant way to interact with APIs and websites, allowing you to send HTTP requests and handle responses in a Pythonic way.
  • Setuptools is a package that allows you to easily package your Python code into a distributable format. It provides a setup.py script that allows you to define the metadata about your project such as name, version, author, and dependencies. It also allows you to specify what files should be included in the distribution, such as source code, documentation, and test files. Setuptools can create source distributions, which contain the original source code, and binary distributions, which contain compiled code that can be installed on the user’s machine.
  • Wheel is a format for distributing pre-built Python packages. It allows for faster installation of packages because the user doesn’t have to compile the code on their machine. Wheels are essentially zip files that contain the compiled code along with metadata that describes the package. They can be installed using pip, the standard Python package manager.

Install and configure Databricks connect

Now after installing the required version of the Python and build tool, Azure DevOps will have to connect to the Databricks cluster via Databricks CLI so it can deploy the code. Databricks CLI needs these parameters

Parameter Name used by Databricks Connect What does it means?
DATABRICKS_ADDRESSIt is this part in the databricks workspace URL https://adb-XXXXX.azuredatabricks.net
DATABRICKS_API_TOKENWhat does it mean?
DATABRICKS_CLUSTER_ID,Code can be deployed to any cluster so this is a unique cluster id in the cluster URL: https://adb-xxx.azuredatabricks.net/?o=xxx#setting/clusters/XXX/configuration
DATABRICKS_ORG_ID,Organization id in the Databricks URL: https://adb-xxx.azuredatabricks.net/?o=XXX#
DATABRICKS_PORTPort number used to connect to Databricks it is by default 15001

To make these values configurable we have used these values as variables so they can be changed. Now the next question comes from where we will collect these values.The following picture depicts it.

Now we will generate the token:

Set the value of DATABRICKS_PORT variable to 15001

# Use environment variables to pass Azure Databricks workspace and cluster
# information to the Databricks Connect configuration function.
- script: |
    echo "y
    $(DATABRICKS_ADDRESS)
    $(DATABRICKS_API_TOKEN)
    $(DATABRICKS_CLUSTER_ID)
    $(DATABRICKS_ORG_ID)
    $(DATABRICKS_PORT)" | databricks-connect configure
  displayName: 'Configure Databricks Connect'

Copy the files from Git Repo and Run Unit Tests

In this step, we will copy the file from the Git repository to the virtual machine.

# Download the files from the designated branch in the Git remote repository
# onto the build agent.
- checkout: self
  persistCredentials: true
  clean: true

In this step, we will run unit tests on the Python code and publish the test results.

# For library code developed outside of an Azure Databricks notebook, the
# process is like traditional software development practices. You write a
# unit test using a testing framework, such as the Python pytest module, and
# you use JUnit-formatted XML files to store the test results.
- script: |
    python -m pytest --junit-xml=$(Build.Repository.LocalPath)/logs/TEST-LOCAL.xml $(Build.Repository.LocalPath)/libraries/python/dbxdemo/test*.py || true
  displayName: 'Run Python unit tests for library code'

Let’s understand the pytest command

  • Switch --junit-xml: specifies that the test results should be output in JUnit XML format.
  • $(Build.Repository.LocalPath)/logs/TEST-LOCAL.xml: specifies the path where the JUnit XML output file should be saved. This path is specified using a variable that references the local path of the repository where the code is stored.
  • $(Build.Repository.LocalPath)/libraries/python/dbxdemo/test*.py: specifies the path and file pattern for the test files to be executed by pytest. In this case, it is looking for all files in the “test” directory that have a “.py” file extension and that match the pattern “test*.py”.

Finally, the command has the || true statement at the end. This ensures that even if the pytest command fails (e.g. if any of the tests fail), the script will continue to execute without failing the entire build process. The || true statement essentially tells the script to continue execution even if the pytest command returns a non-zero exit code.

Publish the Test Result

This Azure DevOps Task will publish the test results.

# Publishes the test results to Azure DevOps. This lets you visualize
# reports and dashboards related to the status of the build process.
- task: PublishTestResults@2
  inputs:
    testResultsFiles: '**/TEST-*.xml'
    failTaskOnFailedTests: true
    publishRunAttachments: true

Create Python Wheels

Now we will package the Python code into the wheel and this task exactly does the same.

# Package the example Python code into a Python wheel.
- script: |
    cd $(Build.Repository.LocalPath)/libraries/python/dbxdemo
    python3 setup.py sdist bdist_wheel
    ls dist/
  displayName: 'Build Python Wheel for Libs'

Let’s understand the code:

python3 setup.py sdist: This command builds a source distribution (“.tar.gz” file) of the Python package based on the information and settings specified in the “setup.py” file. The source distribution file is intended to be used by developers who want to install and work with the package’s source code directly.

bdist_wheel: This command builds a binary distribution (“.whl” file) of the Python package, which is a pre-compiled package distribution that can be easily installed on various systems. The wheel format is a popular choice for distributing Python packages because it includes pre-built binary files, making it faster and easier to install packages on different platforms.

By running these two commands, the script creates both a source distribution and a binary distribution of the Python package, which can be uploaded to a package repository or used for deployment purposes.

Generate the Deployment Artifacts

This task will generate the deployment artifacts and save them in the build directory.

# Generate the deployment artifacts. To do this, the build agent gathers
# all the new or updated code to be deployed to the Azure Databricks
# environment, including the sample Python notebook, the Python wheel
# library that was generated by the build process, related release settings
# files, and the result summary of the tests for archiving purposes.
# Use git diff to flag files that were added in the most recent Git merge.
# Then add the Python wheel file that you just created along with utility
# scripts used by the release pipeline.
# The implementation in your pipeline will likely be different.
# The objective here is to add all files intended for the current release.
- script: |
    git diff --name-only --diff-filter=AMR HEAD^1 HEAD | xargs -I '{}' cp --parents -r '{}' $(Build.BinariesDirectory)
    mkdir -p $(Build.BinariesDirectory)/libraries/python/libs
    cp $(Build.Repository.LocalPath)/libraries/python/dbxdemo/dist/*.* $(Build.BinariesDirectory)/libraries/python/libs
    mkdir -p $(Build.BinariesDirectory)/cicd-scripts
    cp $(Build.Repository.LocalPath)/cicd-scripts/*.* $(Build.BinariesDirectory)/cicd-scripts
    mkdir -p $(Build.BinariesDirectory)/notebooks
    cp $(Build.Repository.LocalPath)/notebooks/*.* $(Build.BinariesDirectory)/notebooks
  displayName: 'Get Changes'

Let’s understand the command

  1. git diff --name-only --diff-filter=AMR HEAD^1 HEAD: This command compares the changes between the current commit (HEAD) and the previous commit (HEAD^1), and returns the names of the files that were added (A), modified (M), or renamed (R). The --name-only option ensures that only the names of the changed files are returned, and the --diff-filter option specifies which types of changes to include (in this case, only added, modified, or renamed files).
  2. xargs -I '{}' cp --parents -r '{}' $(Build.BinariesDirectory): This command takes the list of changed file names from the previous command and passes each one as an argument to the cp command. The cp command then copies each file to a specified directory ($(Build.BinariesDirectory)). The --parents option ensures that the directory structure of each file is preserved, and the -r option enables recursive copying for directories. xargs is a powerful command-line utility that can be used to build and execute commands from standard input. It is particularly useful when you have a long list of items that you want to pass as arguments to a command, but the command line has a maximum length.
  3. mkdir -p $(Build.BinariesDirectory)/libraries/python/libs: This command creates a directory in the build artifacts directory ($(Build.BinariesDirectory)) for storing Python libraries.
  4. cp $(Build.Repository.LocalPath)/libraries/python/dbxdemo/dist/*.* (Build.BinariesDirectory)/libraries/python/libs: This command copies all files in the “dist” directory of the Python package ($(Build.Repository.LocalPath)/libraries/python/dbxdemo/dist/*.*) to the Python libraries directory in the build artifacts directory ($(Build.BinariesDirectory)/libraries/python/libs).
  5. mkdir -p $(Build.BinariesDirectory)/cicd-scripts: This command creates a directory in the build artifacts directory for storing CI/CD scripts.
  6. cp $(Build.Repository.LocalPath)/cicd-scripts/*.* $(Build.BinariesDirectory)/cicd-scripts: This command copies all files in the “cicd-scripts” directory ($(Build.Repository.LocalPath)/cicd-scripts/*.*) to the CI/CD scripts directory in the build artifacts directory ($(Build.BinariesDirectory)/cicd-scripts).
  7. mkdir -p $(Build.BinariesDirectory)/notebooks: This command creates a directory in the build artifacts directory for storing notebooks.
  8. cp $(Build.Repository.LocalPath)/notebooks/*.* $(Build.BinariesDirectory)/notebooks: This command copies all files in the “notebooks” directory ($(Build.Repository.LocalPath)/notebooks/*.*) to the notebooks directory in the build artifacts directory ($(Build.BinariesDirectory)/notebooks).

Publish the Artifacts to the Artifact Repository

This task will create the deployment artifacts so they can be used in the release pipelines.

# Create the deployment artifact and then publish it to the
# artifact repository.
- task: ArchiveFiles@2
  inputs:
    rootFolderOrFile: '$(Build.BinariesDirectory)'
    includeRootFolder: false
    archiveType: 'zip'
    archiveFile: '$(Build.ArtifactStagingDirectory)/$(Build.BuildId).zip'
    replaceExistingArchive: true

- task: PublishBuildArtifacts@1
  inputs:
    ArtifactName: 'DatabricksBuild'

Here the build id is the AzureDevOps Build ID which is generated from the build process. When you build the pipeline and run it, it will show all the tasks completed something like this:

So far we completed the build process in Azure DevOps but now we need to understand the helper files and code files used in the sample here.

Reusable Utility files, Unit Testing and DevOps Structure

In this section, we will create the utility files and the sample files used in the unit testing. We will also discuss the placement of the files in the DevOps Directory.

Add the unit test source files to the Repository

To enable the build agent to run the unit tests, add the following three files addcol.pytest-addcol.py, and __init__.py as shown. These files will be placed within a folder path libraries/python/dbxdemo at the root of your remote Git repository. Your code could be different than just adding the column in the Dataframe discussed here.

addcol.py Python file

We are adding the column in the data frame and here is the code for the same:

# addcol.py
import pyspark.sql.functions as F

def with_status(df):
  return df.withColumn("status", F.lit("checked"))

This code defines a function called with_status that takes a PySpark DataFrame (df) as its argument. The function returns a new DataFrame that is the same as the input DataFrame, with an additional column called “status” added to it.

The “status” column is created using the withColumn method of the input DataFrame df. This method allows us to add a new column to the DataFrame based on an expression or value. In this case, the value being added is the string “checked”, which is created using the F.lit() function provided by PySpark. The F.lit() function is used to create a literal value that can be used as a column expression.

So, the overall effect of this code is to add a new column called “status” to a PySpark DataFrame with the value “checked” in every row. This can be useful, for example, when we want to keep track of the status of each row in a DataFrame.

test-addcol.py Python File

This code tests the add column method. This is an example so you can tweak it based on your need.

# test-addcol.py
import pytest

from pyspark.sql import SparkSession
from .addcol import with_status

@pytest.fixture
def spark() -> SparkSession:
  return SparkSession.builder.getOrCreate()

def test_with_status(spark):
  source_data = [
    ("pete", "pan", "peter.pan@databricks.com"),
    ("jason", "argonaut", "jason.argonaut@databricks.com")
  ]
  source_df = spark.createDataFrame(
    source_data,
    ["first_name", "last_name", "email"]
  )

  actual_df = with_status(source_df)

  expected_data = [
    ("pete", "pan", "peter.pan@databricks.com", "checked"),
    ("jason", "argonaut", "jason.argonaut@databricks.com", "checked")
  ]

  expected_df = spark.createDataFrame(
    expected_data,
    ["first_name", "last_name", "email", "status"]
  )

  assert(expected_df.collect() == actual_df.collect())

Let’s understand this code: This code defines and runs a unit test for the with_status function defined in a separate module called addcol.py. The purpose of this test is to ensure that the with_status function is working correctly and producing the expected output.

The test uses the pytest library, which provides a framework for writing and executing unit tests in Python. The test itself is defined as a function called test_with_status, which takes a SparkSession object as its argument (created using a @pytest.fixture decorator).

Inside the test_with_status function, the code first creates a test DataFrame (source_df) using a list of tuples that contains some sample data. It then calls the with_status function on this DataFrame to create a new DataFrame with an additional “status” column. The expected output is then defined as another DataFrame (expected_df) that contains the same data as the original DataFrame, but with the “status” column added.

The final line of the test uses an assert statement to compare the output of the with_status function (the actual_df DataFrame) to the expected output (the expected_df DataFrame). Specifically, it checks whether the two DataFrames have the same contents by comparing the results of calling the collect method on each of them.

If the test passes, this means that the with_status function is working correctly and producing the expected output. If the test fails, it indicates that there is a problem with the with_status function that needs to be fixed.

Empty _init_.py Python file

The third file, __init__.py, must be blank and must also exist in the libraries/python/dbxdemo folder path. This file enables the test-addcol.py file to load the addcol.py file as a library

Now let’s understand the purpose of __init__.py file and how it works in Python: In Python, an __init__.py file is a blank file that is typically included in a directory to indicate that it is a Python package. When Python imports a package, it first looks for the __init__.py file in the package directory, and if it exists, it knows that the directory is a Python package and should be treated as such.

The __init__.py file can contain code that is executed when the package is imported. For example, you might use it to define global variables or import modules that are needed by the package.

In some cases, the __init__.py file might be left blank because there is no need for any initialization code or additional modules to be imported. However, even if the file is empty, it is still important to include it in the package directory to indicate to Python that the directory is a package.

So, the __init__.py file is used in Python to indicate that a directory is a package, and can contain initialization code or import statements for the package.

Please make sure to copy the Python libraries to the specific folders (as depicted in the diagram below) in GitHub because the Azure DevOps Code will pick it up from that location.

Add setup.py wheel packaging script to Repo

To enable the build agent to use Python Setuptools to package the Python wheel, add a minimal version of the following setup.py file to the libraries/python/dbxdemo folder path in your remote Git repository:

# setup.py
from setuptools import setup, find_packages

setup(
  name = 'dbxdemo',
  version = '0.1.0',
  packages = ['.']
)

Let’s understand why we need this file. This Python script defines the configuration for the installation of a Python package using setuptools, which is a package management library in Python. The script is called setup.py and must be placed in the root directory of the package.

The setup function is the main entry point of the script, and it takes a number of arguments that define the package’s metadata and dependencies. Here’s a breakdown of the arguments:

  • name: The name of the package. In this case, the package is named “dbxdemo”.
  • version: The version number of the package. In this case, the package version is “0.1.0”.
  • packages: A list of the packages that should be included in the distribution. In this case, the package will include all packages found in the current directory (find_packages() function is called with a default parameter of '.').

Other arguments can be added to specify the package’s dependencies, author, license, and more. For example, install_requires can be used to specify the package’s dependencies.

Once the setup.py file is created, the package can be installed by running the following command in the package’s root directory:

pip install .

This will use the setup.py script to build and install the package, making it available for use in Python.

Add Python Notebook to Repo

To enable the build agent to give the sample Python notebook to the release pipeline, add the following dbxdemo-notebook.py file to a notebooks folder in the root of your associated remote Git repository. There can be multiple notebooks in the notebooks folder. For the sake of simplicity we have taken a simple file.

# Databricks notebook source
import sys
sys.path.append("/databricks/python3/lib/python3.8/site-packages")

# COMMAND ----------

import unittest
from addcol import *

class TestNotebook(unittest.TestCase):

  def test_with_status(self):
    source_data = [
      ("pete", "pan", "peter.pan@databricks.com"),
      ("jason", "argonaut", "jason.argonaut@databricks.com")
    ]

    source_df = spark.createDataFrame(
      source_data,
      ["first_name", "last_name", "email"]
    )

    actual_df = with_status(source_df)

    expected_data = [
      ("pete", "pan", "peter.pan@databricks.com", "checked"),
      ("jason", "argonaut", "jason.argonaut@databricks.com", "checked")
    ]

    expected_df = spark.createDataFrame(
      expected_data,
      ["first_name", "last_name", "email", "status"]
    )

    self.assertEqual(expected_df.collect(), actual_df.collect())

unittest.main(argv = [''], verbosity = 2, exit = False)

Let’s understand what does this code do?

This is Databricks notebook defines a unit test for a Python module called addcol that adds a new column to a PySpark DataFrame. The purpose of the unit test is to verify that the with_status() function in the addcol module correctly adds a new column called “status” with the value “checked” to the input DataFrame.

Here’s a breakdown of the code:

  • The first few lines add the path of the PySpark library to the system path so that it can be imported in the notebook.
  • The unittest module is imported to define and run unit tests in Python.
  • The TestNotebook class is defined, which inherits from unittest.TestCase and defines the unit test cases.
  • The test_with_status method defines a test case that creates a PySpark DataFrame from a list of data, calls the with_status() function to add a new “status” column with the value “checked”, and compares the resulting DataFrame with an expected DataFrame.
  • The unittest.main() function is called with some arguments to run the test cases and print the results to the console.

When the notebook is executed, the unittest module will run the test cases defined in the TestNotebook class, and report whether they pass or fail. If the test passes, it means that the with_status() function in the addcol module is working as expected.

CICD Scripts

There are three Python scripts being used here and the same can be reused in case you want to implement the Azure DevOps with Databricks. These scripts are kept in the cicd-scripts folder in the root directory of your GitHub as shown in the picture below.

Let’s understand how these three scripts work.

installWhlLibrary.py Python file

This script installs the Python wheel library to Databricks.

# installWhlLibrary.py
#!/usr/bin/python3
import json
import requests
import sys
import getopt
import time
import os

def main():
  shard = ''
  token = ''
  clusterid = ''
  libspath = ''
  dbfspath = ''

  try:
    opts, args = getopt.getopt(sys.argv[1:], 'hstcld',
      ['shard=', 'token=', 'clusterid=', 'libs=', 'dbfspath='])
  except getopt.GetoptError:
    print(
      'installWhlLibrary.py -s <shard> -t <token> -c <clusterid> -l <libs> -d <dbfspath>')
    sys.exit(2)

  for opt, arg in opts:
    if opt == '-h':
      print(
        'installWhlLibrary.py -s <shard> -t <token> -c <clusterid> -l <libs> -d <dbfspath>')
      sys.exit()
    elif opt in ('-s', '--shard'):
      shard = arg
    elif opt in ('-t', '--token'):
      token = arg
    elif opt in ('-c', '--clusterid'):
      clusterid = arg
    elif opt in ('-l', '--libs'):
      libspath=arg
    elif opt in ('-d', '--dbfspath'):
      dbfspath=arg

  print('-s is ' + shard)
  print('-t is ' + token)
  print('-c is ' + clusterid)
  print('-l is ' + libspath)
  print('-d is ' + dbfspath)

  # Generate the list of files from walking the local path.
  libslist = []
  for path, subdirs, files in os.walk(libspath):
    for name in files:

      name, file_extension = os.path.splitext(name)
      if file_extension.lower() in ['.whl']:
        print('Adding ' + name + file_extension.lower() + ' to the list of .whl files to evaluate.')
        libslist.append(name + file_extension.lower())

  for lib in libslist:
    dbfslib = 'dbfs:' + dbfspath + '/' + lib
    print('Evaluating whether ' + dbfslib + ' must be installed, or uninstalled and reinstalled.')

    if (getLibStatus(shard, token, clusterid, dbfslib)) is not None:
      print(dbfslib + ' status: ' + getLibStatus(shard, token, clusterid, dbfslib))
      if (getLibStatus(shard, token, clusterid, dbfslib)) == "not found":
        print(dbfslib + ' not found. Installing.')
        installLib(shard, token, clusterid, dbfslib)
      else:
        print(dbfslib + ' found. Uninstalling.')
        uninstallLib(shard, token, clusterid, dbfslib)
        print("Restarting cluster: " + clusterid)
        restartCluster(shard, token, clusterid)
        print('Installing ' + dbfslib + '.')
        installLib(shard, token, clusterid, dbfslib)

def uninstallLib(shard, token, clusterid, dbfslib):
  values = {'cluster_id': clusterid, 'libraries': [{'whl': dbfslib}]}
  requests.post(shard + '/api/2.0/libraries/uninstall', data=json.dumps(values), auth=("token", token))

def restartCluster(shard, token, clusterid):
  values = {'cluster_id': clusterid}
  requests.post(shard + '/api/2.0/clusters/restart', data=json.dumps(values), auth=("token", token))

  waiting = True
  p = 0
  while waiting:
    time.sleep(30)
    clusterresp = requests.get(shard + '/api/2.0/clusters/get?cluster_id=' + clusterid,
      auth=("token", token))
    clusterjson = clusterresp.text
    jsonout = json.loads(clusterjson)
    current_state = jsonout['state']
    print(clusterid + " state: " + current_state)
    if current_state in ['TERMINATED', 'RUNNING','INTERNAL_ERROR', 'SKIPPED'] or p >= 10:
      break
      p = p + 1

def installLib(shard, token, clusterid, dbfslib):
  values = {'cluster_id': clusterid, 'libraries': [{'whl': dbfslib}]}
  requests.post(shard + '/api/2.0/libraries/install', data=json.dumps(values), auth=("token", token))

def getLibStatus(shard, token, clusterid, dbfslib):

  resp = requests.get(shard + '/api/2.0/libraries/cluster-status?cluster_id='+ clusterid, auth=("token", token))
  libjson = resp.text
  d = json.loads(libjson)
  if (d.get('library_statuses')):
    statuses = d['library_statuses']

    for status in statuses:
      if (status['library'].get('whl')):
        if (status['library']['whl'] == dbfslib):
          return status['status']
  else:
    # No libraries found.
    return "not found"

if __name__ == '__main__':
  main()

This Python script installs Python .whl libraries on a Databricks cluster. It uses the Databricks REST API to interact with the cluster and performs the following steps:

  1. Parse command line arguments using getopt.
  2. Walk the local file path specified in libspath to generate a list of .whl files to evaluate.
  3. For each library in the list, evaluate whether it needs to be installed, uninstalled and reinstalled, or left as is.
  4. If the library is not found on the cluster, install it using the installLib function.
  5. If the library is found on the cluster, uninstall it using the uninstallLib function, restart the cluster using restartCluster, and then install it using the installLib function.

The getLibStatus function is used to determine whether a library is already installed on the cluster and what its current status is. The main function is the entry point of the script and calls the other functions to perform the library installation.

executenotebook.py Python file

This Python script executes the Databricks Notebook. Let’s understand how this code works.

# executenotebook.py
#!/usr/bin/python3
import json
import requests
import os
import sys
import getopt
import time

def main():
  shard = ''
  token = ''
  clusterid = ''
  localpath = ''
  workspacepath = ''
  outfilepath = ''

  try:
    opts, args = getopt.getopt(sys.argv[1:], 'hs:t:c:lwo',
      ['shard=', 'token=', 'clusterid=', 'localpath=', 'workspacepath=', 'outfilepath='])
  except getopt.GetoptError:
    print(
      'executenotebook.py -s <shard> -t <token>  -c <clusterid> -l <localpath> -w <workspacepath> -o <outfilepath>)')
    sys.exit(2)

  for opt, arg in opts:
    if opt == '-h':
      print(
        'executenotebook.py -s <shard> -t <token> -c <clusterid> -l <localpath> -w <workspacepath> -o <outfilepath>')
      sys.exit()
    elif opt in ('-s', '--shard'):
        shard = arg
    elif opt in ('-t', '--token'):
        token = arg
    elif opt in ('-c', '--clusterid'):
        clusterid = arg
    elif opt in ('-l', '--localpath'):
        localpath = arg
    elif opt in ('-w', '--workspacepath'):
        workspacepath = arg
    elif opt in ('-o', '--outfilepath'):
        outfilepath = arg

  print('-s is ' + shard)
  print('-t is ' + token)
  print('-c is ' + clusterid)
  print('-l is ' + localpath)
  print('-w is ' + workspacepath)
  print('-o is ' + outfilepath)

  # Generate the list of notebooks from walking the local path.
  notebooks = []
  for path, subdirs, files in os.walk(localpath):
    for name in files:
      fullpath = path + '/' + name
      # Remove the localpath to the repo but keep the workspace path.
      fullworkspacepath = workspacepath + path.replace(localpath, '')

      name, file_extension = os.path.splitext(fullpath)
      if file_extension.lower() in ['.scala', '.sql', '.r', '.py']:
        row = [fullpath, fullworkspacepath, 1]
        notebooks.append(row)

  # Run each notebook in the list.
  for notebook in notebooks:
    nameonly = os.path.basename(notebook[0])
    workspacepath = notebook[1]

    name, file_extension = os.path.splitext(nameonly)

    # workspacepath removes the extension, so now add it back.
    fullworkspacepath = workspacepath + '/' + name + file_extension

    print('Running job for: ' + fullworkspacepath)
    values = {'run_name': name, 'existing_cluster_id': clusterid, 'timeout_seconds': 3600, 'notebook_task': {'notebook_path': fullworkspacepath}}

    resp = requests.post(shard + '/api/2.0/jobs/runs/submit',
      data=json.dumps(values), auth=("token", token))
    runjson = resp.text
    print("runjson: " + runjson)
    d = json.loads(runjson)
    runid = d['run_id']

    i=0
    waiting = True
    while waiting:
      time.sleep(10)
      jobresp = requests.get(shard + '/api/2.0/jobs/runs/get?run_id='+str(runid),
        data=json.dumps(values), auth=("token", token))
      jobjson = jobresp.text
      print("jobjson: " + jobjson)
      j = json.loads(jobjson)
      current_state = j['state']['life_cycle_state']
      runid = j['run_id']
      if current_state in ['TERMINATED', 'INTERNAL_ERROR', 'SKIPPED'] or i >= 12:
        break
      i=i+1

    if outfilepath != '':
      file = open(outfilepath + '/' +  str(runid) + '.json', 'w')
      file.write(json.dumps(j))
      file.close()

if __name__ == '__main__':
  main()

The Python script “executenotebook.py” is designed to execute a set of Jupyter notebooks on a Databricks cluster. The script takes command-line arguments to configure the execution, including the Databricks cluster and authentication information, local and workspace paths to the notebooks, and an output file path.

The script first parses the command-line arguments using the getopt module and prints them to the console. Then, it walks the local path to discover the notebooks and generates a list of notebooks to execute.

The script then executes each notebook in the list by submitting a job request to Databricks. The job request specifies the notebook to execute, the cluster to execute on, and a timeout value. The script then waits for the job to complete by polling the Databricks REST API until the job’s state indicates that it has terminated, an internal error occurred, or it was skipped.

If an output file path is specified, the script writes the JSON response from the job request to a file with the run ID in the specified output directory.

Finally, the script defines a main() function that calls all of the above functions in order. The if __name__ == '__main__': block at the end of the script ensures that the main() function is only called if the script is run directly, not if it is imported as a module.

evaluatenotebookruns.py Python file

This Python script evaluates the notebook runs.

# evaluatenotebookruns.py
#!/usr/bin/python3
import io
import xmlrunner
from xmlrunner.extra.xunit_plugin import transform
import unittest
import json
import glob
import os

class TestJobOutput(unittest.TestCase):

  test_output_path = '<path-to-json-logs-on-release-agent>'

  def test_performance(self):
    path = self.test_output_path
    statuses = []

    for filename in glob.glob(os.path.join(path, '*.json')):
      print('Evaluating: ' + filename)
      data = json.load(open(filename))
      duration = data['execution_duration']
      if duration > 100000:
        status = 'FAILED'
      else:
        status = 'SUCCESS'

      statuses.append(status)

    self.assertFalse('FAILED' in statuses)

  def test_job_run(self):
    path = self.test_output_path
    statuses = []

    for filename in glob.glob(os.path.join(path, '*.json')):
      print('Evaluating: ' + filename)
      data = json.load(open(filename))
      status = data['state']['result_state']
      statuses.append(status)

    self.assertFalse('FAILED' in statuses)

if __name__ == '__main__':
  out = io.BytesIO()

  unittest.main(testRunner=xmlrunner.XMLTestRunner(output=out),
    failfast=False, buffer=False, catchbreak=False, exit=False)

  with open('TEST-report.xml', 'wb') as report:
    report.write(transform(out.getvalue()))

The Python script “evaluatenotebookruns.py” contains a unit test class named “TestJobOutput” with two test methods: “test_performance” and “test_job_run”.

The purpose of this script is to evaluate the output of Databricks notebook runs by processing JSON log files located in the specified path (self.test_output_path) and asserting that they meet certain criteria.

In “test_performance” method, each JSON log file is loaded and the execution duration of the notebook is checked. If the duration is greater than 100000 (presumably in milliseconds), the test will fail; otherwise, it will succeed.

In “test_job_run” method, each JSON log file is loaded and the job status is checked. If any job fails, the test will fail; otherwise, it will succeed.

After running all tests, the results are output to an XML file named “TEST-report.xml” using the xmlrunner module. The output is transformed into an XML format that can be read by continuous integration tools such as Azure DevOps or Jenkins.

This script is intended to be run in a continuous integration (CI) environment to ensure that all Databricks notebook runs meet certain criteria and pass specific tests.

Conclusion

At the end of the article, we have demonstrated the successful implementation of Azure DevOps with Databricks, showcasing how you can streamline your Databricks CI/CD workflows. Through the creation of reusable scripts, deployment of Python wheel libraries and notebooks, and the implementation of automated testing, we have shown how Azure DevOps and Databricks Pipelines can improve productivity within your team. And these tools have the potential to transform the way you work and take your projects to the next level.

+ There are no comments

Add yours

Leave a Reply