Skip to content

Commit 717755a

Browse files
committed
Enhanced API client for schema auto propagation
1 parent e969b76 commit 717755a

File tree

1 file changed

+113
-13
lines changed

1 file changed

+113
-13
lines changed

dataikuapi/dss/flow.py

Lines changed: 113 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -125,23 +125,15 @@ def start_tool(self, type, data={}):
125125
tool_id = self.client._perform_text("POST", "/projects/%s/flow/tools/" % self.project.project_key, params={'type':type}, body=data)
126126
return DSSFlowTool(self.client, self.project.project_key, tool_id)
127127

128-
def new_schema_propagation(self, dataset_name):
    """
    Start an automatic schema propagation from a dataset.

    This returns a builder object on which propagation options can be set before
    actually starting the run.

    :param str dataset_name: name of a dataset to start propagating from

    :returns: a :class:`DSSSchemaPropagationRunBuilder` to set options and start the propagation
    """
    return DSSSchemaPropagationRunBuilder(self.project, self.client, dataset_name)
145137

146138

147139
def _to_smart_ref(self, obj):
@@ -168,6 +160,114 @@ def _to_smart_ref(self, obj):
168160
"objectType": ot
169161
}
170162

163+
class DSSSchemaPropagationRunBuilder(object):
    """
    Builder for an automatic schema propagation run.

    Do not create this directly, use :meth:`DSSProjectFlow.new_schema_propagation`
    """
    def __init__(self, project, client, dataset_name):
        self.project = project
        self.client = client
        self.dataset_name = dataset_name
        # Settings payload sent verbatim to the propagate-schema backend route in start()
        self.settings = {
            'recipeUpdateOptions': {
                "byType": {},
                "byName": {}
            },
            'defaultPartitionValuesByDimension': {},
            'partitionsByComputable': {},
            'excludedRecipes': [],
            'markAsOkRecipes': [],
            'autoRebuild': True
        }

    def set_auto_rebuild(self, auto_rebuild):
        """
        Sets whether to automatically rebuild datasets if needed while propagating (default true)

        :param bool auto_rebuild: whether to automatically rebuild datasets if needed
        """
        self.settings["autoRebuild"] = auto_rebuild

    def set_default_partitioning_value(self, dimension, value):
        """
        In the case of partitioned flows, sets the default partition value to use when rebuilding, for a specific dimension name

        :param str dimension: a partitioning dimension name
        :param str value: a partitioning dimension value
        """
        self.settings["defaultPartitionValuesByDimension"][dimension] = value

    def set_partition_for_computable(self, full_id, partition):
        """
        In the case of partitioned flows, sets the partition id to use when building a particular computable. Overrides
        the default partitioning value per dimension

        :param str full_id: Full name of the computable, in the form PROJECTKEY.id
        :param str partition: a full partition id (all dimensions)
        """
        self.settings["partitionsByComputable"][full_id] = partition

    def stop_at(self, recipe_name):
        """
        Marks a recipe as a recipe where propagation stops

        :param str recipe_name: name of the recipe
        """
        self.settings["excludedRecipes"].append(recipe_name)

    def mark_recipe_as_ok(self, name):
        """
        Marks a recipe as always considered as OK during propagation

        :param str name: name of the recipe
        """
        self.settings["markAsOkRecipes"].append(name)

    def set_grouping_update_options(self, recipe=None, remove_missing_aggregates=True, remove_missing_keys=True,
                                    new_aggregates=None):
        """
        Sets update options for grouping recipes

        :param str recipe: if None, applies to all grouping recipes. Else, applies only to this name
        :param bool remove_missing_aggregates: whether to remove missing aggregates
        :param bool remove_missing_keys: whether to remove missing keys
        :param dict new_aggregates: new aggregates to add
        """
        # `None` sentinel instead of a mutable `{}` default: the dict is stored by
        # reference in self.settings, so a shared default could leak across instances.
        data = {
            "removeMissingAggregates": remove_missing_aggregates,
            "removeMissingKeys": remove_missing_keys,
            "newAggregates": {} if new_aggregates is None else new_aggregates
        }
        if recipe is None:
            self.settings["recipeUpdateOptions"]["byType"]["grouping"] = data
        else:
            self.settings["recipeUpdateOptions"]["byName"][recipe] = data

    def set_window_update_options(self, recipe=None, remove_missing_aggregates=True, remove_missing_in_window=True,
                                  new_aggregates=None):
        """
        Sets update options for window recipes

        :param str recipe: if None, applies to all window recipes. Else, applies only to this name
        :param bool remove_missing_aggregates: whether to remove missing aggregates
        :param bool remove_missing_in_window: whether to remove missing items in window
        :param dict new_aggregates: new aggregates to add
        """
        # See set_grouping_update_options for why the default is None, not {}
        data = {
            "removeMissingAggregates": remove_missing_aggregates,
            "removeMissingInWindow": remove_missing_in_window,
            "newAggregates": {} if new_aggregates is None else new_aggregates
        }
        if recipe is None:
            self.settings["recipeUpdateOptions"]["byType"]["window"] = data
        else:
            self.settings["recipeUpdateOptions"]["byName"][recipe] = data

    def set_join_update_options(self, recipe=None, remove_missing_join_conditions=True, remove_missing_join_values=True,
                                new_selected_columns=None):
        """
        Sets update options for join recipes

        :param str recipe: if None, applies to all join recipes. Else, applies only to this name
        :param bool remove_missing_join_conditions: whether to remove missing join conditions
        :param bool remove_missing_join_values: whether to remove missing join values
        :param dict new_selected_columns: new columns to select
        """
        # See set_grouping_update_options for why the default is None, not {}
        data = {
            "removeMissingJoinConditions": remove_missing_join_conditions,
            "removeMissingJoinValues": remove_missing_join_values,
            "newSelectedColumns": {} if new_selected_columns is None else new_selected_columns
        }
        if recipe is None:
            self.settings["recipeUpdateOptions"]["byType"]["join"] = data
        else:
            self.settings["recipeUpdateOptions"]["byName"][recipe] = data

    def start(self):
        """
        Starts the actual propagation. Returns a future to wait for completion

        :rtype: :class:`dataikuapi.dss.future.DSSFuture`
        """
        future_resp = self.client._perform_json("POST", "/projects/%s/flow/tools/propagate-schema/%s/" % (self.project.project_key, self.dataset_name), body=self.settings)
        return DSSFuture.from_resp(self.client, future_resp)
270+
171271
class DSSFlowZone(object):
172272
"""
173273
A zone in the Flow. Do not create this object manually, use :meth:`DSSProjectFlow.get_zone`

0 commit comments

Comments
 (0)