Skip to content

Commit 148d500

Browse files
authored
Merge pull request #18 from dataiku/feature/scenarios-and-jobs-run-and-wait
Scenarios and jobs run and wait
2 parents 6882c02 + 8515b8f commit 148d500

File tree

2 files changed

+135
-2
lines changed

2 files changed

+135
-2
lines changed

dataikuapi/dss/project.py

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import time
12
from dataset import DSSDataset
23
from recipe import DSSRecipe
34
from managedfolder import DSSManagedFolder
@@ -9,6 +10,8 @@
910
import os.path as osp
1011
from .future import DSSFuture
1112
from .notebook import DSSNotebook
13+
from dataikuapi.utils import DataikuException
14+
1215

1316
class DSSProject(object):
1417
"""
@@ -276,7 +279,33 @@ def start_job(self, definition):
276279
job_def = self.client._perform_json("POST", "/projects/%s/jobs/" % self.project_key, body = definition)
277280
return DSSJob(self.client, self.project_key, job_def['id'])
278281

279-
282+
def start_job_and_wait(self, definition, no_fail=False):
    """
    Create a new job and wait for it to complete.

    Polls the job status with exponential backoff (starting at 4s, doubling,
    capped at 300s) until the job reaches a terminal state.

    :param definition: the definition for the job to create. The definition must
        contain the type of job (RECURSIVE_BUILD, NON_RECURSIVE_FORCED_BUILD,
        RECURSIVE_FORCED_BUILD, RECURSIVE_MISSING_ONLY_BUILD) and a list of
        outputs to build. Optionally, a refreshHiveMetastore field can specify
        whether to re-synchronize the Hive metastore for recomputed HDFS datasets.
    :param no_fail: if True, return the terminal state instead of raising when
        the job ends ABORTED or FAILED
    :returns: the terminal job state ("DONE", "ABORTED" or "FAILED")
    :raises DataikuException: if the job ends ABORTED or FAILED and no_fail is False
    """
    job_def = self.client._perform_json("POST", "/projects/%s/jobs/" % self.project_key, body = definition)
    job = DSSJob(self.client, self.project_key, job_def['id'])
    job_state = job.get_status().get("baseStatus", {}).get("state", "")
    sleep_time = 2
    while job_state not in ["DONE", "ABORTED", "FAILED"]:
        # Double the delay but never exceed the 300s cap. The previous form
        # (`300 if sleep_time >= 300 else sleep_time * 2`) only capped AFTER
        # exceeding 300, so a single sleep could last 512s.
        sleep_time = min(sleep_time * 2, 300)
        time.sleep(sleep_time)
        job_state = job.get_status().get("baseStatus", {}).get("state", "")
    # Equivalent to the in-loop check: the loop only exits on a terminal state.
    if job_state in ["ABORTED", "FAILED"] and not no_fail:
        raise DataikuException("Job run did not finish. Status: %s" % (job_state))
    return job_state
306+
307+
def new_job_definition_builder(self, job_type='NON_RECURSIVE_FORCED_BUILD'):
    # Convenience factory: returns a JobDefinitionBuilder bound to this
    # project, for fluently assembling the definition dict passed to
    # start_job / start_job_and_wait.
    return JobDefinitionBuilder(self.project_key, job_type)
280309

281310
########################################################
282311
# Variables
@@ -380,7 +409,7 @@ def list_imported_bundles(self):
380409
"/projects/%s/bundles/imported" % self.project_key)
381410

382411
def import_bundle_from_archive(self, archive_path):
    """
    Import a project bundle from a zip archive file on the local filesystem.

    :param archive_path: path to the archive; it is made absolute before
        being sent to the backend
    :returns: the decoded JSON response of the import action
    """
    return self.client._perform_json("POST",
            "/projects/%s/bundles/imported/actions/importFromArchive" % (self.project_key),
            params = { "archivePath" : osp.abspath(archive_path) })
386415

@@ -545,3 +574,45 @@ def set_tags(self, tags={}):
545574
@param obj: must be a modified version of the object returned by list_tags
546575
"""
547576
return self.client._perform_empty("PUT", "/projects/%s/tags" % self.project_key, body = tags)
577+
578+
579+
class JobDefinitionBuilder(object):
    """
    Fluent helper for assembling the definition dict expected when starting
    a job. Each ``with_*`` method mutates the definition in place and
    returns ``self`` so calls can be chained.
    """

    def __init__(self, project_key, job_type="NON_RECURSIVE_FORCED_BUILD"):
        """
        Create a helper to build a job definition.

        :param project_key: the project in which the job is launched
        :param job_type: the build type for the job: RECURSIVE_BUILD,
            NON_RECURSIVE_FORCED_BUILD, RECURSIVE_FORCED_BUILD or
            RECURSIVE_MISSING_ONLY_BUILD
        """
        self.project_key = project_key
        self.definition = {
            'type': job_type,
            'refreshHiveMetastore': False,
            'outputs': [],
        }

    def with_type(self, job_type):
        """
        Set the build type.

        :param job_type: the build type for the job: RECURSIVE_BUILD,
            NON_RECURSIVE_FORCED_BUILD, RECURSIVE_FORCED_BUILD or
            RECURSIVE_MISSING_ONLY_BUILD
        """
        self.definition['type'] = job_type
        return self

    def with_refresh_metastore(self, refresh_metastore):
        """
        Set whether the hive tables built by the job should have their
        definitions refreshed after the corresponding dataset is built.
        """
        self.definition['refreshHiveMetastore'] = refresh_metastore
        return self

    def with_output(self, name, object_type=None, object_project_key=None, partition=None):
        """
        Add one item to build to the definition.
        """
        item = dict(type=object_type, id=name,
                    projectKey=object_project_key, partition=partition)
        self.definition['outputs'].append(item)
        return self

    def get_definition(self):
        """Return the underlying definition dict (not a copy)."""
        return self.definition

dataikuapi/dss/scenario.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
from datetime import datetime
2+
import time
3+
from dataikuapi.utils import DataikuException
4+
25

36
class DSSScenario(object):
47
"""
@@ -26,6 +29,52 @@ def run(self, params={}):
2629
"POST", "/projects/%s/scenarios/%s/run" % (self.project_key, self.id), body=params)
2730
return DSSTriggerFire(self, trigger_fire)
2831

32+
def get_trigger_fire(self, trigger_id, trigger_run_id):
    """
    Retrieve the handle of an existing trigger fire of this scenario.

    This only fetches the state of an already-requested trigger run; it does
    not start a new run (use :meth:`run` for that).

    :param trigger_id: id of the trigger
    :param trigger_run_id: id of the run of the trigger
    :returns: a :class:`DSSTriggerFire` handle
    """
    trigger_fire = self.client._perform_json(
        "GET", "/projects/%s/scenarios/trigger/%s/%s" % (self.project_key, self.id, trigger_id), params={
            'triggerRunId' : trigger_run_id
        })
    return DSSTriggerFire(self, trigger_fire)
48+
49+
def run_and_wait(self, params=None, no_fail=False):
    """
    Request a run of the scenario (it will start after a few seconds) and
    wait for the run to complete.

    :param params: additional parameters that will be passed to the scenario
        through trigger params (dict, optional)
    :param no_fail: if True, return None instead of raising when the trigger
        fire is cancelled before a run starts
    :returns: a :class:`DSSScenarioRun` run handle, or None if the trigger
        was cancelled and no_fail is True
    :raises DataikuException: if the trigger fire is cancelled and no_fail is False
    """
    # Avoid the shared mutable default argument ({}) of the original signature.
    if params is None:
        params = {}
    trigger_fire = self.run(params)
    scenario_run = None
    poll_count = 0
    # Phase 1: wait for the trigger fire to materialize into a scenario run,
    # re-fetching the trigger state from the backend every 10th poll
    # (same cadence as the original counter-reset logic).
    while scenario_run is None:
        poll_count += 1
        if trigger_fire.is_cancelled(refresh=(poll_count % 10 == 0)):
            if no_fail:
                return None
            raise DataikuException("Scenario run has been cancelled")
        scenario_run = trigger_fire.get_scenario_run()
        time.sleep(5)
    # Phase 2: the run exists; poll until it carries a result, i.e. finished.
    while not scenario_run.run.get('result', False):
        scenario_run = trigger_fire.get_scenario_run()
        time.sleep(5)
    return scenario_run
77+
2978
def get_last_runs(self, limit=10, only_finished_runs=False):
3079
"""
3180
Get the list of the last runs of the scenario.
@@ -191,3 +240,16 @@ def get_scenario_run(self):
191240
return None
192241
else:
193242
return DSSScenarioRun(self.client, run['scenarioRun'])
243+
244+
def is_cancelled(self, refresh=False):
    """
    Whether the trigger fire has been cancelled.

    :param refresh: if True, re-fetch the trigger fire state from the
        backend before answering; otherwise answer from the cached payload
    :returns: True if the trigger fire is cancelled
    """
    # PEP 8: test the flag directly instead of comparing `== True`.
    if refresh:
        self.trigger_fire = self.client._perform_json(
            "GET", "/projects/%s/scenarios/trigger/%s/%s" % (self.project_key, self.scenario_id, self.trigger_id), params={
                'triggerRunId' : self.run_id
            })
    return self.trigger_fire["cancelled"]

0 commit comments

Comments
 (0)