Skip to content

Commit 46eb039

Browse files
authored
Merge pull request #58 from dataiku/feature/project-apps
[WIP] Dataiku Apps API draft
2 parents 56859e6 + 34b1b49 commit 46eb039

File tree

8 files changed

+436
-1
lines changed

8 files changed

+436
-1
lines changed

dataikuapi/dss/app.py

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
import sys
2+
import os.path as osp
3+
from .future import DSSFuture
4+
from dataikuapi.utils import DataikuException
5+
import random, string
6+
7+
def random_string(length):
    """Return a random string of ``length`` ASCII letters (upper and lower case)."""
    letters = [random.choice(string.ascii_letters) for _ in range(length)]
    return "".join(letters)
9+
10+
class DSSApp(object):
    """
    A handle to interact with an app on the DSS instance.

    Do not create this class directly, instead use :meth:`dataikuapi.DSSClient.get_app`
    """
    def __init__(self, client, app_id):
        self.client = client
        self.app_id = app_id

    ########################################################
    # Instances
    ########################################################

    def create_instance(self, instance_key, instance_name, wait=True):
        """
        Creates a new instance of this app. Each instance must have a globally unique
        instance key, separate from any project key across the whole DSS instance.

        :param str instance_key: project key for the new app instance
        :param str instance_name: name for the new app instance
        :param bool wait: if True, wait for the creation to complete before returning

        :return: a :class:`DSSAppInstance` if ``wait`` is True, else a
            :class:`dataikuapi.dss.future.DSSFuture` tracking the creation task
        """
        future_resp = self.client._perform_json(
            "POST", "/apps/%s/instances" % self.app_id, body={
                "targetProjectKey": instance_key,
                "targetProjectName": instance_name
            })
        future = DSSFuture(self.client, future_resp.get("jobId", None), future_resp)
        if wait:
            # Block until the creation task finishes (raises if the task failed);
            # the task result itself is not needed, only the completion.
            future.wait_for_result()
            return DSSAppInstance(self.client, instance_key)
        else:
            return future

    def make_random_project_key(self):
        """Generate a random, app-prefixed project key suitable for a temporary instance."""
        return "%s_tmp_%s" % (self.app_id, random_string(10))

    def create_temporary_instance(self):
        """
        Creates a new temporary instance of this app.

        The return value should be used as a Python context manager. Upon exit, the
        temporary app instance is deleted.

        :return: a :class:`TemporaryDSSAppInstance`
        """
        key = self.make_random_project_key()
        # Wait for the creation to finish, then rewrap the instance key in a
        # temporary handle that deletes the instance on context-manager exit.
        self.create_instance(key, key, True)
        return TemporaryDSSAppInstance(self.client, key)

    def list_instance_keys(self):
        """
        List the existing instances of this app

        :return: a list of instance keys, each as a string
        """
        return [x["projectKey"] for x in self.list_instances()]

    def list_instances(self):
        """
        List the existing instances of this app

        :rtype: list of dicts
        :return: a list of instances, each as a dict containing at least a "projectKey" field
        """
        return self.client._perform_json(
            "GET", "/apps/%s/instances/" % self.app_id)

    def get_instance(self, instance_key):
        """
        Get a handle on an instance of this app.

        :param str instance_key: project key of the instance
        :return: a :class:`DSSAppInstance`
        """
        return DSSAppInstance(self.client, instance_key)

    def get_manifest(self):
        """
        Get the manifest of this app, as a :class:`DSSAppManifest`
        """
        raw_data = self.client._perform_json("GET", "/apps/%s/" % self.app_id)
        return DSSAppManifest(self.client, raw_data)
81+
82+
83+
class DSSAppManifest(object):
    """The manifest for an app. Do not create this class directly"""

    def __init__(self, client, raw_data):
        self.client = client
        self.raw_data = raw_data

    def get_raw(self):
        """Return the raw manifest data, as a dict."""
        return self.raw_data

    def get_all_actions(self):
        """Return every tile declared across the manifest's homepage sections."""
        tiles = []
        for section in self.raw_data["homepageSections"]:
            tiles.extend(section["tiles"])
        return tiles

    def get_runnable_scenarios(self):
        """Return the scenario identifiers that are declared as actions for this app"""
        scenario_ids = []
        for action in self.get_all_actions():
            if action["type"] == "SCENARIO_RUN":
                scenario_ids.append(action["scenarioId"])
        return scenario_ids
99+
100+
class DSSAppInstance(object):
    """Handle on an existing instance of an app. Do not create this class directly."""

    def __init__(self, client, project_key):
        self.client = client
        self.project_key = project_key

    def get_as_project(self):
        """
        Get the :class:`dataikuapi.dss.project DSSProject` corresponding to this app instance
        """
        return self.client.get_project(self.project_key)

    def get_manifest(self):
        """
        Get the app manifest for this instance, as a :class:`DSSAppManifest`
        """
        manifest_data = self.client._perform_json(
            "GET", "/projects/%s/app-manifest" % self.project_key)
        return DSSAppManifest(self.client, manifest_data)
118+
119+
class TemporaryDSSAppInstance(DSSAppInstance):
    """internal class"""

    def __init__(self, client, project_key):
        super(TemporaryDSSAppInstance, self).__init__(client, project_key)

    def close(self):
        # Deleting the backing project removes the temporary app instance
        self.get_as_project().delete()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.close()

dataikuapi/dss/dataset.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from ..utils import DataikuUTF8CSVReader
33
from ..utils import DataikuStreamedHttpUTF8CSVReader
44
import json
5+
from .future import DSSFuture
56
from .metrics import ComputedMetrics
67
from .discussion import DSSObjectDiscussions
78
from .statistics import DSSStatisticsWorksheet
@@ -153,6 +154,22 @@ def clear(self, partitions=None):
153154
"DELETE", "/projects/%s/datasets/%s/data" % (self.project_key, self.dataset_name),
154155
params={"partitions" : partitions})
155156

157+
def copy_to(self, target, sync_schema=True, write_mode="OVERWRITE"):
    """
    Copies the data of this dataset to another dataset

    :param target Dataset: a :class:`dataikuapi.dss.dataset.DSSDataset` representing the target of this copy
    :param bool sync_schema: whether to copy this dataset's schema onto the target as well
    :param str write_mode: how data is written into the target (default "OVERWRITE")
    :returns: a DSSFuture representing the operation
    """
    copy_request = {
        "targetProjectKey": target.project_key,
        "targetDatasetName": target.dataset_name,
        "syncSchema": sync_schema,
        "writeMode": write_mode,
    }
    future_resp = self.client._perform_json(
        "POST",
        "/projects/%s/datasets/%s/actions/copyTo" % (self.project_key, self.dataset_name),
        body=copy_request)
    return DSSFuture(self.client, future_resp.get("jobId", None), future_resp)
172+
156173
########################################################
157174
# Dataset actions
158175
########################################################
@@ -205,6 +222,23 @@ def run_checks(self, partition='', checks=None):
205222
"POST" , "/projects/%s/datasets/%s/actions/runChecks" %(self.project_key, self.dataset_name),
206223
params={'partition':partition}, body=checks)
207224

225+
def uploaded_add_file(self, fp, filename):
    """
    Adds a file to an "uploaded files" dataset

    :param file fp: A file-like object that represents the file to upload
    :param str filename: The filename for the file to upload
    """
    upload_path = "/projects/%s/datasets/%s/uploaded/files" % (self.project_key, self.dataset_name)
    self.client._perform_empty("POST", upload_path, files={"file": (filename, fp)})
234+
235+
def uploaded_list_files(self):
    """
    List the files in an "uploaded files" dataset

    :returns: the backend's description of the uploaded files
    """
    list_path = "/projects/%s/datasets/%s/uploaded/files" % (self.project_key, self.dataset_name)
    return self.client._perform_json("GET", list_path)
240+
241+
208242
########################################################
209243
# Statistics worksheets
210244
########################################################

dataikuapi/dss/flow.py

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
from .utils import AnyLoc
2+
from .recipe import DSSRecipeDefinitionAndPayload
3+
from .future import DSSFuture
4+
import logging
5+
6+
class DSSProjectFlow(object):
    """
    A handle to interact with the flow of a DSS project.
    """
    def __init__(self, client, project):
        self.client = client
        self.project = project

    def replace_input_computable(self, current_ref, new_ref, type="DATASET"):
        """
        This method replaces all references to a "computable" (Dataset, Managed Folder or Saved Model)
        as input of recipes in the whole Flow by a reference to another computable.

        No specific checks are performed. It is your responsibility to ensure that the schema
        of the new dataset will be compatible with the previous one (in the case of datasets).

        If `new_ref` references an object in a foreign project, this method will automatically
        ensure that `new_ref` is exposed to the current project

        :param current_ref str: Either a "simple" object name (dataset name, model id, managed folder id)
            or a foreign object reference in the form "FOREIGN_PROJECT_KEY.local_id")
        :param new_ref str: Either a "simple" object name (dataset name, model id, managed folder id)
            or a foreign object reference in the form "FOREIGN_PROJECT_KEY.local_id")
        :param type str: The type of object being replaced (DATASET, SAVED_MODEL or MANAGED_FOLDER)
        """
        new_loc = AnyLoc.from_ref(self.project.project_key, new_ref)

        if new_loc.project_key != self.project.project_key:
            # The replacement lives in another project: expose it here first
            logging.info("New ref is in project %s, exposing it to project %s" % (new_loc.project_key, self.project.project_key))
            new_ref_src_project = self.client.get_project(new_loc.project_key)
            settings = new_ref_src_project.get_settings()
            settings.add_exposed_object(type, new_loc.object_id, self.project.project_key)
            settings.save()

        for recipe in self.project.list_recipes():
            # Wrap the listing item so we can reuse the input-inspection helpers
            # without fetching each recipe's full definition first
            fake_rap = DSSRecipeDefinitionAndPayload({"recipe": recipe})
            if fake_rap.has_input(current_ref):
                logging.info("Recipe %s has %s as input, performing the replacement by %s" %
                             (recipe["name"], current_ref, new_ref))
                recipe_obj = self.project.get_recipe(recipe["name"])
                dap = recipe_obj.get_definition_and_payload()
                dap.replace_input(current_ref, new_ref)
                recipe_obj.set_definition_and_payload(dap)

    ########################################################
    # Flow tools
    ########################################################

    def start_tool(self, type, data=None):
        """
        Start a tool or open a view in the flow

        :param type str: one of {COPY, CHECK_CONSISTENCY, PROPAGATE_SCHEMA} (tools) or {TAGS, CUSTOM_FIELDS, CONNECTIONS, COUNT_OF_RECORDS, FILESIZE, FILEFORMATS, RECIPES_ENGINES, RECIPES_CODE_ENVS, IMPALA_WRITE_MODE, HIVE_MODE, SPARK_ENGINE, SPARK_CONFIG, SPARK_PIPELINES, SQL_PIPELINES, PARTITIONING, PARTITIONS, SCENARIOS, CREATION, LAST_MODIFICATION, LAST_BUILD, RECENT_ACTIVITY, WATCH} (views)
        :param data dict: initial data for the tool (optional)

        :returns: a :class:`.flow.DSSFlowTool` handle to interact with the newly-created tool or view
        """
        # Default handled with None to avoid the shared-mutable-default pitfall
        if data is None:
            data = {}
        tool_id = self.client._perform_text("POST", "/projects/%s/flow/tools/" % self.project.project_key, params={'type': type}, body=data)
        return DSSFlowTool(self.client, self.project.project_key, tool_id)

    def propagate_schema(self, dataset_name, rebuild=False, recipe_update_options=None, excluded_recipes=None, mark_as_ok_recipes=None, partition_by_dim=None, partition_by_computable=None):
        """
        Start an automatic schema propagate from a dataset

        :param dataset_name str: name of a dataset to start propagating from
        :param rebuild bool: whether to automatically rebuild datasets if needed
        :param recipe_update_options str: pre-recipe or per-recipe-type update options to apply on recipes
        :param excluded_recipes list: list of recipes where propagation is to stop
        :param mark_as_ok_recipes list: list of recipes to consider as ok
        :param partition_by_dim dict: partition value to use for each dimension
        :param partition_by_computable dict: partition spec to use for a given dataset or recipe (overrides partition_by_dim)

        :returns: dict of the messages collected during the update
        """
        # Defaults handled with None to avoid shared mutable default arguments;
        # an omitted argument still sends an empty dict/list as before.
        data = {
            'recipeUpdateOptions': recipe_update_options if recipe_update_options is not None else {},
            'partitionByDim': partition_by_dim if partition_by_dim is not None else {},
            'partitionByComputable': partition_by_computable if partition_by_computable is not None else {},
            'excludedRecipes': excluded_recipes if excluded_recipes is not None else [],
            'markAsOkRecipes': mark_as_ok_recipes if mark_as_ok_recipes is not None else []
        }
        update_future = self.client._perform_json("POST", "/projects/%s/flow/tools/propagate-schema/%s/" % (self.project.project_key, dataset_name), params={'rebuild': rebuild}, body=data)
        return DSSFuture(self.client, update_future.get('jobId', None), update_future)
82+
83+
84+
class DSSFlowTool(object):
    """
    Handle to interact with a flow tool
    """
    def __init__(self, client, project_key, tool_id):
        self.client = client
        self.project_key = project_key
        self.tool_id = tool_id

    def stop(self):
        """
        Stops the tool and releases the resources held by it
        """
        return self.client._perform_json("POST", "/projects/%s/flow/tools/%s/stop" % (self.project_key, self.tool_id))

    def get_state(self, options=None):
        """
        Get the current state of the tool or view

        :param options dict: options for the state request (optional)

        :returns: the state, as a dict
        """
        # Default handled with None to avoid the shared-mutable-default pitfall
        return self.client._perform_json("GET", "/projects/%s/flow/tools/%s/state" % (self.project_key, self.tool_id), body=options if options is not None else {})

    def do(self, action):
        """
        Perform a manual user action on the tool

        :returns: the current state, as a dict
        """
        return self.client._perform_json("PUT", "/projects/%s/flow/tools/%s/action" % (self.project_key, self.tool_id), body=action)

    def update(self, options=None):
        """
        (for tools only) Start updating the tool state

        :params options dict: options for the update (optional)

        :returns: a :class:`.future.DSSFuture` handle to interact with task of performing the update
        """
        # Default handled with None to avoid the shared-mutable-default pitfall
        update_future = self.client._perform_json("POST", "/projects/%s/flow/tools/%s/update" % (self.project_key, self.tool_id), body=options if options is not None else {})
        return DSSFuture(self.client, update_future.get('jobId', None), update_future)
125+

dataikuapi/dss/managedfolder.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from ..utils import DataikuStreamedHttpUTF8CSVReader
44
import json
55
from .metrics import ComputedMetrics
6+
from .future import DSSFuture
67
from .discussion import DSSObjectDiscussions
78

89
class DSSManagedFolder(object):
@@ -173,3 +174,22 @@ def get_object_discussions(self):
173174
:rtype: :class:`dataikuapi.discussion.DSSObjectDiscussions`
174175
"""
175176
return DSSObjectDiscussions(self.client, self.project_key, "MANAGED_FOLDER", self.odb_id)
177+
178+
########################################################
179+
# utilities
180+
########################################################
181+
def copy_to(self, target, write_mode="OVERWRITE"):
    """
    Copies the data of this folder to another folder

    :param target Folder: a :class:`dataikuapi.dss.managedfolder.DSSManagedFolder` representing the target of this copy
    :param str write_mode: how data is written into the target (default "OVERWRITE")
    :returns: a DSSFuture representing the operation
    """
    copy_request = {
        "targetProjectKey": target.project_key,
        "targetFolderId": target.odb_id,
        "writeMode": write_mode,
    }
    future_resp = self.client._perform_json(
        "POST",
        "/projects/%s/managedfolders/%s/actions/copyTo" % (self.project_key, self.odb_id),
        body=copy_request)
    return DSSFuture(self.client, future_resp.get("jobId", None), future_resp)
195+

0 commit comments

Comments
 (0)