VOOZH about

URL: https://dzone.com/articles/execute-spark-applications-on-databricks-using-the

⇱ Execute Spark Applications on Databricks Using the REST API


Related

  1. DZone
  2. Software Design and Architecture
  3. Integration
  4. Execute Spark Applications on Databricks Using the REST API

Execute Spark Applications on Databricks Using the REST API

Let's get your Spark apps up and running on Databricks.

By Mar. 28, 21 · Tutorial
Likes
Comment
Save
12.3K Views

Join the DZone community and get the full member experience.

Join For Free

Introduction

While many of us are habituated to executing Spark applications using the 'spark-submit' command, with the popularity of Databricks, this seemingly easy activity is getting relegated to the background. Databricks has made it very easy to provision Spark-enabled VMs on the two most popular cloud platforms, namely AWS and Azure. A couple of weeks ago, Databricks announced their availability on GCP as well. The beauty of the Databricks platform is that they have made it very easy to become a part of their platform. While Spark application development will continue to have its challenges - depending on the problem being addressed - the Databricks platform has taken out the pain of having to establish and manage your own Spark cluster.

Using Databricks

Once registered on the platform, the Databricks platform allows us to define a cluster of one or more VMs, with configurable RAM and executor specifications. We can also define a cluster that can launch a minimum number of VMs at startup and then scale to a maximum number of VMs as required. After defining the cluster, we have to define jobs and notebooks. Notebooks contain the actual code executed on the cluster. We need to assign notebooks to jobs as the Databricks cluster executes jobs (and not Notebooks). Databricks also allows us to setup the cluster such that it can download additional JARs and/or Python packages during cluster startup. We can also upload and install our own packages (I used a Python wheel).

Recently, I developed some functionality - data reconciliation, data validation and data profiling - using Spark. Initially, we developed the functionality using the local Spark installation and things were fine. While I knew that the design would need to be reworked, we went ahead with the local implementation. Why the design change? The functionality we developed was fronted by a microservice - one for each. As the microservices were going to be deployed using Docker and Kubernetes, we would need to implement a design change for the simple reason that we could not deploy the Spark application on the Docker and Kubernetes setup. We needed to have the Spark application running on a dedicated Spark instance.

To make this happen, we had two options - Apache Livy and Databricks. For implementation flexibility and also to cater to customer infrastructure, we decided to implement both options. In an earlier article (Execute Spark Applications With Apache Livy), I have mentioned how we can execute Spark applications using Apache Livy's REST interface. I have covered the Apache Livy implementation in an earlier article.

Using Databricks Remotely

Similar to what Apache Livy has, Databricks also provides a REST API. As our implementation was in Python, we used the package databricks_api. While the REST API makes it simple to invoke a Spark application available on a Databricks cluster, I realized that all the three services ended up with the same code - the mechanism for setting up and invoking the Databricks API was the same - the names of the jobs and the parameters passed during invocation were different. Hence I wrapped up the common functionality into a helper class.

Helper class

Here is the helper class to interact with Spark applications hosted on Databricks.

Python




xxxxxxxxxx
1
89


1
. . . other imports
2
from databricks_api import DatabricksAPI
3

 
4
class DatabricksRunner:
5
 def __init__(self):
6
 self.databricks = None
7
 self.host_id = None
8
 self.access_token = None
9
 self.cluster_is_running = False
10
 self.cluster_is_defined = False
11
 self.databricks_job_id = None
12

 
13
 def get_job_id(self, job_list, name):
14
 ret_val = None
15
 jobs = job_list.get("jobs", None)
16
 if jobs is not None:
17
 for i in range(len(jobs)):
18
 job_name = jobs[i].get("settings", {}).get("name", None)
19
 if name == job_name:
20
 ret_val = jobs[i]
21
 break
22
 return ret_val
23

 
24
 def connect_to_cluster(self, host_id, access_token, cluster_name, headers):
25
 self.host_id = host_id
26
 self.access_token = access_token
27
 self.databricks = DatabricksAPI(host=host_id, token=access_token)
28

 
29
 clusters = self.databricks.cluster.list_clusters(headers=headers)
30
 self.cluster_is_running = False
31
 self.cluster_is_defined = False
32
 for c in clusters["clusters"]:
33
 if c["cluster_name"] == cluster_name and self.cluster_is_defined == False:
34
 self.cluster_is_defined = True
35
 if c["state"] == "RUNNING":
36
 self.cluster_is_running = True
37
 elif c["stat"] == "TERMINATED":
38
 self.cluster_is_running = False
39
 else:
40
 self.cluster_is_running = False
41

 
42
 if self.cluster_is_defined is False:
43
 return False
44

 
45
 if self.cluster_is_running is False:
46
 return False
47
 return True
48

 
49
 def get_job_id(self, job_name):
50
 if self.cluster_is_running == False:
51
 return None
52

 
53
 job_list = self.databricks.jobs.list_jobs(headers=None)
54
 job_details = self.get_job_id(job_list, job_name)
55
 if job_details is None:
56
 return None
57
 self.databricks_job_id = job_details["job_id"]
58
 return True
59

 
60
 def run_job(self, notebook_params, jar_params, python_params, spark_submit_params, headers):
61
 ret_val = self.databricks.jobs.run_now(job_id=self.databricks_job_id, jar_prams=jar_params,
62
 notebook_params=notebook_params, python_params=python_params,
63
 spark_submit_params=spark_submit_params, headers=headers)
64
 self.job_run_id = ret_val["run_id"]
65
 return True
66

 
67
 def wait_for_job(self, poll_time, timeout_value):
68
 life_cycle_state = "RUNNING"
69
 run_time = 0
70
 result_state = ""
71
 while "TERMINATED" != life_cycle_state and run_time <= timeout_value:
72
 run_response = self.databricks.jobs.get_run(run_id=self.job_run_id, headers=None)
73
 state = run_response.get("state", None)
74
 if state is not None:
75
 life_cycle_state = state.get("life_cycle_state", None)
76
 result_state = state.get("result_state", None)
77
 time.sleep(poll_time)
78
 run_time = run_time + poll_time
79

 
80
 if run_time >= timeout_value:
81
 return "job is still running"
82
 elif "TERMINATED" == life_cycle_state:
83
 if result_state == "success":
84
 ret_val = "success"
85
 elif result_state == "failed":
86
 ret_val = "failed"
87
 else:
88
 ret_val = "state not recognized"
89
 return ret_val



Using the Helper Class

After defining the class, we can run Spark jobs as below

Python




x


1
host_id = # assign value
2
access_token = # assign value
3
cluster_name = # assign value
4
headers = # assign value
5
job_name = # assign value
6
notebook_params = # json
7
python_params = # json
8
spark_submit_params = # json
9

 
10
runner = DatabricksRunner()
11
ret_val = runner.connect_to_cluster(host_id, access_token, cluster_name, headers)
12
ret_val = runner.get_job_id(job_name)
13
ret_val = runner.run_job(notebook_params, jar_params, python_params, spark_submit_params, headers)
14
ret_val = runner.wait_for_job(60, 600)



Conclusion

The Databricks API makes it easy to interact with Databricks jobs remotely. Not only can we run jobs on the Databricks cluster, but we can also monitor their execution state.

API REST Web Protocols application Docker (software)

Opinions expressed by DZone contributors are their own.

Related

  • Breaking Up a Monolithic Database with Kong
  • Building REST API Backend Easily With Ballerina Language
  • Aggregating REST APIs Calls Using Apache Camel
  • GraphQL vs REST API: Which Is Better for Your Project in 2025?

Partner Resources

×

Comments

The likes didn't load as expected. Please refresh the page and try again.

Let's be friends: