Skip to content

Commit 870524c

Browse files
committed
Add a direct .run() API on recipes
1 parent 2c154fa commit 870524c

File tree

2 files changed

+51
-10
lines changed

2 files changed

+51
-10
lines changed

dataikuapi/dss/dataset.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -224,14 +224,12 @@ def build(self, job_type="NON_RECURSIVE_FORCED_BUILD", partitions=None, wait=Tru
224224
:return: the :class:`dataikuapi.dss.job.DSSJob` job handle corresponding to the built job
225225
:rtype: :class:`dataikuapi.dss.job.DSSJob`
226226
"""
227-
jd = self.project.new_job_definition_builder(job_type)
228-
227+
jd = self.project.new_job(job_type)
229228
jd.with_output(self.dataset_name, partition=partitions)
230-
231229
if wait:
232-
return self.project.start_job_and_wait(jd.get_definition())
230+
return jd.start_and_wait()
233231
else:
234-
return self.project.start_job(jd.get_definition())
232+
return jd.start()
235233

236234

237235
def synchronize_hive_metastore(self):

dataikuapi/dss/recipe.py

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,6 @@ def __init__(self, client, project_key, recipe_name):
1313
self.project_key = project_key
1414
self.recipe_name = recipe_name
1515

16-
########################################################
17-
# Dataset deletion
18-
########################################################
19-
2016
def compute_schema_updates(self):
2117
"""
2218
Computes which updates are required to the outputs of this recipe.
@@ -38,6 +34,37 @@ def compute_schema_updates(self):
3834
"GET", "/projects/%s/recipes/%s/schema-update" % (self.project_key, self.recipe_name))
3935
return RequiredSchemaUpdates(self, data)
4036

37+
def run(self, job_type="NON_RECURSIVE_FORCED_BUILD", partitions=None, wait=True, no_fail=False):
    """
    Starts a new job to run this recipe, and by default waits for it
    to complete. Raises if the job failed (unless no_fail is True).

    .. code-block:: python

        job = recipe.run()
        print("Job %s done" % job.id)

    :param job_type: The job type. One of RECURSIVE_BUILD, NON_RECURSIVE_FORCED_BUILD or RECURSIVE_FORCED_BUILD
    :param partitions: If the outputs are partitioned, a list of partition ids to build
    :param wait: if True (the default), blocks until the job finishes; if False, returns
                 immediately after starting the job
    :param no_fail: if True, does not raise if the job failed (only meaningful when wait is True)
    :return: the :class:`dataikuapi.dss.job.DSSJob` job handle corresponding to the built job
    :rtype: :class:`dataikuapi.dss.job.DSSJob`
    """
    settings = self.get_settings()
    output_refs = settings.get_flat_output_refs()

    if not output_refs:
        raise Exception("recipe has no outputs, can't run it")

    # Building any single output of a recipe runs the whole recipe,
    # so targeting the first output is sufficient.
    jd = self.client.get_project(self.project_key).new_job(job_type)
    jd.with_output(output_refs[0], partition=partitions)

    if wait:
        # Fix: no_fail was previously accepted but never forwarded,
        # so run(no_fail=True) still raised on job failure.
        return jd.start_and_wait(no_fail=no_fail)
    else:
        return jd.start()
67+
4168
def delete(self):
4269
"""
4370
Delete the recipe
@@ -60,7 +87,7 @@ def get_settings(self):
6087
"""
6188
data = self.client._perform_json(
6289
"GET", "/projects/%s/recipes/%s" % (self.project_key, self.recipe_name))
63-
90+
print(data)
6491
type = data["recipe"]["type"]
6592

6693
if type == "grouping":
@@ -316,6 +343,22 @@ def replace_output(self, current_output_ref, new_output_ref):
316343
if item.get("ref", None) == current_output_ref:
317344
item["ref"] = new_output_ref
318345

346+
def get_flat_input_refs(self):
    """
    Returns a list of all input refs of this recipe, regardless of the
    input role they belong to.

    :return: the refs of all recipe inputs
    :rtype: list of str
    """
    # Only the role values are needed; iterate .values() rather than
    # .items() with a discarded key.
    return [item["ref"]
            for role_obj in self.get_recipe_inputs().values()
            for item in role_obj["items"]]
352+
353+
def get_flat_output_refs(self):
    """
    Returns a list of all output refs of this recipe, regardless of the
    output role they belong to.

    :return: the refs of all recipe outputs
    :rtype: list of str
    """
    # Only the role values are needed; iterate .values() rather than
    # .items() with a discarded key.
    return [item["ref"]
            for role_obj in self.get_recipe_outputs().values()
            for item in role_obj["items"]]
359+
360+
361+
319362
# Old name
320363
class DSSRecipeDefinitionAndPayload(DSSRecipeSettings):
321364
"""

0 commit comments

Comments
 (0)