Skip to content

Commit f8b7309

Browse files
committed
factorize and add helpers to wait on scenarios/jobs
1 parent 1b9d0af commit f8b7309

File tree

3 files changed

+64
-31
lines changed

3 files changed

+64
-31
lines changed

dataikuapi/dss/job.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import time
2+
import sys
3+
from dataikuapi.utils import DataikuException
14

25
class DSSJob(object):
36
"""
@@ -40,3 +43,24 @@ def get_log(self, activity=None):
4043
params={
4144
"activity" : activity
4245
})
46+
47+
class DSSJobWaiter(object):
48+
"""
49+
Helper to wait for a job's completion
50+
"""
51+
def __init__(self, job):
52+
self.job = job
53+
54+
def wait(self, no_fail=False):
55+
job_state = self.job.get_status().get("baseStatus", {}).get("state", "")
56+
sleep_time = 2
57+
while job_state not in ["DONE", "ABORTED", "FAILED"]:
58+
sleep_time = 300 if sleep_time >= 300 else sleep_time * 2
59+
time.sleep(sleep_time)
60+
job_state = self.job.get_status().get("baseStatus", {}).get("state", "")
61+
if job_state in ["ABORTED", "FAILED"]:
62+
if no_fail:
63+
break
64+
else:
65+
raise DataikuException("Job run did not finish. Status: %s" % (job_state))
66+
return job_state

dataikuapi/dss/project.py

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from .recipe import DSSRecipe
44
from .managedfolder import DSSManagedFolder
55
from .savedmodel import DSSSavedModel
6-
from .job import DSSJob
6+
from .job import DSSJob, DSSJobWaiter
77
from .scenario import DSSScenario
88
from .apiservice import DSSAPIService
99
import sys
@@ -291,18 +291,8 @@ def start_job_and_wait(self, definition, no_fail=False):
291291
"""
292292
job_def = self.client._perform_json("POST", "/projects/%s/jobs/" % self.project_key, body = definition)
293293
job = DSSJob(self.client, self.project_key, job_def['id'])
294-
job_state = job.get_status().get("baseStatus", {}).get("state", "")
295-
sleep_time = 2
296-
while job_state not in ["DONE", "ABORTED", "FAILED"]:
297-
sleep_time = 300 if sleep_time >= 300 else sleep_time * 2
298-
time.sleep(sleep_time)
299-
job_state = job.get_status().get("baseStatus", {}).get("state", "")
300-
if job_state in ["ABORTED", "FAILED"]:
301-
if no_fail:
302-
break
303-
else:
304-
raise DataikuException("Job run did not finish. Status: %s" % (job_state))
305-
return job_state
294+
waiter = DSSJobWaiter(job)
295+
return waiter.wait(no_fail)
306296

307297
def new_job_definition_builder(self, job_type='NON_RECURSIVE_FORCED_BUILD'):
308298
return JobDefinitionBuilder(self.project_key, job_type)

dataikuapi/dss/scenario.py

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -57,24 +57,9 @@ def run_and_wait(self, params={}, no_fail=False):
5757
A :class:`dataikuapi.dss.admin.DSSScenarioRun` run handle
5858
"""
5959
trigger_fire = self.run(params)
60-
scenario_run = None
61-
refresh_trigger_counter = 0
62-
while scenario_run is None:
63-
refresh_trigger_counter += 1
64-
if refresh_trigger_counter == 10:
65-
refresh_trigger_counter = 0
66-
if trigger_fire.is_cancelled(refresh=refresh_trigger_counter == 0):
67-
if no_fail:
68-
return None
69-
else:
70-
raise DataikuException("Scenario run has been cancelled")
71-
scenario_run = trigger_fire.get_scenario_run()
72-
time.sleep(5)
73-
print("Scenario (ID: '%s') run started " % scenario_run.run.get('runId', 'not found'))
74-
while not scenario_run.run.get('result', False):
75-
scenario_run = trigger_fire.get_scenario_run()
76-
time.sleep(5)
77-
return scenario_run
60+
scenario_run = trigger_fire.wait_for_scenario_run(no_fail)
61+
waiter = DSSScenarioRunWaiter(scenario_run, trigger_fire)
62+
return waiter.wait(no_fail)
7863

7964
def get_last_runs(self, limit=10, only_finished_runs=False):
8065
"""
@@ -215,6 +200,24 @@ def get_duration(self):
215200
duration = (end_time - self.get_start_time()).total_seconds()
216201
return duration
217202

203+
class DSSScenarioRunWaiter(object):
204+
"""
205+
Helper to wait for a job's completion
206+
"""
207+
def __init__(self, scenario_run, trigger_fire):
208+
self.trigger_fire = trigger_fire
209+
self.scenario_run = scenario_run
210+
211+
def wait(self, no_fail=False):
212+
while not self.scenario_run.run.get('result', False):
213+
self.scenario_run = self.trigger_fire.get_scenario_run()
214+
time.sleep(5)
215+
outcome = self.scenario_run.run.get('result', None).get('outcome', 'UNKNOWN')
216+
if outcome == 'SUCCESS' or no_fail:
217+
return self.scenario_run
218+
else:
219+
raise DataikuException("Scenario run returned status %s" % outcome)
220+
218221
class DSSTriggerFire(object):
219222
"""
220223
The activation of a trigger on the DSS instance
@@ -254,3 +257,19 @@ def is_cancelled(self, refresh=False):
254257
'triggerRunId' : self.run_id
255258
})
256259
return self.trigger_fire["cancelled"]
260+
261+
def wait_for_scenario_run(self, no_fail=False):
262+
scenario_run = None
263+
refresh_trigger_counter = 0
264+
while scenario_run is None:
265+
refresh_trigger_counter += 1
266+
if refresh_trigger_counter == 10:
267+
refresh_trigger_counter = 0
268+
if self.is_cancelled(refresh=refresh_trigger_counter == 0):
269+
if no_fail:
270+
return None
271+
else:
272+
raise DataikuException("Scenario run has been cancelled")
273+
scenario_run = self.get_scenario_run()
274+
time.sleep(5)
275+
return scenario_run

0 commit comments

Comments
 (0)