Skip to content

Commit 5568bb9

Browse files
committed
API client for schema updates + start per-recipe structured recipe settings
1 parent ce13476 commit 5568bb9

File tree

2 files changed

+208
-22
lines changed

2 files changed

+208
-22
lines changed

dataikuapi/dss/dataset.py

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from ..utils import DataikuUTF8CSVReader
33
from ..utils import DataikuStreamedHttpUTF8CSVReader
44
from .future import DSSFuture
5-
import json
5+
import json, warnings
66
from .future import DSSFuture
77
from .metrics import ComputedMetrics
88
from .discussion import DSSObjectDiscussions
@@ -40,6 +40,25 @@ def delete(self, drop_data=False):
4040
########################################################
4141

4242
def get_settings(self):
43+
"""
44+
Returns the settings of this dataset as a :class:`DSSDatasetSettings`, or one of its subclasses.
45+
46+
Known subclasses of :class:`DSSDatasetSettings` include :class:`FSLikeDatasetSettings`
47+
and :class:`SQLDatasetSettings`
48+
49+
You must use :meth:`~DSSDatasetSettings.save()` on the returned object to make your changes effective
50+
on the dataset.
51+
52+
.. code-block:: python
53+
54+
# Example: activating discrete partitioning on a SQL dataset
55+
dataset = project.get_dataset("my_database_table")
56+
settings = dataset.get_settings()
57+
settings.add_discrete_partitioning_dimension("country")
58+
settings.save()
59+
60+
:rtype: :class:`DSSDatasetSettings`
61+
"""
4362
data = self.client._perform_json("GET", "/projects/%s/datasets/%s" % (self.project_key, self.dataset_name))
4463

4564
if data["type"] in self.__class__.FS_TYPES:
@@ -52,22 +71,23 @@ def get_settings(self):
5271

5372
def get_definition(self):
    """
    Deprecated. Use :meth:`get_settings`

    Get the raw settings of the dataset as a dict

    :rtype: dict
    """
    warnings.warn("Dataset.get_definition is deprecated, please use get_settings", DeprecationWarning)
    # Same endpoint as get_settings, but returns the raw dict without wrapping
    url = "/projects/%s/datasets/%s" % (self.project_key, self.dataset_name)
    return self.client._perform_json("GET", url)
6281

6382
def set_definition(self, definition):
    """
    Deprecated. Use :meth:`get_settings` and :meth:`DSSDatasetSettings.save`

    Set the definition of the dataset

    :param definition: the definition, as a dict. You should only set a definition object
                       that has been retrieved using the get_definition call.
    """
    warnings.warn("Dataset.set_definition is deprecated, please use get_settings", DeprecationWarning)
    url = "/projects/%s/datasets/%s" % (self.project_key, self.dataset_name)
    return self.client._perform_json("PUT", url, body=definition)
@@ -459,6 +479,14 @@ def new_code_recipe(self, type, code=None, recipe_name=None):
459479
builder.with_script(code)
460480
return builder
461481

482+
def new_grouping_recipe(self, first_group_by, recipe_name=None):
    """
    Creates a builder for a new grouping recipe taking this dataset as input.

    Note that this returns a builder: you still need to configure it and call
    its creation method to actually create the recipe in DSS.

    :param str first_group_by: name of the first column to group by
    :param str recipe_name: name for the new recipe. Defaults to
        "grouping_recipe_from_<dataset name>"
    :rtype: :class:`dataikuapi.dss.recipe.GroupingRecipeCreator`
    """
    if recipe_name is None:
        recipe_name = "grouping_recipe_from_%s" % (self.dataset_name)
    builder = recipe.GroupingRecipeCreator(recipe_name, self.project)
    builder.with_input(self.dataset_name)
    builder.with_group_key(first_group_by)
    return builder
489+
462490
class DSSDatasetSettings(object):
463491
def __init__(self, dataset, settings):
464492
self.dataset = dataset

dataikuapi/dss/recipe.py

Lines changed: 172 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55

66
class DSSRecipe(object):
77
"""
8-
A handle to an existing recipe on the DSS instance
8+
A handle to an existing recipe on the DSS instance.
9+
Do not create this directly, use :meth:`dataikuapi.dss.project.DSSProject.get_recipe`
910
"""
1011
def __init__(self, client, project_key, recipe_name):
1112
self.client = client
@@ -16,6 +17,27 @@ def __init__(self, client, project_key, recipe_name):
1617
# Recipe deletion
1718
########################################################
1819

20+
def compute_schema_update(self):
    """
    Computes which updates are required to the outputs of this recipe.
    The required updates are returned as a :class:`RequiredSchemaUpdates` object, which then
    allows you to :meth:`~RequiredSchemaUpdates.apply` the changes.

    Usage example:

    .. code-block:: python

        required_updates = recipe.compute_schema_update()
        if required_updates.any_action_required():
            print("Some schemas will be updated")

        # Note that you can call apply even if no changes are required. This will be a no-op
        required_updates.apply()
    """
    url = "/projects/%s/recipes/%s/schema-update" % (self.project_key, self.recipe_name)
    update_info = self.client._perform_json("GET", url)
    return RequiredSchemaUpdates(self, update_info)
40+
1941
def delete(self):
2042
"""
2143
Delete the recipe
@@ -27,6 +49,25 @@ def delete(self):
2749
# Recipe definition
2850
########################################################
2951

52+
def get_settings(self):
    """
    Gets the settings of the recipe, as a :class:`DSSRecipeSettings` or one of its subclasses.

    Some recipes have a dedicated class for the settings, with additional helpers to read and modify the settings.

    Once you are done modifying the returned settings object, you can call :meth:`~DSSRecipeSettings.save` on it
    in order to save the modifications to the DSS recipe.

    :rtype: :class:`DSSRecipeSettings`
    """
    data = self.client._perform_json(
        "GET", "/projects/%s/recipes/%s" % (self.project_key, self.recipe_name))

    # Renamed from `type` so the builtin is not shadowed
    recipe_type = data["recipe"]["type"]

    # Recipe types with a dedicated settings subclass get one; everything
    # else falls back to the generic settings object
    if recipe_type == "grouping":
        return GroupingRecipeSettings(self, data)
    else:
        return DSSRecipeSettings(self, data)
70+
3071
def get_definition_and_payload(self):
    """
    Gets the definition of the recipe
    """
    url = "/projects/%s/recipes/%s" % (self.project_key, self.recipe_name)
    data = self.client._perform_json("GET", url)
    return DSSRecipeDefinitionAndPayload(self, data)
4081

4182
def set_definition_and_payload(self, definition):
4283
"""
@@ -153,64 +194,93 @@ def get_status_messages(self):
153194
"""
154195
return self.data["allMessagesForFrontend"]["messages"]
155196

156-
class DSSRecipeDefinitionAndPayload(object):
197+
class DSSRecipeSettings(object):
157198
"""
158-
Definition for a recipe, that is, the recipe definition itself and its payload
199+
Settings of a recipe. Do not create this directly, use :meth:`DSSRecipe.get_settings`
159200
"""
160-
def __init__(self, data):
201+
def __init__(self, recipe, data):
202+
self.recipe = recipe
161203
self.data = data
204+
self.recipe_settings = self.data["recipe"]
205+
self.str_payload = self.data.get("payload", None)
206+
self.obj_payload = None
207+
208+
def save(self):
    """
    Saves back the recipe in DSS.
    """
    # Serialize any object-form payload edits into self.data before sending
    self._payload_to_str()
    url = "/projects/%s/recipes/%s" % (self.recipe.project_key, self.recipe.recipe_name)
    return self.recipe.client._perform_json("PUT", url, body=self.data)
216+
217+
def _payload_to_str(self):
218+
if self.obj_payload is not None:
219+
self.str_payload = json.dumps(self.obj_payload)
220+
self.obj_payload = None
221+
if self.str_payload is not None:
222+
self.data["payload"] = self.str_payload
223+
224+
def _payload_to_obj(self):
225+
if self.str_payload is not None:
226+
self.obj_payload = json.loads(self.str_payload)
227+
self.str_payload = None
162228

163229
def get_recipe_raw_definition(self):
164230
"""
165-
Get the recipe definition as a raw JSON object
231+
Get the recipe definition as a raw dict
166232
"""
167-
return self.data.get('recipe', None)
233+
return self.recipe_settings
168234

169235
def get_recipe_inputs(self):
170236
"""
171237
Get the list of inputs of this recipe
172238
"""
173-
return self.data.get('recipe').get('inputs')
239+
return self.recipe_settings.get('inputs')
174240

175241
def get_recipe_outputs(self):
176242
"""
177243
Get the list of outputs of this recipe
178244
"""
179-
return self.data.get('recipe').get('outputs')
245+
return self.recipe_settings.get('outputs')
180246

181247
def get_recipe_params(self):
182248
"""
183249
Get the parameters of this recipe, as a raw JSON object
184250
"""
185-
return self.data.get('recipe').get('params')
251+
return self.recipe_settings.get('params')
186252

187253
def get_payload(self):
188254
"""
189255
Get the payload or script of this recipe, as a raw string
190256
"""
191-
return self.data.get('payload', None)
257+
self._payload_to_str()
258+
return self.str_payload
192259

193260
def get_json_payload(self):
194261
"""
195262
Get the payload or script of this recipe, as a JSON object
196263
"""
197-
return json.loads(self.data.get('payload', None))
264+
self._payload_to_obj()
265+
return self.obj_payload
198266

199267
def set_payload(self, payload):
200268
"""
201269
Set the raw payload of this recipe
202270
203271
:param str payload: the payload, as a string
204272
"""
205-
self.data['payload'] = payload
273+
self.str_payload = payload
274+
self.obj_payload = None
206275

207276
def set_json_payload(self, payload):
208277
"""
209278
Set the raw payload of this recipe
210279
211280
:param dict payload: the payload, as a dict. The payload will be converted to a JSON string internally
212281
"""
213-
self.data['payload'] = json.dumps(payload)
282+
self.str_payload = None
283+
self.obj_payload = payload
214284

215285
def has_input(self, input_ref):
216286
"""Returns whether this recipe has a given ref as input"""
@@ -246,6 +316,61 @@ def replace_output(self, current_output_ref, new_output_ref):
246316
if item.get("ref", None) == current_output_ref:
247317
item["ref"] = new_output_ref
248318

319+
# Old name
class DSSRecipeDefinitionAndPayload(DSSRecipeSettings):
    """Deprecated. Backwards-compatibility alias for :class:`DSSRecipeSettings`."""
    pass
322+
323+
class GroupingRecipeSettings(DSSRecipeSettings):
    """
    Settings of a grouping recipe. Do not create this directly, use :meth:`DSSRecipe.get_settings`
    """
    def clear_grouping_keys(self):
        """Removes all grouping keys from this recipe"""
        self._payload_to_obj()
        self.obj_payload["keys"] = []

    def add_grouping_key(self, column):
        """
        Adds a column to the grouping keys

        :param str column: The column name
        """
        self._payload_to_obj()
        self.obj_payload["keys"].append({"column": column})

    def set_global_count_enabled(self, enabled):
        """
        Enables or disables the global count output

        :param bool enabled: whether the global count is enabled
        """
        self._payload_to_obj()
        self.obj_payload["globalCount"] = enabled

    def get_or_create_column_settings(self, column):
        """
        Gets a dict representing the aggregations to perform on a column.
        Creates it and adds it to the potential aggregations if it does not already exist

        :param str column: The column name
        :rtype: dict
        """
        # Bug fix: make sure the payload is in object form before reading it
        # (previously obj_payload could still be None here)
        self._payload_to_obj()
        found = None
        for gv in self.obj_payload["values"]:
            if gv["column"] == column:
                found = gv
                break
        if found is None:
            found = {"column": column}
            self.obj_payload["values"].append(found)
        return found

    def set_column_aggregations(self, column, type, min=False, max=False, count=False, count_distinct=False,
                                sum=False, concat=False, stddev=False, avg=False):
        """
        Sets the basic aggregations on a column.
        Returns the dict representing the aggregations on the column

        :param str column: The column name
        :param str type: The type of the column (as a DSS schema type name)
        :rtype: dict
        """
        cs = self.get_or_create_column_settings(column)
        cs["type"] = type
        cs["min"] = min
        cs["max"] = max
        cs["count"] = count
        cs["countDistinct"] = count_distinct
        cs["sum"] = sum
        cs["concat"] = concat
        cs["stddev"] = stddev
        # Bug fix: `avg` was accepted as a parameter but never applied
        cs["avg"] = avg
        return cs
373+
249374
class DSSRecipeCreator(object):
250375
"""
251376
Helper to create new recipes
@@ -652,3 +777,36 @@ class DownloadRecipeCreator(SingleOutputRecipeCreator):
652777
"""
653778
def __init__(self, name, project):
654779
SingleOutputRecipeCreator.__init__(self, 'download', name, project)
780+
781+
782+
class RequiredSchemaUpdates(object):
    """
    Representation of the updates required to the schema of the outputs of a recipe.
    Do not create this class directly, use :meth:`DSSRecipe.compute_schema_update`
    """

    def __init__(self, recipe, data):
        self.recipe = recipe
        self.data = data
        # Options controlling how apply() updates each output; may be
        # toggled by the caller before calling apply()
        self.drop_and_recreate = True
        self.synchronize_metastore = True

    def any_action_required(self):
        """
        :returns: True if applying the updates would change at least one output schema
        :rtype: bool
        """
        return self.data["totalIncompatibilities"] > 0

    def apply(self):
        """
        Applies the required schema updates to each output computable of the recipe.

        :returns: the list of responses from DSS, one per output computable
        :rtype: list
        """
        results = []
        for computable in self.data["computables"]:
            # Datasets are identified by name, other computables by id.
            # (Replaces the fragile `x and a or b` idiom, which would have
            # fallen through to "id" on a falsy datasetName.)
            if computable["type"] == "DATASET":
                computable_id = computable["datasetName"]
            else:
                computable_id = computable["id"]

            osu = {
                "computableType": computable["type"],
                "computableId": computable_id,
                "newSchema": computable["newSchema"],
                "dropAndRecreate": self.drop_and_recreate,
                "synchronizeMetastore": self.synchronize_metastore
            }

            results.append(self.recipe.client._perform_json("POST",
                "/projects/%s/recipes/%s/actions/updateOutputSchema" % (self.recipe.project_key, self.recipe.recipe_name),
                body=osu))
        return results

0 commit comments

Comments
 (0)