Skip to content

Commit 0a8601c

Browse files
author
kgued
committed
Merge remote-tracking branch 'origin/feature/new-dataset-management-apis' into feature/new-dataset-management-apis-patch
2 parents 6a4b2df + e606010 commit 0a8601c

File tree

3 files changed

+134
-59
lines changed

3 files changed

+134
-59
lines changed

dataikuapi/dss/dataset.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,11 @@ def new_code_recipe(self, type, code=None, recipe_name=None):
475475

476476
if recipe_name is None:
477477
recipe_name = "%s_recipe_from_%s" % (type, self.dataset_name)
478-
builder = recipe.CodeRecipeCreator(recipe_name, type, self.project)
478+
479+
if type == "python":
480+
builder = recipe.PythonRecipeCreator(recipe_name, self.project)
481+
else:
482+
builder = recipe.CodeRecipeCreator(recipe_name, type, self.project)
479483
builder.with_input(self.dataset_name)
480484
if code is not None:
481485
builder.with_script(code)

dataikuapi/dss/flow.py

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from .utils import AnyLoc
2-
from .recipe import DSSRecipeDefinitionAndPayload
2+
from .dataset import DSSDataset
3+
from .recipe import DSSRecipe, DSSRecipeDefinitionAndPayload
34
from .future import DSSFuture
45
import logging
56

@@ -8,6 +9,9 @@ def __init__(self, client, project):
89
self.client = client
910
self.project = project
1011

12+
def get_graph(self):
    """
    Retrieves the full flow graph of this project.

    :return: a handle on the project's flow graph
    :rtype: :class:`DSSProjectFlowGraph`
    """
    graph_url = "/projects/%s/flow/graph/" % (self.project.project_key)
    graph_data = self.client._perform_json("GET", graph_url)
    return DSSProjectFlowGraph(self, graph_data)
1115

1216
def replace_input_computable(self, current_ref, new_ref, type="DATASET"):
1317
"""
@@ -81,6 +85,84 @@ def propagate_schema(self, dataset_name, rebuild=False, recipe_update_options={}
8185
return DSSFuture(self.client,update_future.get('jobId', None), update_future)
8286

8387

88+
class DSSProjectFlowGraph(object):
    """
    A handle on the computation graph of a DSS project flow, as returned by
    the flow graph API. Wraps the raw graph data (dicts of "computables" and
    "runnables") and offers navigation helpers.
    """

    def __init__(self, flow, data):
        # flow: the DSSProjectFlow this graph was obtained from
        # data: raw graph payload with "computables" and "runnables" dicts
        self.flow = flow
        self.data = data

    def get_source_computables(self, as_type="dict"):
        """
        Returns the list of source computables, i.e. graph nodes without any predecessor.

        :param as_type: How to return the source computables. Possible values are "dict" and "object"

        :return: if as_type=dict, each computable is returned as a dict containing at least "ref" and "type".
                 if as_type=object, each computable is returned as a :class:`dataikuapi.dss.dataset.DSSDataset`,
                 :class:`dataikuapi.dss.managedfolder.DSSManagedFolder`,
                 :class:`dataikuapi.dss.savedmodel.DSSSavedModel`, or streaming endpoint
        """
        sources = [c for c in self.data["computables"].values() if not c["predecessors"]]
        return self._convert_computables_list(sources, as_type)

    def get_source_datasets(self):
        """
        Returns the list of source datasets for this project.

        :rtype: list of :class:`dataikuapi.dss.dataset.DSSDataset`
        """
        datasets = []
        for computable in self.get_source_computables():
            if computable["type"] == "COMPUTABLE_DATASET":
                datasets.append(self._get_object_from_computable(computable))
        return datasets

    def get_successor_recipes(self, node, as_type="name"):
        """
        Returns the recipes that are direct successors of a graph node.

        :param node: Either a name or :class:`dataikuapi.dss.dataset.DSSDataset` object
        :param as_type: "name" for recipe names, "object" for recipe handles
        :return: if as_type="name", list of strings (recipe names),
                 else list of :class:`dataikuapi.dss.recipe.DSSRecipe`
        :raises ValueError: if the node is not present in the graph
        """
        if isinstance(node, DSSDataset):
            node = node.dataset_name

        computable = self.data["computables"].get(node, None)
        if computable is None:
            raise ValueError("Computable %s not found in Flow graph" % node)

        recipe_names = computable["successors"]
        if as_type != "object":
            return recipe_names
        return [DSSRecipe(self.flow.client, self.flow.project.project_key, name)
                for name in recipe_names]

    def get_successor_computables(self, node, as_type="dict"):
        """
        Returns the computables that are direct successors of a given graph node.
        Each computable is returned as a dict containing at least "ref" and "type"
        (or converted to an object when as_type="object").

        :raises ValueError: if the node is not present among the graph's runnables
        """
        if isinstance(node, DSSRecipe):
            node = node.recipe_name
        runnable = self.data["runnables"].get(node, None)
        if runnable is None:
            raise ValueError("Runnable %s not found in Flow graph" % node)

        successors = [self.data["computables"][ref] for ref in runnable["successors"]]
        return self._convert_computables_list(successors, as_type)

    def _convert_computables_list(self, computables, as_type):
        # "object" converts each entry to a handle; any other value passes
        # the raw dicts through unchanged
        if as_type != "object":
            return computables
        return [self._get_object_from_computable(c) for c in computables]

    def _get_object_from_computable(self, computable):
        # Only datasets are supported so far
        if computable["type"] == "COMPUTABLE_DATASET":
            return DSSDataset(self.flow.client, self.flow.project.project_key, computable["ref"])
        # TODO: support managed folders, saved models and streaming endpoints
        raise Exception("unsupported %s" % computable["type"])
165+
84166
class DSSFlowTool(object):
85167
"""
86168
Handle to interact with a flow tool

dataikuapi/dss/recipe.py

Lines changed: 46 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1053,80 +1053,69 @@ def _finish_creation_settings(self):
10531053
super(CodeRecipeCreator, self)._finish_creation_settings()
10541054
self.creation_settings['script'] = self.script
10551055

1056-
class PythonRecipeCreator(CodeRecipeCreator):
    """
    Creates a Python recipe.
    A Python recipe can be defined either by its complete code, like a normal Python recipe, or
    by a function signature (see :meth:`with_function` and :meth:`with_function_name`).

    When using a function, the function must take as arguments:

    * A list of dataframes corresponding to the dataframes of the input datasets
    * Optional named arguments corresponding to arguments passed to the creator
    """

    def __init__(self, name, project):
        DSSRecipeCreator.__init__(self, "python", name, project)

    # Template of the code generated in "function mode".
    # FIX: 'logging' is now imported by the generated code — the template body
    # calls logging.info, which previously raised NameError at recipe run time.
    DEFAULT_RECIPE_CODE_TMPL = """
# This code is autogenerated by PythonRecipeCreator function mode
import dataiku, dataiku.recipe, json, logging
from {module_name} import {fname}
input_datasets = dataiku.recipe.get_inputs_as_datasets()
output_datasets = dataiku.recipe.get_outputs_as_datasets()
params = json.loads('{params_json}')

logging.info("Reading %d input datasets as dataframes" % len(input_datasets))
input_dataframes = [ds.get_dataframe() for ds in input_datasets]

logging.info("Calling user function {fname}")
function_input = input_dataframes if len(input_dataframes) > 1 else input_dataframes[0]
output_dataframes = {fname}(function_input, **params)

if not isinstance(output_dataframes, list):
    output_dataframes = [output_dataframes]

if not len(output_dataframes) == len(output_datasets):
    raise Exception("Code function {fname}() returned %d dataframes but recipe expects %d output datasets" % \\
                        (len(output_dataframes), len(output_datasets)))
output = list(zip(output_datasets, output_dataframes))
for ds, df in output:
    logging.info("Writing function result to dataset %s" % ds.name)
    ds.write_with_schema(df)
"""

    def with_function_name(self, module_name, function_name, function_args=None, custom_template=None):
        """
        Defines this recipe as being a functional recipe calling a function name from a module name.

        :param str module_name: name of the module the function is imported from at recipe run time
        :param str function_name: name of the function to call
        :param dict function_args: optional named arguments passed to the function
            (must be JSON-serializable)
        :param str custom_template: optional replacement for
            :data:`DEFAULT_RECIPE_CODE_TMPL`; receives the ``module_name``,
            ``fname`` and ``params_json`` format fields
        :return: self, to allow chaining
        """
        script_tmpl = PythonRecipeCreator.DEFAULT_RECIPE_CODE_TMPL if custom_template is None else custom_template

        if function_args is None:
            function_args = {}

        # NOTE(review): the generated code embeds the JSON inside single quotes
        # ('{params_json}'), so args whose serialized form contains single quotes
        # or backslashes would produce invalid code — confirm and escape if needed
        code = script_tmpl.format(module_name=module_name, fname=function_name,
                                  params_json=json.dumps(function_args))
        self.with_script(code)

        return self

    def with_function(self, fn, function_args=None, custom_template=None):
        """
        Defines this recipe as being a functional recipe calling the given function object.

        :param fn: the function to call; it must be importable from its defining module
            at recipe run time (relative imports will not work)
        :param dict function_args: optional named arguments passed to the function
        :param str custom_template: optional replacement for the default code template
        :return: self, to allow chaining
        """
        import inspect
        module_name = inspect.getmodule(fn).__name__
        fname = fn.__name__
        return self.with_function_name(module_name, fname, function_args, custom_template)
11301119

11311120
class SQLQueryRecipeCreator(SingleOutputRecipeCreator):
11321121
"""

0 commit comments

Comments
 (0)