1- import time
1+ import time , warnings , sys , os . path as osp
22from .dataset import DSSDataset , DSSManagedDatasetCreationHelper
33from .recipe import DSSRecipe
44from .managedfolder import DSSManagedFolder
55from .savedmodel import DSSSavedModel
66from .job import DSSJob , DSSJobWaiter
77from .scenario import DSSScenario
88from .apiservice import DSSAPIService
9- import sys
10- import os .path as osp
119from .future import DSSFuture
1210from .notebook import DSSNotebook
1311from .macro import DSSMacro
@@ -281,18 +279,36 @@ def create_upload_dataset(self, dataset_name, connection=None):
281279 return DSSDataset (self .client , self .project_key , dataset_name )
282280
283281 def create_filesystem_dataset (self , dataset_name , connection , path_in_connection ):
284- params = {}
285- obj = {
282+ return self .create_fslike_dataset (dataset_name , "Filesystem" , connection , path_in_connection )
283+
284+ def create_s3_dataset (self , dataset_name , connection , path_in_connection , bucket = None ):
285+ """
286+ Creates a new external S3 dataset in the project and returns a :class:`~dataikuapi.dss.dataset.DSSDataset` to interact with it.
287+
288+ The created dataset does not have its format and schema initialized; it is recommended to use
289+ :meth:`~dataikuapi.dss.dataset.DSSDataset.autodetect_settings` on the returned object
290+
291+ :param dataset_name: Name of the dataset to create. Must not already exist
292+ :rtype: :class:`~dataikuapi.dss.dataset.DSSDataset`
293+ """
294+ extra_params = {}
295+ if bucket is not None :
296+ extra_params ["bucket" ] = bucket
297+ return self .create_fslike_dataset (dataset_name , "S3" , connection , path_in_connection , extra_params )
298+
299+ def create_fslike_dataset (self , dataset_name , dataset_type , connection , path_in_connection , extra_params = None ):
300+ body = {
286301 "name" : dataset_name ,
287302 "projectKey" : self .project_key ,
288- "type" : "Filesystem" ,
303+ "type" : dataset_type ,
289304 "params" : {
290305 "connection" : connection ,
291306 "path" : path_in_connection
292307 }
293308 }
294- self .client ._perform_json ("POST" , "/projects/%s/datasets/" % self .project_key ,
295- body = obj )
309+ if extra_params is not None :
310+ body ["params" ].update (extra_params )
311+ self .client ._perform_json ("POST" , "/projects/%s/datasets/" % self .project_key , body = body )
296312 return DSSDataset (self .client , self .project_key , dataset_name )
297313
298314 def create_sql_table_dataset (self , dataset_name , type , connection , table , schema ):
@@ -566,7 +582,7 @@ def start_job(self, definition):
566582
567583 def start_job_and_wait (self , definition , no_fail = False ):
568584 """
569- Create a new job. Wait the end of the job to complete.
585+ Starts a new job and waits for it to complete.
570586
571587 Args:
572588 definition: the definition for the job to create. The definition must contain the type of job (RECURSIVE_BUILD,
@@ -579,8 +595,27 @@ def start_job_and_wait(self, definition, no_fail=False):
579595 waiter = DSSJobWaiter (job )
580596 return waiter .wait (no_fail )
581597
598+ def new_job (self , job_type = 'NON_RECURSIVE_FORCED_BUILD' ):
599+ """
600+ Create a job to be run
601+
602+ You need to add outputs to the job (i.e. what you want to build) before running it.
603+
604+ .. code-block:: python
605+
606+ job_builder = project.new_job()
607+ job_builder.with_output("mydataset")
608+ complete_job = job_builder.start_and_wait()
609+ print("Job %s done" % complete_job.id)
610+
611+ :rtype: :class:`JobDefinitionBuilder`
612+ """
613+ return JobDefinitionBuilder (self , job_type )
614+
582615 def new_job_definition_builder (self , job_type = 'NON_RECURSIVE_FORCED_BUILD' ):
583- return JobDefinitionBuilder (self .project_key , job_type )
616+ """Deprecated. Please use :meth:`new_job`"""
617+ warnings .warn ("new_job_definition_builder is deprecated, please use new_job" , DeprecationWarning )
618+ return JobDefinitionBuilder (self , job_type )
584619
585620 ########################################################
586621 # Variables
@@ -1185,16 +1220,11 @@ def save(self):
11851220 body = self .settings )
11861221
11871222class JobDefinitionBuilder (object ):
1188- def __init__ (self , project_key , job_type = "NON_RECURSIVE_FORCED_BUILD" ):
1189- """
1190- Create a helper to build a job definition
1191-
1192- :param project_key: the project in which the job is launched
1193- :param job_type: the build type for the job RECURSIVE_BUILD, NON_RECURSIVE_FORCED_BUILD,
1194- RECURSIVE_FORCED_BUILD, RECURSIVE_MISSING_ONLY_BUILD
1195-
1196- """
1197- self .project_key = project_key
1223+ """
1224+ Helper to run a job. Do not create this class directly, use :meth:`DSSProject.new_job`
1225+ """
1226+ def __init__ (self , project , job_type = "NON_RECURSIVE_FORCED_BUILD" ):
1227+ self .project = project
11981228 self .definition = {'type' :job_type , 'refreshHiveMetastore' :False , 'outputs' :[]}
11991229
12001230 def with_type (self , job_type ):
@@ -1217,10 +1247,39 @@ def with_refresh_metastore(self, refresh_metastore):
12171247
12181248 def with_output (self , name , object_type = None , object_project_key = None , partition = None ):
12191249 """
1220- Adds an item to build in the definition
1250+ Adds an item to build in this job
12211251 """
12221252 self .definition ['outputs' ].append ({'type' :object_type , 'id' :name , 'projectKey' :object_project_key , 'partition' :partition })
12231253 return self
12241254
12251255 def get_definition (self ):
1256+ """Gets the internal definition for this job"""
12261257 return self .definition
1258+
1259+ def start (self ):
1260+ """
1261+ Starts the job and returns a :class:`dataikuapi.dss.job.DSSJob` handle to interact with it.
1262+
1263+ You need to wait for the returned job to complete
1264+
1265+ :return: the :class:`dataikuapi.dss.job.DSSJob` job handle
1266+ :rtype: :class:`dataikuapi.dss.job.DSSJob`
1267+ """
1268+ job_def = self .project .client ._perform_json ("POST" ,
1269+ "/projects/%s/jobs/" % self .project .project_key , body = self .definition )
1270+ return DSSJob (self .project .client , self .project .project_key , job_def ['id' ])
1271+
1272+ def start_and_wait (self , no_fail = False ):
1273+ """
1274+ Starts the job, waits for it to complete and returns a :class:`dataikuapi.dss.job.DSSJob` handle to interact with it.
1275+
1276+ Raises an exception if the job fails, unless ``no_fail`` is True.
1277+
1278+ :param no_fail: if True, does not raise if the job failed.
1279+ :return: the :class:`dataikuapi.dss.job.DSSJob` job handle
1280+ :rtype: :class:`dataikuapi.dss.job.DSSJob`
1281+ """
1282+ job = self .start ()
1283+ waiter = DSSJobWaiter (job )
1284+ waiter .wait (no_fail )
1285+ return job
0 commit comments