Skip to content

Commit e98a709

Browse files
author
Thibaud Baas
authored
Merge pull request #153 from dataiku/task/fm-api
Add python client for FM API
2 parents a093bb0 + 2cf3bc5 commit e98a709

File tree

8 files changed

+1563
-1
lines changed

8 files changed

+1563
-1
lines changed

dataikuapi/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
from .dssclient import DSSClient
2+
from .fmclient import FMClientAWS, FMClientAzure
23

34
from .apinode_client import APINodeClient
45
from .apinode_admin_client import APINodeAdminClient
56

67
from .dss.recipe import GroupingRecipeCreator, JoinRecipeCreator, StackRecipeCreator, WindowRecipeCreator, SyncRecipeCreator, SamplingRecipeCreator, SQLQueryRecipeCreator, CodeRecipeCreator, SplitRecipeCreator, SortRecipeCreator, TopNRecipeCreator, DistinctRecipeCreator, DownloadRecipeCreator, PredictionScoringRecipeCreator, ClusteringScoringRecipeCreator
78

8-
from .dss.admin import DSSUserImpersonationRule, DSSGroupImpersonationRule
9+
from .dss.admin import DSSUserImpersonationRule, DSSGroupImpersonationRule

dataikuapi/fm/__init__.py

Whitespace-only changes.

dataikuapi/fm/future.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import sys, time
2+
3+
4+
class FMFuture(object):
5+
"""
6+
A future on the DSS instance
7+
"""
8+
9+
def __init__(
10+
self, client, job_id, state=None, result_wrapper=lambda result: result
11+
):
12+
self.client = client
13+
self.job_id = job_id
14+
self.state = state
15+
self.state_is_peek = True
16+
self.result_wrapper = result_wrapper
17+
18+
@staticmethod
19+
def from_resp(client, resp, result_wrapper=lambda result: result):
20+
"""Creates a DSSFuture from a parsed JSON response"""
21+
return FMFuture(
22+
client, resp.get("jobId", None), state=resp, result_wrapper=result_wrapper
23+
)
24+
25+
@classmethod
26+
def get_result_wait_if_needed(cls, client, ret):
27+
if "jobId" in ret:
28+
future = FMFuture(client, ret["jobId"], ret)
29+
future.wait_for_result()
30+
return future.get_result()
31+
else:
32+
return ret["result"]
33+
34+
def abort(self):
35+
"""
36+
Abort the future
37+
"""
38+
return self.client._perform_tenant_empty("DELETE", "/futures/%s" % self.job_id)
39+
40+
def get_state(self):
41+
"""
42+
Get the status of the future, and its result if it's ready
43+
"""
44+
self.state = self.client._perform_tenant_json(
45+
"GET", "/futures/%s" % self.job_id, params={"peek": False}
46+
)
47+
self.state_is_peek = False
48+
return self.state
49+
50+
def peek_state(self):
51+
"""
52+
Get the status of the future, and its result if it's ready
53+
"""
54+
self.state = self.client._perform_tenant_json(
55+
"GET", "/futures/%s" % self.job_id, params={"peek": True}
56+
)
57+
self.state_is_peek = True
58+
return self.state
59+
60+
def get_result(self):
61+
"""
62+
Get the future result if it's ready, raises an Exception otherwise
63+
"""
64+
if (
65+
self.state is None
66+
or not self.state.get("hasResult", False)
67+
or self.state_is_peek
68+
):
69+
self.get_state()
70+
if self.state.get("hasResult", False):
71+
return self.result_wrapper(self.state.get("result", None))
72+
else:
73+
raise Exception("Result not ready")
74+
75+
def has_result(self):
76+
"""
77+
Checks whether the future has a result ready
78+
"""
79+
if self.state is None or not self.state.get("hasResult", False):
80+
self.get_state()
81+
return self.state.get("hasResult", False)
82+
83+
def wait_for_result(self):
84+
"""
85+
Wait and get the future result
86+
"""
87+
if self.state.get("hasResult", False):
88+
return self.result_wrapper(self.state.get("result", None))
89+
if (
90+
self.state is None
91+
or not self.state.get("hasResult", False)
92+
or self.state_is_peek
93+
):
94+
self.get_state()
95+
while not self.state.get("hasResult", False):
96+
time.sleep(5)
97+
self.get_state()
98+
if self.state.get("hasResult", False):
99+
return self.result_wrapper(self.state.get("result", None))
100+
else:
101+
raise Exception("No result")

0 commit comments

Comments
 (0)