 from ..utils import DataikuStreamedHttpUTF8CSVReader
 from .future import DSSFuture
 import json, warnings
+from .utils import DSSTaggableObjectListItem
 from .future import DSSFuture
 from .metrics import ComputedMetrics
 from .discussion import DSSObjectDiscussions
 from .statistics import DSSStatisticsWorksheet
 from . import recipe
 
+class DSSDatasetListItem(DSSTaggableObjectListItem):
+    """An item in a list of datasets. Do not instantiate this class."""
+    def __init__(self, client, data):
+        super(DSSDatasetListItem, self).__init__(data)
+        self.client = client
+
+    def to_dataset(self):
+        """Gets the :class:`DSSDataset` corresponding to this dataset"""
+        return DSSDataset(self.client, self._data["projectKey"], self._data["name"])
+
+    @property
+    def name(self):
+        return self._data["name"]
+    @property
+    def id(self):
+        return self._data["name"]
+    @property
+    def type(self):
+        return self._data["type"]
+    @property
+    def schema(self):
+        return self._data["schema"]
+
+    @property
+    def connection(self):
+        """Returns the connection to which this dataset is attached, or None if there is no connection for this dataset"""
+        if "params" not in self._data:
+            return None
+        return self._data["params"].get("connection", None)
+
+    def get_column(self, column):
+        """
+        Returns a column of the schema, given its name.
+
+        :param str column: name of the column to find
+        :returns: a dict of the column settings, or None if the column does not exist
+        """
+        matched = [col for col in self.schema["columns"] if col["name"] == column]
+        return None if len(matched) == 0 else matched[0]
+
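For illustration, a minimal usage sketch of the new list items (assuming project.list_datasets() returns DSSDatasetListItem objects once this change is in; the host, API key, project and column names below are made up):

    import dataikuapi

    client = dataikuapi.DSSClient("https://dss.example.com:11200", "some-api-key")
    project = client.get_project("MYPROJECT")

    for item in project.list_datasets():
        # name, type, connection and schema come from the cached listing, no extra API call
        print(item.name, item.type, item.connection)
        col = item.get_column("customer_id")
        if col is not None:
            print(col["type"])
        dataset = item.to_dataset()  # full DSSDataset handle for further operations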
 class DSSDataset(object):
     """
     A dataset on the DSS instance
@@ -96,6 +136,14 @@ def set_definition(self, definition):
                 "PUT", "/projects/%s/datasets/%s" % (self.project_key, self.dataset_name),
                 body=definition)
 
+    def exists(self):
+        """Returns whether this dataset exists"""
+        try:
+            self.get_metadata()
+            return True
+        except Exception:
+            return False
+
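A small sketch of the new existence check (dataset name made up; client and project handles as in the previous sketch):

    dataset = project.get_dataset("maybe_missing")
    if not dataset.exists():
        print("dataset does not exist in this project")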
     ########################################################
     # Dataset metadata
     ########################################################
@@ -300,6 +348,67 @@ def uploaded_list_files(self):
         """
         return self.client._perform_json("GET", "/projects/%s/datasets/%s/uploaded/files" % (self.project_key, self.dataset_name))
 
+    ########################################################
+    # Lab and ML
+    # Don't forget to synchronize with DSSProject.*
+    ########################################################
+
+    def create_prediction_ml_task(self, target_variable,
+                                  ml_backend_type="PY_MEMORY",
+                                  guess_policy="DEFAULT",
+                                  prediction_type=None,
+                                  wait_guess_complete=True):
+        """Creates a new prediction task in a new visual analysis lab
+        for this dataset.
+
+        :param string target_variable: the variable to predict
+        :param string ml_backend_type: ML backend to use, one of PY_MEMORY, MLLIB or H2O
+        :param string guess_policy: Policy to use for setting the default parameters. Valid values are: DEFAULT, SIMPLE_FORMULA, DECISION_TREE, EXPLANATORY and PERFORMANCE
+        :param string prediction_type: The type of prediction problem this is. If not provided the prediction type will be guessed. Valid values are: BINARY_CLASSIFICATION, REGRESSION, MULTICLASS
+        :param boolean wait_guess_complete: if False, the returned ML task will be in 'guessing' state, i.e. analyzing the input dataset to determine feature handling and algorithms.
+            You should wait for the guessing to be completed by calling
+            ``wait_guess_complete`` on the returned object before doing anything
+            else (in particular calling ``train`` or ``get_settings``)
+        """
+        return self.project.create_prediction_ml_task(self.dataset_name,
+            target_variable=target_variable, ml_backend_type=ml_backend_type,
+            guess_policy=guess_policy, prediction_type=prediction_type, wait_guess_complete=wait_guess_complete)
+
+    def create_clustering_ml_task(self,
+                                  ml_backend_type="PY_MEMORY",
+                                  guess_policy="KMEANS"):
+        """Creates a new clustering task in a new visual analysis lab
+        for this dataset.
+
+        The returned ML task will be in 'guessing' state, i.e. analyzing
+        the input dataset to determine feature handling and algorithms.
+
+        You should wait for the guessing to be completed by calling
+        ``wait_guess_complete`` on the returned object before doing anything
+        else (in particular calling ``train`` or ``get_settings``)
+
+        :param string ml_backend_type: ML backend to use, one of PY_MEMORY, MLLIB or H2O
+        :param string guess_policy: Policy to use for setting the default parameters. Valid values are: KMEANS and ANOMALY_DETECTION
+        """
+        return self.project.create_clustering_ml_task(self.dataset_name,
+            ml_backend_type=ml_backend_type, guess_policy=guess_policy)
+
+    def create_analysis(self):
+        """
+        Creates a new visual analysis lab for this dataset
+        """
+        return self.project.create_analysis(self.dataset_name)
+
+    def list_analyses(self):
+        """
+        Lists the visual analyses on this dataset
+
+        :returns: a list of dicts, one per analysis
+        """
+        analysis_list = self.project.list_analyses()
+        return [desc for desc in analysis_list if self.dataset_name == desc.get('inputDataset')]
 
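Sketch of driving the new Lab helpers from a dataset handle (dataset and target column names are made up; train() and get_settings() are the DSSMLTask methods referenced in the docstrings above; project as in the first sketch):

    dataset = project.get_dataset("customers")

    # Create a prediction ML task; by default this waits for the initial guessing to finish
    mltask = dataset.create_prediction_ml_task(
        target_variable="churn",
        ml_backend_type="PY_MEMORY",
        guess_policy="DEFAULT")

    settings = mltask.get_settings()  # inspect or tweak feature handling / algorithms
    mltask.train()                    # launch training

    # Visual analyses attached to this dataset
    for analysis in dataset.list_analyses():
        print(analysis)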
     ########################################################
     # Statistics worksheets
@@ -414,6 +523,7 @@ def get_object_discussions(self):
 
     FS_TYPES = ["Filesystem", "UploadedFiles", "FilesInFolder",
                 "HDFS", "S3", "Azure", "GCS", "FTP", "SCP", "SFTP"]
+    # HTTP is FSLike but not FS
 
     SQL_TYPES = ["JDBC", "PostgreSQL", "MySQL", "Vertica", "Snowflake", "Redshift",
                  "Greenplum", "Teradata", "Oracle", "SQLServer", "SAPHANA", "Netezza",
@@ -422,22 +532,22 @@ def get_object_discussions(self):
     def test_and_detect(self, infer_storage_types=False):
         settings = self.get_settings()
 
-        if settings.get_type() in self.__class__.FS_TYPES:
+        if settings.type in self.__class__.FS_TYPES:
             future_resp = self.client._perform_json("POST",
                 "/projects/%s/datasets/%s/actions/testAndDetectSettings/fsLike" % (self.project_key, self.dataset_name),
                 body={"detectPossibleFormats": True, "inferStorageTypes": infer_storage_types})
 
             return DSSFuture(self.client, future_resp.get('jobId', None), future_resp)
-        elif settings.get_type() in self.__class__.SQL_TYPES:
+        elif settings.type in self.__class__.SQL_TYPES:
             return self.client._perform_json("POST",
                 "/projects/%s/datasets/%s/actions/testAndDetectSettings/externalSQL" % (self.project_key, self.dataset_name))
         else:
-            raise ValueError("don't know how to test/detect on dataset type:%s" % settings.get_type())
+            raise ValueError("don't know how to test/detect on dataset type:%s" % settings.type)
 
     def autodetect_settings(self, infer_storage_types=False):
         settings = self.get_settings()
 
-        if settings.get_type() in self.__class__.FS_TYPES:
+        if settings.type in self.__class__.FS_TYPES:
             future = self.test_and_detect(infer_storage_types)
             result = future.wait_for_result()
 
@@ -450,7 +560,7 @@ def autodetect_settings(self, infer_storage_types=False):
 
             return settings
 
-        elif settings.get_type() in self.__class__.SQL_TYPES:
+        elif settings.type in self.__class__.SQL_TYPES:
             result = self.test_and_detect()
 
             if not "schemaDetection" in result:
@@ -460,7 +570,7 @@ def autodetect_settings(self, infer_storage_types=False):
             return settings
 
         else:
-            raise ValueError("don't know how to test/detect on dataset type:%s" % settings.get_type())
+            raise ValueError("don't know how to test/detect on dataset type:%s" % settings.type)
 
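Example of the detection flow using the new settings.type property; this sketch assumes a file-based dataset (name made up) so the call goes through the fsLike branch:

    dataset = project.get_dataset("uploaded_csv")
    settings = dataset.get_settings()
    print(settings.type)  # e.g. "UploadedFiles"

    # Re-run format/schema detection, optionally inferring storage types, then persist
    detected = dataset.autodetect_settings(infer_storage_types=True)
    detected.save()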
     def get_as_core_dataset(self):
         import dataiku
@@ -471,10 +581,11 @@ def get_as_core_dataset(self):
     ########################################################
 
     def new_code_recipe(self, type, code=None, recipe_name=None):
-        """Starts creation of a new code recipe taking this dataset as input"""
-
-        if recipe_name is None:
-            recipe_name = "%s_recipe_from_%s" % (type, self.dataset_name)
+        """
+        Starts creation of a new code recipe taking this dataset as input.
+
+        :param str type: Type of the recipe ('python', 'r', 'pyspark', 'sparkr', 'sql', 'sparksql', 'hive', ...)
+        :param str code: The code of the recipe
+        """
 
         if type == "python":
             builder = recipe.PythonRecipeCreator(recipe_name, self.project)
@@ -485,12 +596,15 @@ def new_code_recipe(self, type, code=None, recipe_name=None):
             builder.with_script(code)
         return builder
 
-    def new_grouping_recipe(self, first_group_by, recipe_name=None):
-        if recipe_name is None:
-            recipe_name = "group_%s" % (self.dataset_name)
-        builder = recipe.GroupingRecipeCreator(recipe_name, self.project)
+    def new_recipe(self, type, recipe_name=None):
+        """
+        Starts creation of a new recipe taking this dataset as input.
+        For more details, please see :meth:`dataikuapi.dss.project.DSSProject.new_recipe`
+
+        :param str type: Type of the recipe
+        """
+        builder = self.project.new_recipe(type=type, name=recipe_name)
         builder.with_input(self.dataset_name)
-        builder.with_group_key(first_group_by)
         return builder
 
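The dataset-level grouping helper is replaced by the generic project-level creators; a sketch of both entry points (recipe and dataset names are made up, and with_group_key is the method shown on the removed GroupingRecipeCreator code above, assuming project.new_recipe dispatches 'grouping' to that creator):

    dataset = project.get_dataset("customers")

    # Code recipe: creator pre-filled with this dataset as input
    code_builder = dataset.new_code_recipe("python", code="# recipe body here", recipe_name="compute_stuff")

    # Any other recipe type goes through the project-level creator
    grouping_builder = dataset.new_recipe("grouping", recipe_name="group_customers")
    grouping_builder.with_group_key("customer_id")

Outputs and the final creation call are then configured on the returned creator, as documented on DSSProject.new_recipe.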
     ########################################################
@@ -527,9 +641,15 @@ def __init__(self, dataset, settings):
         self.settings = settings
 
     def get_raw(self):
+        """Get the raw dataset settings as a dict"""
         return self.settings
 
-    def get_type(self):
+    def get_raw_params(self):
+        """Get the type-specific params, as a raw dict"""
+        return self.settings["params"]
+
+    @property
+    def type(self):
         return self.settings["type"]
 
     def remove_partitioning(self):
@@ -541,6 +661,9 @@ def add_discrete_partitioning_dimension(self, dim_name):
     def add_time_partitioning_dimension(self, dim_name, period="DAY"):
         self.settings["partitioning"]["dimensions"].append({"name": dim_name, "type": "time", "params": {"period": period}})
 
+    def add_raw_schema_column(self, column):
+        self.settings["schema"]["columns"].append(column)
+
     def save(self):
         self.dataset.client._perform_empty(
                 "PUT", "/projects/%s/datasets/%s" % (self.dataset.project_key, self.dataset.dataset_name),
@@ -550,13 +673,21 @@ class FSLikeDatasetSettings(DSSDatasetSettings):
     def __init__(self, dataset, settings):
         super(FSLikeDatasetSettings, self).__init__(dataset, settings)
 
-    def set_format(format_type, format_params=None):
+    def set_connection_and_path(self, connection, path):
+        self.settings["params"]["connection"] = connection
+        self.settings["params"]["path"] = path
+
+    def get_raw_format_params(self):
+        """Get the raw format parameters as a dict"""
+        return self.settings["formatParams"]
+
+    def set_format(self, format_type, format_params=None):
         if format_params is None:
             format_params = {}
         self.settings["formatType"] = format_type
         self.settings["formatParams"] = format_params
 
-    def set_csv_format(separator=",", style="excel", skip_rows_before=0, header_row=True, skip_rows_after=0):
+    def set_csv_format(self, separator=",", style="excel", skip_rows_before=0, header_row=True, skip_rows_after=0):
         format_params = {
             "style": style,
             "separator": separator,
@@ -573,7 +704,14 @@ class SQLDatasetSettings(DSSDatasetSettings):
     def __init__(self, dataset, settings):
         super(SQLDatasetSettings, self).__init__(dataset, settings)
 
-
+    def set_table(self, connection, schema, table):
+        """Sets this SQL dataset in 'table' mode, targeting a particular table of a connection"""
+        self.settings["params"].update({
+            "connection": connection,
+            "mode": "table",
+            "schema": schema,
+            "table": table
+        })
 
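Sketch of editing connection parameters through the new typed settings helpers; connection, schema and table names are made up, and get_settings() is assumed to return the FS-like or SQL settings subclass matching the dataset type:

    # File-based dataset: point it at a path on a connection and describe the CSV format
    fs_settings = project.get_dataset("raw_input").get_settings()
    fs_settings.set_connection_and_path("filesystem_managed", "/raw_input")
    fs_settings.set_csv_format(separator=";", header_row=True)
    fs_settings.save()

    # SQL dataset: target a specific table of a connection
    sql_settings = project.get_dataset("orders_sql").get_settings()
    sql_settings.set_table("my_postgres_connection", "public", "orders")
    sql_settings.save()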
 class DSSManagedDatasetCreationHelper(object):
 
@@ -611,26 +749,27 @@ def with_copy_partitioning_from(self, dataset_ref, object_type='DATASET'):
         self.creation_settings["partitioningOptionId"] = "copy:%s:%s" % (code, dataset_ref)
         return self
 
-    def create(self):
+    def create(self, overwrite=False):
         """
         Executes the creation of the managed dataset according to the selected options
 
+        :param bool overwrite: If the dataset being created already exists, delete it first (removing data)
         :return: The :class:`DSSDataset` corresponding to the newly created dataset
         """
+        if overwrite and self.already_exists():
+            self.project.get_dataset(self.dataset_name).delete(drop_data=True)
+
         self.project.client._perform_json("POST", "/projects/%s/datasets/managed" % self.project.project_key,
             body = {
                 "name": self.dataset_name,
                 "creationSettings": self.creation_settings
             })
         return DSSDataset(self.project.client, self.project.project_key, self.dataset_name)
 
-    def dataset_exists(self, dataset_name=None):
-        dataset_name = self.dataset_name if not dataset_name else dataset_name
-        dataset = self.project.get_dataset(dataset_name)
+    def already_exists(self):
+        """Returns whether this managed dataset already exists"""
+        dataset = self.project.get_dataset(self.dataset_name)
         try:
             dataset.get_metadata()
+            return True
         except Exception as e:
-            #TODO: may include some logging here and more generally in the API client
             return False
-
-        return True
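End-to-end sketch of the reworked managed dataset helper; the new_managed_dataset_creation_helper accessor, the with_store_into call and the connection name are assumptions, only create(overwrite=...) and already_exists() come from this diff:

    builder = project.new_managed_dataset_creation_helper("scored_customers")  # assumed accessor
    builder.with_store_into("filesystem_managed")                              # assumed helper, made-up connection
    dataset = builder.create(overwrite=True)  # drops data and recreates if the dataset already exists
    print(builder.already_exists())           # True once created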