Commit 003c3b0

Create submit_spark_job_to_driver_node_group_cluster.py
Sample code to submit a Spark job to a Dataproc driver node group cluster.
1 parent 4891517 commit 003c3b0

Lines changed: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
#!/usr/bin/env python

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This sample walks a user through submitting a Spark job to a
# Dataproc driver node group cluster using the Dataproc
# client library.

# Usage:
#     python submit_spark_job_to_driver_node_group_cluster.py \
#         --project_id <PROJECT_ID> --region <REGION> \
#         --cluster_name <CLUSTER_NAME>

# [START dataproc_submit_spark_job_to_driver_node_group_cluster]

import re

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage

def submit_job(project_id: str, region: str, cluster_name: str) -> None:
    """Submits a Spark job to a Dataproc driver node group cluster."""
    # Create the job client, pointing it at the cluster's regional endpoint.
    job_client = dataproc.JobControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

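    # The driver scheduling config reserves resources (memory and vcores)
    # for the job driver, which runs on the cluster's driver node group.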
    driver_scheduling_config = dataproc.DriverSchedulingConfig(
        memory_mb=2048,  # Example memory in MB
        vcores=2,  # Example number of vcores
    )

    # Create the job config. The entries in 'jar_file_uris' can also be
    # Cloud Storage (gs://) URLs.
    job = {
        "placement": {"cluster_name": cluster_name},
        "spark_job": {
            "main_class": "org.apache.spark.examples.SparkPi",
            "jar_file_uris": [
                "file:///usr/lib/spark/examples/jars/spark-examples.jar"
            ],
            "args": ["1000"],
        },
        "driver_scheduling_config": driver_scheduling_config,
    }

    # Submit the job as a long-running operation and block until it
    # completes.
    operation = job_client.submit_job_as_operation(
        request={"project_id": project_id, "region": region, "job": job}
    )
    response = operation.result()

    # Dataproc job output gets saved to the Cloud Storage bucket
    # allocated to the job. Use a regex to obtain the bucket and blob info.
    matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
    if not matches:
        raise ValueError(
            f"Unexpected driver output URI: {response.driver_output_resource_uri}"
        )

    output = (
        storage.Client()
        .get_bucket(matches.group(1))
        .blob(f"{matches.group(2)}.000000000")
        .download_as_bytes()
        .decode("utf-8")
    )
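
    # NOTE: driver output is written in sequentially numbered parts
    # (.000000000, .000000001, ...); a long-running job may produce
    # more parts than the single one read above.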

    print(f"Job finished successfully: {output}")


# [END dataproc_submit_spark_job_to_driver_node_group_cluster]


if __name__ == "__main__":
    my_project_id = "your-project-id"  # <-- REPLACE THIS
    my_region = "us-central1"  # <-- REPLACE THIS
    my_cluster_name = "your-node-group-cluster"  # <-- REPLACE THIS

    submit_job(my_project_id, my_region, my_cluster_name)
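
Note: the usage comment at the top of the file documents flag-based invocation, while the committed __main__ block hardcodes placeholder values. A minimal argparse sketch that would match the documented usage (hypothetical; not part of this commit):

# Hypothetical alternative __main__ block matching the documented
# flag-based usage; assumes submit_job is defined as above.
import argparse

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Submit a Spark job to a Dataproc driver node group cluster."
    )
    parser.add_argument("--project_id", required=True)
    parser.add_argument("--region", required=True)
    parser.add_argument("--cluster_name", required=True)
    args = parser.parse_args()

    submit_job(args.project_id, args.region, args.cluster_name)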
