
Commit 73875fc

Update submit_spark_job_to_driver_node_group_cluster.py
1 parent 003c3b0 commit 73875fc


dataproc/snippets/submit_spark_job_to_driver_node_group_cluster.py

Lines changed: 44 additions & 18 deletions
@@ -30,11 +30,18 @@
 from google.cloud import dataproc_v1 as dataproc
 from google.cloud import storage
 
-def submit_job(project_id, region, cluster_name):
+def submit_job(project_id: str, region: str, cluster_name: str) -> None:
+    """Submits a Spark job to the specified Dataproc cluster with a driver node group and prints the output.
+
+    Args:
+        project_id: The Google Cloud project ID.
+        region: The Dataproc region where the cluster is located.
+        cluster_name: The name of the Dataproc cluster.
+    """
     # Create the job client.
-    job_client = dataproc.JobControllerClient(
+    with dataproc.JobControllerClient(
         client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
-    )
+    ) as job_client:
 
         driver_scheduling_config = dataproc.DriverSchedulingConfig(
             memory_mb=2048,  # Example memory in MB
@@ -56,19 +63,33 @@ def submit_job(project_id, region, cluster_name):
         operation = job_client.submit_job_as_operation(
             request={"project_id": project_id, "region": region, "job": job}
         )
-    response = operation.result()
+
+        try:
+            response = operation.result()
+        except Exception as e:
+            print(f"Error submitting job or waiting for completion: {e}")
+            raise
 
         # Dataproc job output gets saved to the Cloud Storage bucket
         # allocated to the job. Use a regex to obtain the bucket and blob info.
         matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
-
-    output = (
-        storage.Client()
-        .get_bucket(matches.group(1))
-        .blob(f"{matches.group(2)}.000000000")
-        .download_as_bytes()
-        .decode("utf-8")
-    )
+        if not matches:
+            print(f"Error: Could not parse driver output URI: {response.driver_output_resource_uri}")
+            raise ValueError
+
+        try:
+            with storage.Client() as storage_client:
+                bucket_name = matches.group(1)
+                blob_name = f"{matches.group(2)}.000000000"
+                output = (
+                    storage_client.get_bucket(bucket_name)
+                    .blob(blob_name)
+                    .download_as_bytes()
+                    .decode("utf-8")
+                )
+        except Exception as e:
+            print(f"Error downloading job output: {e}")
+            raise
 
         print(f"Job finished successfully: {output}")
 
@@ -77,9 +98,14 @@ def submit_job(project_id, region, cluster_name):
 
 
 if __name__ == "__main__":
-
-    my_project_id = "your_cluster"  # <-- REPLACE THIS
-    my_region = "us-central1"  # <-- REPLACE THIS
-    my_cluster_name = "your-node-group-cluster"  # <-- REPLACE THIS
-
-    submit_job(my_project_id, my_region, my_cluster_name)
+    import argparse
+
+    parser = argparse.ArgumentParser(
+        description="Submits a Spark job to a Dataproc driver node group cluster."
+    )
+    parser.add_argument("--project_id", help="The Google Cloud project ID.", required=True)
+    parser.add_argument("--region", help="The Dataproc region where the cluster is located.", required=True)
+    parser.add_argument("--cluster_name", help="The name of the Dataproc cluster.", required=True)
+
+    args = parser.parse_args()
+    submit_job(args.project_id, args.region, args.cluster_name)
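With this change, the snippet reads its parameters from the command line via argparse instead of hard-coded placeholders. A sample invocation (the project, region, and cluster values below are illustrative placeholders, not values from this commit):

    python submit_spark_job_to_driver_node_group_cluster.py \
        --project_id example-project \
        --region us-central1 \
        --cluster_name example-node-group-cluster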
