Skip to content

Commit c730bb5

Browse files
committed
wrap api for flow tools
1 parent d8b67d4 commit c730bb5

File tree

1 file changed

+80
-0
lines changed

1 file changed

+80
-0
lines changed

dataikuapi/dss/flow.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from .utils import AnyLoc
22
from .recipe import DSSRecipeDefinitionAndPayload
3+
from .future import DSSFuture
34
import logging
5+
46
class DSSProjectFlow(object):
57
def __init__(self, client, project):
68
self.client = client
@@ -43,3 +45,81 @@ def replace_input_computable(self, current_ref, new_ref, type="DATASET"):
4345
dap = recipe_obj.get_definition_and_payload()
4446
dap.replace_input(current_ref, new_ref)
4547
recipe_obj.set_definition_and_payload(dap)
48+
49+
########################################################
50+
# Flow tools
51+
########################################################
52+
53+
def start_tool(self, type, data={}):
54+
"""
55+
Start a tool or open a view in the flow
56+
57+
:param type str: one of {COPY, CHECK_CONSISTENCY, PROPAGATE_SCHEMA} (tools) or {TAGS, CUSTOM_FIELDS, CONNECTIONS, COUNT_OF_RECORDS, FILESIZE, FILEFORMATS, RECIPES_ENGINES, RECIPES_CODE_ENVS, IMPALA_WRITE_MODE, HIVE_MODE, SPARK_ENGINE, SPARK_CONFIG, SPARK_PIPELINES, SQL_PIPELINES, PARTITIONING, PARTITIONS, SCENARIOS, CREATION, LAST_MODIFICATION, LAST_BUILD, RECENT_ACTIVITY, WATCH} (views)
58+
:param data dict: initial data for the tool (optional)
59+
60+
:returns: a :class:`.flow.DSSFlowTool` handle to interact with the newly-created tool or view
61+
"""
62+
tool_id = self.client._perform_text("POST", "/projects/%s/flow/tools/" % self.project.project_key, params={'type':type}, body=data)
63+
return DSSFlowTool(self.client, self.project.project_key, tool_id)
64+
65+
def propagate_schema(self, dataset_name, rebuild=False, recipe_update_options={}, excluded_recipes=[], mark_as_ok_recipes=[], partition_by_dim={}, partition_by_computable={}):
66+
"""
67+
Start an automatic schema propagate from a dataset
68+
69+
:param dataset_name str: name of a dataset to start propagating from
70+
:param rebuild bool: whether to automatically rebuild datasets if needed
71+
:param recipe_update_options str: pre-recipe or per-recipe-type update options to apply on recipes
72+
:param excluded_recipes list: list of recipes where propagation is to stop
73+
:param mark_as_ok_recipes list: list of recipes to consider as ok
74+
:param partition_by_dim dict: partition value to use for each dimension
75+
:param partition_by_computable dict: partition spec to use for a given dataset or recipe (overrides partition_by_dim)
76+
77+
:returns: dict of the messages collected during the update
78+
"""
79+
data = {'recipeUpdateOptions':recipe_update_options, 'partitionByDim':partition_by_dim, 'partitionByComputable':partition_by_computable, 'excludedRecipes':excluded_recipes, 'markAsOkRecipes':mark_as_ok_recipes}
80+
update_future = self.client._perform_json("POST", "/projects/%s/flow/tools/propagate-schema/%s/" % (self.project.project_key, dataset_name), params={'rebuild':rebuild}, body=data)
81+
return DSSFuture(self.client,update_future.get('jobId', None), update_future)
82+
83+
84+
class DSSFlowTool(object):
85+
"""
86+
Handle to interact with a flow tool
87+
"""
88+
def __init__(self, client, project_key, tool_id):
89+
self.client = client
90+
self.project_key = project_key
91+
self.tool_id = tool_id
92+
93+
def stop(self):
94+
"""
95+
Stops the tool and releases the resources held by it
96+
"""
97+
return self.client._perform_json("POST", "/projects/%s/flow/tools/%s/stop" % (self.project_key, self.tool_id))
98+
99+
def get_state(self, options={}):
100+
"""
101+
Get the current state of the tool or view
102+
103+
:returns: the state, as a dict
104+
"""
105+
return self.client._perform_json("GET", "/projects/%s/flow/tools/%s/state" % (self.project_key, self.tool_id), body=options)
106+
107+
def do(self, action):
108+
"""
109+
Perform a manual user action on the tool
110+
111+
:returns: the current state, as a dict
112+
"""
113+
return self.client._perform_json("PUT", "/projects/%s/flow/tools/%s/action" % (self.project_key, self.tool_id), body=action)
114+
115+
def update(self, options={}):
116+
"""
117+
(for tools only) Start updating the tool state
118+
119+
:params options dict: options for the update (optional)
120+
121+
:returns: a :class:`.future.DSSFuture` handle to interact with task of performing the update
122+
"""
123+
update_future = self.client._perform_json("POST", "/projects/%s/flow/tools/%s/update" % (self.project_key, self.tool_id), body=options)
124+
return DSSFuture(self.client,update_future.get('jobId', None), update_future)
125+

0 commit comments

Comments
 (0)