Skip to content

Commit 1d859a0

Browse files
committed
Upgrade Flow code generation
1 parent cd21fda commit 1d859a0

File tree

2 files changed

+239
-45
lines changed

2 files changed

+239
-45
lines changed

dataikuapi/dss/recipe.py

Lines changed: 99 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -245,8 +245,8 @@ def __init__(self, recipe, data):
245245
self.recipe = recipe
246246
self.data = data
247247
self.recipe_settings = self.data["recipe"]
248-
self.str_payload = self.data.get("payload", None)
249-
self.obj_payload = None
248+
self._str_payload = self.data.get("payload", None)
249+
self._obj_payload = None
250250

251251
def save(self):
252252
"""
@@ -257,17 +257,37 @@ def save(self):
257257
"PUT", "/projects/%s/recipes/%s" % (self.recipe.project_key, self.recipe.recipe_name),
258258
body=self.data)
259259

260+
@property
261+
def type(self):
262+
return self.recipe_settings["type"]
263+
264+
@property
265+
def str_payload(self):
266+
"""The raw "payload" of the recipe, as a string"""
267+
self._payload_to_str()
268+
return self._str_payload
269+
@str_payload.setter
270+
def str_payload(self, payload):
271+
self._str_payload = payload
272+
self._obj_payload = None
273+
274+
@property
275+
def obj_payload(self):
276+
"""The raw "payload" of the recipe, as a dict"""
277+
self._payload_to_obj()
278+
return self._obj_payload
279+
260280
def _payload_to_str(self):
261-
if self.obj_payload is not None:
262-
self.str_payload = json.dumps(self.obj_payload)
263-
self.obj_payload = None
264-
if self.str_payload is not None:
265-
self.data["payload"] = self.str_payload
281+
if self._obj_payload is not None:
282+
self._str_payload = json.dumps(self._obj_payload)
283+
self._obj_payload = None
284+
if self._str_payload is not None:
285+
self.data["payload"] = self._str_payload
266286

267287
def _payload_to_obj(self):
268-
if self.str_payload is not None:
269-
self.obj_payload = json.loads(self.str_payload)
270-
self.str_payload = None
288+
if self._str_payload is not None:
289+
self._obj_payload = json.loads(self._str_payload)
290+
self._str_payload = None
271291

272292
def get_recipe_raw_definition(self):
273293
"""
@@ -303,31 +323,31 @@ def get_payload(self):
303323
:rtype string
304324
"""
305325
self._payload_to_str()
306-
return self.str_payload
326+
return self._str_payload
307327

308328
def get_json_payload(self):
309329
"""
310330
Get the payload or script of this recipe, parsed from JSON, as a dict
311331
:rtype dict
312332
"""
313333
self._payload_to_obj()
314-
return self.obj_payload
334+
return self._obj_payload
315335

316336
def set_payload(self, payload):
317337
"""
318338
Set the payload of this recipe
319339
:param str payload: the payload, as a string
320340
"""
321-
self.str_payload = payload
322-
self.obj_payload = None
341+
self._str_payload = payload
342+
self._obj_payload = None
323343

324344
def set_json_payload(self, payload):
325345
"""
326346
Set the payload of this recipe
327347
:param dict payload: the payload, as a dict. The payload will be converted to a JSON string internally
328348
"""
329-
self.str_payload = None
330-
self.obj_payload = payload
349+
self._str_payload = None
350+
self._obj_payload = payload
331351

332352
def has_input(self, input_ref):
333353
"""Returns whether this recipe has a given ref as input"""
@@ -363,6 +383,42 @@ def replace_output(self, current_output_ref, new_output_ref):
363383
if item.get("ref", None) == current_output_ref:
364384
item["ref"] = new_output_ref
365385

386+
def add_input(self, role, ref, partition_deps=None):
387+
if partition_deps is None:
388+
partition_deps = []
389+
self._get_or_create_input_role(role)["items"].append({"ref": ref, "deps": partition_deps})
390+
391+
def add_output(self, role, ref, append_mode=False):
392+
self._get_or_create_output_role(role)["items"].append({"ref": ref, "appendMode": append_mode})
393+
394+
def _get_or_create_input_role(self, role):
395+
inputs = self.get_recipe_inputs()
396+
if not role in inputs:
397+
role_obj = {"items": []}
398+
inputs[role] = role_obj
399+
return inputs[role]
400+
401+
def _get_or_create_output_role(self, role):
402+
outputs = self.get_recipe_outputs()
403+
if not role in outputs:
404+
role_obj = {"items": []}
405+
outputs[role] = role_obj
406+
return outputs[role]
407+
408+
def _get_flat_inputs(self):
409+
ret = []
410+
for role_key, role_obj in self.get_recipe_inputs().items():
411+
for item in role_obj["items"]:
412+
ret.append((role_key, item))
413+
return ret
414+
415+
def _get_flat_outputs(self):
416+
ret = []
417+
for role_key, role_obj in self.get_recipe_outputs().items():
418+
for item in role_obj["items"]:
419+
ret.append((role_key, item))
420+
return ret
421+
366422
def get_flat_input_refs(self):
367423
"""
368424
Returns a list of all input refs of this recipe, regardless of the input role
@@ -625,19 +681,16 @@ class GroupingRecipeSettings(DSSRecipeSettings):
625681
"""
626682
def clear_grouping_keys(self):
627683
"""Removes all grouping keys from this grouping recipe"""
628-
self._payload_to_obj()
629684
self.obj_payload["keys"] = []
630685

631686
def add_grouping_key(self, column):
632687
"""
633688
Adds grouping on a column
634689
:param str column: Column to group on
635690
"""
636-
self._payload_to_obj()
637691
self.obj_payload["keys"].append({"column":column})
638692

639693
def set_global_count_enabled(self, enabled):
640-
self._payload_to_obj()
641694
self.obj_payload["globalCount"] = enabled
642695

643696
def get_or_create_column_settings(self, column):
@@ -778,16 +831,35 @@ class PrepareRecipeSettings(DSSRecipeSettings):
778831
"""
779832
pass
780833

781-
def get_raw_steps(self):
834+
@property
835+
def raw_steps(self):
782836
"""
783837
Returns a raw list of the steps of this prepare recipe.
784838
You can modify the returned list.
785839
786840
Each step is a dict of settings. The precise settings for each step are not documented
787841
"""
788-
self._payload_to_obj()
789842
return self.obj_payload["steps"]
790843

844+
def add_processor_step(self, type, params):
845+
step = {
846+
"metaType": "PROCESSOR",
847+
"type": type,
848+
"params": params
849+
}
850+
self.raw_steps.append(step)
851+
852+
def add_filter_on_bad_meaning(self, meaning, columns):
853+
params = {
854+
"action" : "REMOVE_ROW",
855+
"type" : meaning
856+
}
857+
if isinstance(columns, basestring):
858+
params["appliesTo"] = "SINGLE_COLUMN"
859+
params["columns"] = [columns]
860+
elif isinstance(columns, list):
861+
params["appliesTo"] = "COLUMNS"
862+
params["columns"] = columns
791863

792864
class PrepareRecipeCreator(SingleOutputRecipeCreator):
793865
"""
@@ -825,15 +897,15 @@ def raw_virtual_inputs(self):
825897
Returns the raw list of virtual inputs
826898
:rtype list of dict
827899
"""
828-
return self.get_json_payload()["virtualInputs"]
900+
return self.obj_payload["virtualInputs"]
829901

830902
@property
831903
def raw_joins(self):
832904
"""
833905
Returns the raw list of joins
834906
:rtype list of dict
835907
"""
836-
return self.get_json_payload()["joins"]
908+
return self.obj_payload["joins"]
837909

838910
def add_virtual_input(self, input_dataset_index):
839911
"""
@@ -860,7 +932,7 @@ def add_join(self, join_type="LEFT", input1=0, input2=1):
860932
:returns the newly added join as a dict
861933
:rtype dict
862934
"""
863-
jp = self.get_json_payload()
935+
jp = self.obj_payload
864936
if not "joins" in jp:
865937
jp["joins"] = []
866938
join = {
@@ -893,10 +965,10 @@ def add_post_join_computed_column(self, computed_column):
893965
894966
Use :class:`dataikuapi.dss.utils.DSSComputedColumn` to build the computed_column object
895967
"""
896-
self.get_json_payload()["computedColumns"].append(computed_column)
968+
self.obj_payload["computedColumns"].append(computed_column)
897969

898970
def set_post_filter(self, postfilter):
899-
self.get_json_payload()["postFilter"] = postfilter
971+
self.obj_payload["postFilter"] = postfilter
900972

901973
class JoinRecipeCreator(VirtualInputsSingleOutputRecipeCreator):
902974
"""
@@ -979,7 +1051,7 @@ def get_code(self):
9791051
:rtype string
9801052
"""
9811053
self._payload_to_str()
982-
return self.str_payload
1054+
return self._str_payload
9831055

9841056
def set_code(self, code):
9851057
"""

0 commit comments

Comments
 (0)