Skip to content

Commit e606010

Browse files
committed
Bootstrap a new API to easily explore the Flow graph
1 parent 0ad9cd1 commit e606010

File tree

1 file changed

+83
-1
lines changed

1 file changed

+83
-1
lines changed

dataikuapi/dss/flow.py

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from .utils import AnyLoc
2-
from .recipe import DSSRecipeDefinitionAndPayload
2+
from .dataset import DSSDataset
3+
from .recipe import DSSRecipe, DSSRecipeDefinitionAndPayload
34
from .future import DSSFuture
45
import logging
56

@@ -8,6 +9,9 @@ def __init__(self, client, project):
89
self.client = client
910
self.project = project
1011

12+
def get_graph(self):
13+
data = self.client._perform_json("GET", "/projects/%s/flow/graph/" % (self.project.project_key))
14+
return DSSProjectFlowGraph(self, data)
1115

1216
def replace_input_computable(self, current_ref, new_ref, type="DATASET"):
1317
"""
@@ -81,6 +85,84 @@ def propagate_schema(self, dataset_name, rebuild=False, recipe_update_options={}
8185
return DSSFuture(self.client,update_future.get('jobId', None), update_future)
8286

8387

88+
class DSSProjectFlowGraph(object):
89+
90+
def __init__(self, flow, data):
91+
self.flow = flow
92+
self.data = data
93+
94+
def get_source_computables(self, as_type="dict"):
95+
"""
96+
Returns the list of source computables.
97+
:param as_type: How to return the source computables. Possible values are "dict" and "object"
98+
99+
:return: if as_type=dict, each computable is returned as a dict containing at least "ref" and "type".
100+
if as_type=object, each computable is returned as a :class:`dataikuapi.dss.dataset.DSSDataset`,
101+
:class:`dataikuapi.dss.managedfolder.DSSManagedFolder`,
102+
:class:`dataikuapi.dss.savedmodel.DSSSavedModel`, or streaming endpoint
103+
"""
104+
ret = []
105+
for computable in self.data["computables"].values():
106+
if len(computable["predecessors"]) == 0:
107+
ret.append(computable)
108+
return self._convert_computables_list(ret, as_type)
109+
110+
def get_source_datasets(self):
111+
"""
112+
Returns the list of source datasets for this project.
113+
:rtype list of :class:`dataikuapi.dss.dataset.DSSDataset`
114+
"""
115+
return [self._get_object_from_computable(x) for x in self.get_source_computables() if x["type"] == "COMPUTABLE_DATASET"]
116+
117+
def get_successor_recipes(self, node, as_type="name"):
118+
"""
119+
Returns a list of recipes that are a successor of a graph node
120+
121+
:param node: Either a name or :class:`dataikuapi.dss.dataset.DSSDataset` object
122+
:return if as_type="name", list of strings, recipe names
123+
else list of :class:`dataikuapi.dss.recipe.DSSRecipe`
124+
"""
125+
if isinstance(node, DSSDataset):
126+
node = node.dataset_name
127+
128+
computable = self.data["computables"].get(node, None)
129+
if computable is None:
130+
raise ValueError("Computable %s not found in Flow graph" % node)
131+
132+
names= computable["successors"]
133+
134+
if as_type == "object":
135+
return [DSSRecipe(self.flow.client, self.flow.project.project_key, x) for x in names]
136+
else:
137+
return names
138+
139+
def get_successor_computables(self, node, as_type="dict"):
140+
"""
141+
Returns a list of computables that are a successor of a given graph node
142+
Each computable is returned as a dict containing at least "ref" and "type"
143+
"""
144+
if isinstance(node, DSSRecipe):
145+
node = node.recipe_name
146+
runnable = self.data["runnables"].get(node, None)
147+
if runnable is None:
148+
raise ValueError("Runnable %s not found in Flow graph" % node)
149+
150+
computables = [self.data["computables"][x] for x in runnable["successors"]]
151+
return self._convert_computables_list(computables, as_type)
152+
153+
def _convert_computables_list(self, computables, as_type):
154+
if as_type == "object":
155+
return [self._get_object_from_computable(computable) for computable in computables]
156+
else:
157+
return computables
158+
159+
def _get_object_from_computable(self, computable):
160+
if computable["type"] == "COMPUTABLE_DATASET":
161+
return DSSDataset(self.flow.client, self.flow.project.project_key, computable["ref"])
162+
else:
163+
# TODO
164+
raise Exception("unsupported %s" % computable["type"])
165+
84166
class DSSFlowTool(object):
85167
"""
86168
Handle to interact with a flow tool

0 commit comments

Comments
 (0)