From 94eb9946a8b0e227122ca87ad2ae2dfb785fc38c Mon Sep 17 00:00:00 2001 From: Thierry RAMORASOAVINA Date: Mon, 12 Jan 2026 17:05:46 +0100 Subject: [PATCH] Fix temporary directory race condition in sklearn estimators Caution : for the tests, MPI must be disabled otherwise a segfault would occur because of a huge use of resources --- CHANGELOG.md | 1 + khiops/sklearn/estimators.py | 21 ------- tests/test_sklearn.py | 113 +++++++++++++++++++++++++++++++++++ 3 files changed, 114 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b401add..55f35dcc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ ### Fixed - (General) Automatic Credentials Discovery-based credential retrieval for Google cloud storage (GCS). +- (`sklearn`) Temporary directory race condition in estimators. ## 11.0.0.0 - 2025-12-19 diff --git a/khiops/sklearn/estimators.py b/khiops/sklearn/estimators.py index 7e99c95e..426531fb 100644 --- a/khiops/sklearn/estimators.py +++ b/khiops/sklearn/estimators.py @@ -362,8 +362,6 @@ def fit(self, X, y=None, **kwargs): # Create temporary directory and tables computation_dir = self._create_computation_dir("fit") - initial_runner_temp_dir = kh.get_runner().root_temp_dir - kh.get_runner().root_temp_dir = computation_dir # Create the dataset, fit the model and reset in case of any failure try: @@ -377,7 +375,6 @@ def fit(self, X, y=None, **kwargs): # Cleanup and restore the runner's temporary dir finally: self._cleanup_computation_dir(computation_dir) - kh.get_runner().root_temp_dir = initial_runner_temp_dir # If on "fitted" state then: # - self.model_ must be a DictionaryDomain @@ -963,7 +960,6 @@ def _simplify( computation_dir = self._create_computation_dir("simplify") output_dir = self._get_output_dir(computation_dir) simplify_log_file_path = fs.get_child_path(output_dir, "khiops_simplify_cc.log") - initial_runner_temp_dir = kh.get_runner().root_temp_dir full_coclustering_file_path = fs.get_child_path( output_dir, 
"FullCoclustering.khcj" ) @@ -971,7 +967,6 @@ def _simplify( output_dir, "Coclustering.khcj" ) self.model_report_.write_khiops_json_file(full_coclustering_file_path) - kh.get_runner().root_temp_dir = computation_dir try: # - simplify_coclustering, then # - prepare_coclustering_deployment @@ -1040,7 +1035,6 @@ def _simplify( ) finally: self._cleanup_computation_dir(computation_dir) - kh.get_runner().root_temp_dir = initial_runner_temp_dir return simplified_cc def simplify( @@ -1101,8 +1095,6 @@ def predict(self, X): """ # Create temporary directory computation_dir = self._create_computation_dir("predict") - initial_runner_temp_dir = kh.get_runner().root_temp_dir - kh.get_runner().root_temp_dir = computation_dir # Create the input dataset ds = Dataset(X) @@ -1119,7 +1111,6 @@ def predict(self, X): # Cleanup and restore the runner's temporary dir finally: self._cleanup_computation_dir(computation_dir) - kh.get_runner().root_temp_dir = initial_runner_temp_dir # Transform to numpy.array y_pred = y_pred.to_numpy() @@ -1557,8 +1548,6 @@ def predict(self, X): """ # Create temporary directory computation_dir = self._create_computation_dir("predict") - initial_runner_temp_dir = kh.get_runner().root_temp_dir - kh.get_runner().root_temp_dir = computation_dir try: # Create the input dataset @@ -1575,10 +1564,6 @@ def predict(self, X): # Cleanup and restore the runner's temporary dir finally: self._cleanup_computation_dir(computation_dir) - kh.get_runner().root_temp_dir = initial_runner_temp_dir - - # Restore the runner's temporary dir - kh.get_runner().root_temp_dir = initial_runner_temp_dir # Return pd.Series in the monotable + pandas case assert isinstance(y_pred, (str, pd.DataFrame)), "Expected str or DataFrame" @@ -1997,8 +1982,6 @@ def predict_proba(self, X): """ # Create temporary directory and tables computation_dir = self._create_computation_dir("predict_proba") - initial_runner_temp_dir = kh.get_runner().root_temp_dir - kh.get_runner().root_temp_dir = computation_dir # 
def no_mpi(func):
    """Run the decorated test method with a single-process Khiops runner.

    While ``func`` runs, the ``KHIOPS_PROC_NUMBER`` environment variable is set
    to ``"1"`` and a newly created `KhiopsLocalRunner` is installed as the
    global runner via ``kh.set_runner``. The previous runner and the previous
    value of the variable (or its absence) are restored afterwards, even when
    ``func`` raises, so a failing test cannot leak this setup into later tests.

    Parameters
    ----------
    func : callable
        Test method to wrap; it is called as ``func(self, *args, **kwargs)``.

    Returns
    -------
    callable
        The wrapper. It keeps the name and docstring of ``func`` and returns
        whatever ``func`` returns.
    """
    # Function-scope import: keeps the decorator self-contained without
    # requiring a change to the module-level import block
    from functools import wraps

    @wraps(func)
    def inner(self, *args, **kwargs):
        # Save the current state. The variable may be unset, so use `get`
        # instead of indexing (which would raise KeyError)
        initial_runner = kh.get_runner()
        initial_proc_number = os.environ.get("KHIOPS_PROC_NUMBER")

        try:
            # Set the variable *before* creating the runner, as the original
            # code did (presumably the runner reads it when it is created;
            # to be confirmed against KhiopsLocalRunner)
            os.environ["KHIOPS_PROC_NUMBER"] = "1"
            kh.set_runner(KhiopsLocalRunner())

            # Call the initial test function
            return func(self, *args, **kwargs)
        finally:
            # Restore the environment exactly as it was found
            if initial_proc_number is None:
                os.environ.pop("KHIOPS_PROC_NUMBER", None)
            else:
                os.environ["KHIOPS_PROC_NUMBER"] = initial_proc_number

            # Restore the runner
            kh.set_runner(initial_runner)

    return inner
{executor.submit(predict_proba_func, clf, X): i for i in range(5)} + for future in as_completed(futures): + print(future.result()) + + # Test `transform` of `KhiopsEncoder` + khe = KhiopsEncoder() + + y = adult_df["class"] + with ThreadPoolExecutor(max_workers=5) as executor: + futures = { + executor.submit(encoder_fit_transform_func, khe, X, y): i + for i in range(5) + } + for future in as_completed(futures): + print(future.result()) + + # Test `fit`, `simplify` and `predict` of + # `KhiopsCoclustering` and `KhiopsEstimator` + splice_data_dir = fs.get_child_path( + kh.get_runner().samples_dir, "SpliceJunction" + ) + splice_data_file_path = fs.get_child_path( + splice_data_dir, "SpliceJunctionDNA.txt" + ) + + # Read the splice junction secondary datatable + with io.BytesIO(fs.read(splice_data_file_path)) as splice_data_file: + splice_df = pd.read_csv(splice_data_file, sep="\t") + + khcc = KhiopsCoclustering() + + with ThreadPoolExecutor(max_workers=5) as executor: + futures = { + executor.submit(estimator_fit_func, khcc, splice_df, "SampleId"): i + for i in range(5) + } + for future in as_completed(futures): + print(future.result()) + futures = { + executor.submit(coclustering_simplify_func, khcc): i for i in range(5) + } + for future in as_completed(futures): + print(future.result()) + futures = { + executor.submit(coclustering_predict_func, khcc, splice_df): i + for i in range(5) + } + for future in as_completed(futures): + print(future.result())