@@ -969,12 +969,77 @@ def with_new_output_dataset(self, name, connection,
969969 self .with_output (name , append = append )
970970 return self
971971
972-
973-
974972 def _finish_creation_settings (self ):
975973 super (CodeRecipeCreator , self )._finish_creation_settings ()
976974 self .creation_settings ['script' ] = self .script
977975
976+ class PythonRecipeCreator (CodeRecipeCreator ):
977+ """
978+ Creates a Python recipe.
979+ A Python recipe can be defined either by its complete code, like a normal Python recipe, or
980+ by a function signature.
981+
982+ When using a function, the function must take as arguments:
983+ * A list of dataframes corresponding to the dataframes of the input datasets
984+ * Optional named arguments corresponding to arguments passed to the creator
985+ """
986+
987+ def __init__ (self , name , project ):
988+ DSSRecipeCreator .__init__ (self , "python" , name , project )
989+
990+ DEFAULT_RECIPE_CODE_TMPL = """
991+ # This code is autogenerated by PythonRecipeCreator function mode
992+ import dataiku, dataiku.recipe, json
993+ from {module_name} import {fname}
994+ input_datasets = dataiku.recipe.get_inputs_as_datasets()
995+ output_datasets = dataiku.recipe.get_outputs_as_datasets()
996+ params = json.loads('{params_json}')
997+
998+ logging.info("Reading %d input datasets as dataframes" % len(input_datasets))
999+ input_dataframes = [ds.get_dataframe() for ds in input_datasets]
1000+
1001+ logging.info("Calling user function {fname}")
1002+ function_input = input_dataframes if len(input_dataframes) > 1 else input_dataframes[0]
1003+ output_dataframes = {fname}(function_input, **params)
1004+
1005+ if not isinstance(output_dataframes, list):
1006+ output_dataframes = [output_dataframes]
1007+
1008+ if not len(output_dataframes) == len(output_datasets):
1009+ raise Exception("Code function {fname}() returned %d dataframes but recipe expects %d output datasets", \\
1010+ (len(output_dataframes), len(output_datasets)))
1011+ output = list(zip(output_datasets, output_dataframes))
1012+ for ds, df in output:
1013+ logging.info("Writing function result to dataset %s" % ds.name)
1014+ ds.write_with_schema(df)
1015+ """
1016+
1017+ def with_function_name (self , module_name , function_name , function_args = None , custom_template = None ):
1018+ """
1019+ Defines this recipe as being a functional recipe calling a function name from a module name
1020+ """
1021+ script_tmpl = PythonRecipeCreator .DEFAULT_RECIPE_CODE_TMPL if custom_template is None else custom_template
1022+
1023+ if function_args is None :
1024+ function_args = {}
1025+
1026+ code = script_tmpl .format (module_name = module_name , fname = function_name , params_json = json .dumps (function_args ))
1027+ self .with_script (code )
1028+
1029+ return self
1030+
1031+ def with_function (self , fn , function_args = None , custom_template = None ):
1032+ #TODO: add in documentation that relative imports wont work
1033+ module_name = inspect .getmodule (fn ).__name__
1034+ fname = fn .__name__
1035+
1036+ # Validate that function_args apply to fn
1037+ argspec = inspect .getargspec (fn )
1038+ for k in function_args .keys ():
1039+ if k not in argspec .args :
1040+ raise ValueError ("Provided key argument {} not an argument of function {}" .format (k , fname ))
1041+
1042+ return with_function_name (module_name , fname , function_args , custom_template )
9781043
9791044class SQLQueryRecipeCreator (SingleOutputRecipeCreator ):
9801045 """
0 commit comments