
Commit e308c20

Merge pull request #66 from dataiku/feature/new-dataset-management-apis
[WIP] Flow building APIs + new security APIs
2 parents e5bb196 + 154fc99

File tree

8 files changed: +1508 -127 lines


dataikuapi/dss/dataset.py

Lines changed: 270 additions & 8 deletions
@@ -1,18 +1,61 @@
 from ..utils import DataikuException
 from ..utils import DataikuUTF8CSVReader
 from ..utils import DataikuStreamedHttpUTF8CSVReader
-import json
+from .future import DSSFuture
+import json, warnings
+from .utils import DSSTaggableObjectListItem
 from .future import DSSFuture
 from .metrics import ComputedMetrics
 from .discussion import DSSObjectDiscussions
 from .statistics import DSSStatisticsWorksheet
+from . import recipe
+
+class DSSDatasetListItem(DSSTaggableObjectListItem):
+    """An item in a list of datasets. Do not instantiate this class."""
+    def __init__(self, client, data):
+        super(DSSDatasetListItem, self).__init__(data)
+        self.client = client
+
+    def to_dataset(self):
+        """Gets the :class:`DSSDataset` corresponding to this dataset"""
+        return DSSDataset(self.client, self._data["projectKey"], self._data["name"])
+
+    @property
+    def name(self):
+        return self._data["name"]
+    @property
+    def id(self):
+        return self._data["name"]
+    @property
+    def type(self):
+        return self._data["type"]
+    @property
+    def schema(self):
+        return self._data["schema"]
+
+    @property
+    def connection(self):
+        """Returns the connection on which this dataset is attached, or None if there is no connection for this dataset"""
+        if not "params" in self._data:
+            return None
+        return self._data["params"].get("connection", None)
+
+    def get_column(self, column):
+        """
+        Returns the schema column given a name.
+
+        :param str column: name of the column to find
+        :return: a dict of the column settings, or None if the column does not exist
+        """
+        matched = [col for col in self.schema["columns"] if col["name"] == column]
+        return None if len(matched) == 0 else matched[0]
 
 class DSSDataset(object):
     """
     A dataset on the DSS instance
     """
     def __init__(self, client, project_key, dataset_name):
         self.client = client
+        self.project = client.get_project(project_key)
         self.project_key = project_key
         self.dataset_name = dataset_name

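Before the next hunk, a minimal usage sketch of the new list items (not part of the diff; it assumes project.list_datasets() returns DSSDatasetListItem objects, which this PR's project-level changes suggest, and the host, API key, project and column names are made up):

    import dataikuapi

    # Hypothetical host and API key
    client = dataikuapi.DSSClient("http://localhost:11200", "my-api-key")
    project = client.get_project("MYPROJECT")

    for item in project.list_datasets():
        # Cheap metadata read straight from the list payload, no extra API call
        print("%s (%s) on connection %s" % (item.name, item.type, item.connection))
        if item.get_column("customer_id") is not None:   # hypothetical column
            ds = item.to_dataset()   # full DSSDataset handle for further operations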
@@ -36,24 +79,55 @@ def delete(self, drop_data=False):
     # Dataset definition
     ########################################################
 
-    def get_definition(self):
+    def get_settings(self):
         """
-        Get the definition of the dataset
+        Returns the settings of this dataset as a :class:`DSSDatasetSettings`, or one of its subclasses.
 
-        Returns:
-            the definition, as a JSON object
+        Known subclasses of :class:`DSSDatasetSettings` include :class:`FSLikeDatasetSettings`
+        and :class:`SQLDatasetSettings`.
+
+        You must use :meth:`~DSSDatasetSettings.save()` on the returned object to make your changes effective
+        on the dataset.
+
+        .. code-block:: python
+
+            # Example: activating discrete partitioning on a SQL dataset
+            dataset = project.get_dataset("my_database_table")
+            settings = dataset.get_settings()
+            settings.add_discrete_partitioning_dimension("country")
+            settings.save()
+
+        :rtype: :class:`DSSDatasetSettings`
+        """
+        data = self.client._perform_json("GET", "/projects/%s/datasets/%s" % (self.project_key, self.dataset_name))
+
+        if data["type"] in self.__class__.FS_TYPES:
+            return FSLikeDatasetSettings(self, data)
+        elif data["type"] in self.__class__.SQL_TYPES:
+            return SQLDatasetSettings(self, data)
+        else:
+            return DSSDatasetSettings(self, data)
+
+    def get_definition(self):
         """
+        Deprecated. Use :meth:`get_settings`.
+
+        Get the raw settings of the dataset as a dict.
+
+        :rtype: dict
+        """
+        warnings.warn("Dataset.get_definition is deprecated, please use get_settings", DeprecationWarning)
         return self.client._perform_json(
             "GET", "/projects/%s/datasets/%s" % (self.project_key, self.dataset_name))
 
     def set_definition(self, definition):
         """
+        Deprecated. Use :meth:`get_settings` and :meth:`DSSDatasetSettings.save`.
+
         Set the definition of the dataset
 
-        Args:
-            definition: the definition, as a JSON object. You should only set a definition object
-                        that has been retrieved using the get_definition call.
+        :param definition: the definition, as a dict. You should only set a definition object
+                           that has been retrieved using the get_definition call.
         """
+        warnings.warn("Dataset.set_definition is deprecated, please use get_settings", DeprecationWarning)
         return self.client._perform_json(
             "PUT", "/projects/%s/datasets/%s" % (self.project_key, self.dataset_name),
             body=definition)
@@ -174,6 +248,30 @@ def copy_to(self, target, sync_schema=True, write_mode="OVERWRITE"):
     # Dataset actions
     ########################################################
 
+    def build(self, job_type="NON_RECURSIVE_FORCED_BUILD", partitions=None, wait=True, no_fail=False):
+        """
+        Starts a new job to build this dataset and waits for it to complete.
+        Raises if the job failed.
+
+        .. code-block:: python
+
+            job = dataset.build()
+            print("Job %s done" % job.id)
+
+        :param job_type: The job type. One of RECURSIVE_BUILD, NON_RECURSIVE_FORCED_BUILD or RECURSIVE_FORCED_BUILD
+        :param partitions: If the dataset is partitioned, a list of partition ids to build
+        :param no_fail: if True, does not raise if the job fails
+        :return: the :class:`dataikuapi.dss.job.DSSJob` job handle corresponding to the build job
+        :rtype: :class:`dataikuapi.dss.job.DSSJob`
+        """
+        jd = self.project.new_job(job_type)
+        jd.with_output(self.dataset_name, partition=partitions)
+        if wait:
+            return jd.start_and_wait()
+        else:
+            return jd.start()
+
     def synchronize_hive_metastore(self):
         """
         Synchronize this dataset with the Hive metastore
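A minimal usage sketch for build(), expanding on the docstring example above (not part of the diff; dataset and partition names are made up):

    dataset = project.get_dataset("sales_by_day")

    # Synchronous build: blocks until the job finishes, raises on failure
    job = dataset.build()
    print("Job %s done" % job.id)

    # Fire-and-forget build of a single partition of a partitioned dataset
    job = dataset.build(job_type="RECURSIVE_FORCED_BUILD",
                        partitions=["2020-01-01"],
                        wait=False)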
@@ -346,6 +444,170 @@ def get_object_discussions(self):
         """
         return DSSObjectDiscussions(self.client, self.project_key, "DATASET", self.dataset_name)
 
+    ########################################################
+    # Test / Autofill
+    ########################################################
+
+    FS_TYPES = ["Filesystem", "UploadedFiles", "FilesInFolder",
+                "HDFS", "S3", "Azure", "GCS", "FTP", "SCP", "SFTP"]
+    # HTTP is FSLike but not FS
+
+    SQL_TYPES = ["JDBC", "PostgreSQL", "MySQL", "Vertica", "Snowflake", "Redshift",
+                 "Greenplum", "Teradata", "Oracle", "SQLServer", "SAPHANA", "Netezza",
+                 "BigQuery", "Athena", "hiveserver2"]
+
+    def test_and_detect(self, infer_storage_types=False):
+        settings = self.get_settings()
+
+        if settings.type in self.__class__.FS_TYPES:
+            future_resp = self.client._perform_json("POST",
+                "/projects/%s/datasets/%s/actions/testAndDetectSettings/fsLike" % (self.project_key, self.dataset_name),
+                body={"detectPossibleFormats": True, "inferStorageTypes": infer_storage_types})
+
+            return DSSFuture(self.client, future_resp.get('jobId', None), future_resp)
+        elif settings.type in self.__class__.SQL_TYPES:
+            return self.client._perform_json("POST",
+                "/projects/%s/datasets/%s/actions/testAndDetectSettings/externalSQL" % (self.project_key, self.dataset_name))
+        else:
+            raise ValueError("don't know how to test/detect on dataset type:%s" % settings.type)
+
+    def autodetect_settings(self, infer_storage_types=False):
+        settings = self.get_settings()
+
+        if settings.type in self.__class__.FS_TYPES:
+            future = self.test_and_detect(infer_storage_types)
+            result = future.wait_for_result()
+
+            if not "format" in result or not result["format"]["ok"]:
+                raise DataikuException("Format detection failed, complete response is " + json.dumps(result))
+
+            settings.get_raw()["formatType"] = result["format"]["type"]
+            settings.get_raw()["formatParams"] = result["format"]["params"]
+            settings.get_raw()["schema"] = result["format"]["schemaDetection"]["newSchema"]
+
+            return settings
+
+        elif settings.type in self.__class__.SQL_TYPES:
+            result = self.test_and_detect()
+
+            if not "schemaDetection" in result:
+                raise DataikuException("Format detection failed, complete response is " + json.dumps(result))
+
+            settings.get_raw()["schema"] = result["schemaDetection"]["newSchema"]
+            return settings
+
+        else:
+            raise ValueError("don't know how to test/detect on dataset type:%s" % settings.type)
+
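Before the hunk continues, a minimal sketch of the detection flow introduced above (not part of the diff; the dataset name is made up):

    dataset = project.get_dataset("raw_events")   # a files-based dataset

    # autodetect_settings() runs the detection and folds the detected format
    # and schema back into a settings object; nothing is persisted until save()
    settings = dataset.autodetect_settings(infer_storage_types=True)
    print(settings.get_raw()["formatType"])
    settings.save()

    # Lower level: drive the detection future yourself
    future = dataset.test_and_detect(infer_storage_types=True)
    result = future.wait_for_result()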
+    def get_as_core_dataset(self):
+        import dataiku
+        return dataiku.Dataset("%s.%s" % (self.project_key, self.dataset_name))
+
+    ########################################################
+    # Creation of recipes
+    ########################################################
+
+    def new_code_recipe(self, type, code=None, recipe_name=None):
+        """Starts creation of a new code recipe taking this dataset as input"""
+
+        if recipe_name is None:
+            recipe_name = "%s_recipe_from_%s" % (type, self.dataset_name)
+
+        if type == "python":
+            builder = recipe.PythonRecipeCreator(recipe_name, self.project)
+        else:
+            builder = recipe.CodeRecipeCreator(recipe_name, type, self.project)
+        builder.with_input(self.dataset_name)
+        if code is not None:
+            builder.with_script(code)
+        return builder
+
+    def new_grouping_recipe(self, first_group_by, recipe_name=None):
+        if recipe_name is None:
+            recipe_name = "group_%s" % (self.dataset_name)
+        builder = recipe.GroupingRecipeCreator(recipe_name, self.project)
+        builder.with_input(self.dataset_name)
+        builder.with_group_key(first_group_by)
+        return builder
+
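A minimal sketch of the recipe builders (not part of the diff; with_new_output() and build() are assumed from the creator classes in dataikuapi/dss/recipe.py, and all names are made up):

    # Grouping recipe: group this dataset by country into a new managed dataset
    builder = dataset.new_grouping_recipe("country")
    builder.with_new_output("orders_by_country", "filesystem_managed")   # assumed creator API
    grouping_recipe = builder.build()                                    # assumed creator API

    # Python code recipe seeded with a script
    code_builder = dataset.new_code_recipe("python", code="# transform here\n")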
+class DSSDatasetSettings(object):
+    def __init__(self, dataset, settings):
+        self.dataset = dataset
+        self.settings = settings
+
+    def get_raw(self):
+        """Get the raw dataset settings as a dict"""
+        return self.settings
+
+    def get_raw_params(self):
+        """Get the type-specific params, as a raw dict"""
+        return self.settings["params"]
+
+    @property
+    def type(self):
+        return self.settings["type"]
+
+    def remove_partitioning(self):
+        self.settings["partitioning"] = {"dimensions": []}
+
+    def add_discrete_partitioning_dimension(self, dim_name):
+        self.settings["partitioning"]["dimensions"].append({"name": dim_name, "type": "value"})
+
+    def add_time_partitioning_dimension(self, dim_name, period="DAY"):
+        self.settings["partitioning"]["dimensions"].append({"name": dim_name, "type": "time", "params": {"period": period}})
+
+    def add_raw_schema_column(self, column):
+        self.settings["schema"]["columns"].append(column)
+
+    def save(self):
+        self.dataset.client._perform_empty(
+            "PUT", "/projects/%s/datasets/%s" % (self.dataset.project_key, self.dataset.dataset_name),
+            body=self.settings)
+
+class FSLikeDatasetSettings(DSSDatasetSettings):
+    def __init__(self, dataset, settings):
+        super(FSLikeDatasetSettings, self).__init__(dataset, settings)
+
+    def set_connection_and_path(self, connection, path):
+        self.settings["params"]["connection"] = connection
+        self.settings["params"]["path"] = path
+
+    def get_raw_format_params(self):
+        """Get the raw format parameters as a dict"""
+        return self.settings["formatParams"]
+
+    def set_format(self, format_type, format_params=None):
+        if format_params is None:
+            format_params = {}
+        self.settings["formatType"] = format_type
+        self.settings["formatParams"] = format_params
+
+    def set_csv_format(self, separator=",", style="excel", skip_rows_before=0, header_row=True, skip_rows_after=0):
+        format_params = {
+            "style": style,
+            "separator": separator,
+            "skipRowsBeforeHeader": skip_rows_before,
+            "parseHeaderRow": header_row,
+            "skipRowsAfterHeader": skip_rows_after
+        }
+        self.set_format("csv", format_params)
+
+    def set_partitioning_file_pattern(self, pattern):
+        self.settings["partitioning"]["filePathPattern"] = pattern
+
+class SQLDatasetSettings(DSSDatasetSettings):
+    def __init__(self, dataset, settings):
+        super(SQLDatasetSettings, self).__init__(dataset, settings)
+
+    def set_table(self, connection, schema, table):
+        """Sets this SQL dataset in 'table' mode, targeting a particular table of a connection"""
+        self.settings["params"].update({
+            "connection": connection,
+            "mode": "table",
+            "schema": schema,
+            "table": table
+        })
+
 class DSSManagedDatasetCreationHelper(object):
 
     def __init__(self, project, dataset_name):
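Finally, a minimal sketch tying the settings subclasses together (not part of the diff; connection, schema and table names are made up):

    # FS-like dataset: point it at a folder and declare a tab-separated layout
    fs = project.get_dataset("raw_events").get_settings()    # FSLikeDatasetSettings
    fs.set_connection_and_path("s3_datalake", "/landing/events/")
    fs.set_csv_format(separator="\t", header_row=True)
    fs.save()

    # SQL dataset: target a specific table and partition it by day
    sql = project.get_dataset("orders").get_settings()       # SQLDatasetSettings
    sql.set_table("my_postgres", "public", "orders")
    sql.add_time_partitioning_dimension("order_date", period="DAY")
    sql.save()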

0 commit comments

Comments
 (0)