diff --git a/qtapputils/managers/fileio.py b/qtapputils/managers/fileio.py index 83acfa9..6e17dab 100644 --- a/qtapputils/managers/fileio.py +++ b/qtapputils/managers/fileio.py @@ -11,7 +11,9 @@ from typing import Callable # ---- Standard imports +import os import os.path as osp +import uuid # ---- Third party imports from qtpy.QtCore import QObject @@ -20,7 +22,7 @@ class SaveFileManager(QObject): def __init__(self, namefilters: dict, onsave: Callable, - parent: QWidget = None): + parent: QWidget = None, atomic: bool = False): """ A manager to save files. @@ -28,8 +30,7 @@ def __init__(self, namefilters: dict, onsave: Callable, ---------- namefilters : dict A dictionary containing the file filters to use in the - 'Save As' file dialog. Here is an example of a correctly - formated namefilters dictionary: + 'Save As' file dialog. For example: namefilters = { '.pdf': 'Portable Document Format (*.pdf)', @@ -39,43 +40,143 @@ def __init__(self, namefilters: dict, onsave: Callable, } Note that the first entry in the dictionary will be used as the - default name filter to use in the 'Save As' dialog. + default name filter in the 'Save As' dialog. onsave : Callable - The callable that is used to save the file. + The callable that is used to save the file. This should be a + function that takes the output filename as its first argument, + and writes the file contents to disk. parent: QWidget, optional The parent widget to use for the 'Save As' file dialog. + atomic: bool, optional + Whether to save files atomically (write to a temp file then move). + Defaults to False for backward compatibility. For better data + integrity, consider setting atomic=True. """ super().__init__() self.parent = parent self.namefilters = namefilters self.onsave = onsave + self.atomic = atomic + + def _get_valid_tempname(self, filename): + destdir = osp.dirname(filename) + while True: + tempname = osp.join( + destdir, + f'.temp_{str(uuid.uuid4())[:8]}_' + f'{osp.basename(filename)}' + ) + if not osp.exists(tempname): + return tempname + + def _get_new_save_filename(self, filename): + root, ext = osp.splitext(filename) + if ext not in self.namefilters: + ext = next(iter(self.namefilters)) + filename += ext + + filename, filefilter = QFileDialog.getSaveFileName( + self.parent, + "Save As", + filename, + ';;'.join(self.namefilters.values()), + self.namefilters[ext]) + + if filename: + # Make sure the filename has the right extension. + ext = dict(map(reversed, self.namefilters.items()))[filefilter] + if not filename.endswith(ext): + filename += ext + + return filename + # ---- Public methods def save_file(self, filename: str, *args, **kwargs) -> str: """ - Save in provided filename. + ave file to the provided filename, with atomic write option. Parameters ---------- filename : str - The abosulte path where to save the file. + The absolute path where to save the file. Returns ------- filename : str - The absolute path where the file was successfully saved. Returns - 'None' if the saving operation was cancelled or was unsuccessfull. + The absolute path where the file was successfully saved. + Returns None if save was cancelled or unsuccessful. """ - try: - self.onsave(filename, *args, **kwargs) - except PermissionError: + def _show_warning(message: str): QMessageBox.warning( - self.parent, - 'File in Use', - ("The save file operation cannot be completed because the " - "file is in use by another application or user."), - QMessageBox.Ok) - filename = self.save_file_as(filename, *args, **kwargs) - return filename + self.parent, 'Save Error', message, QMessageBox.Ok + ) + + def _show_critical(error: Exception): + msg = (f'An unexpected error occurred while saving the file:' + f'

' + f'{type(error).__name__}: ' + f'{error}') + QMessageBox.critical( + self.parent, 'Save Error', msg, QMessageBox.Ok + ) + + write_permission_msg = ( + "You do not have write permission for this location.\n\n" + "Please choose a different location and try again." + ) + overwrite_error_msg = ( + "The save operation could not be completed because:\n\n" + "- You do not have write permission for the selected location" + ", or\n" + "- The file is currently in use by another application.\n\n" + "Please choose a different location or ensure the file is not " + "open in another program and try again." + ) + + while True: + file_exists = osp.exists(filename) + tempname = None + + try: + if self.atomic: + tempname = self._get_valid_tempname(filename) + self.onsave(tempname, *args, **kwargs) + try: + os.replace(tempname, filename) + return filename + except PermissionError: + if file_exists: + _show_warning(overwrite_error_msg) + else: + _show_warning(write_permission_msg) + + filename = self._get_new_save_filename(filename) + if not filename: + return None + else: + self.onsave(filename, *args, **kwargs) + return filename + + except PermissionError: + if self.atomic or not file_exists: + _show_warning(write_permission_msg) + else: + _show_warning(overwrite_error_msg) + + filename = self._get_new_save_filename(filename) + if not filename: + return None + + except Exception as error: + _show_critical(error) + return None + + finally: + if self.atomic and osp.exists(tempname): + try: + os.remove(tempname) + except Exception: + pass def save_file_as(self, filename: str, *args, **kwargs) -> str: """ @@ -90,23 +191,8 @@ def save_file_as(self, filename: str, *args, **kwargs) -> str: ------- filename : str The absolute path where the file was successfully saved. Returns - 'None' if the saving operation was cancelled or was unsuccessfull. + 'None' if the saving operation was cancelled or was unsuccessful. """ - root, ext = osp.splitext(filename) - if ext not in self.namefilters: - ext = next(iter(self.namefilters)) - filename += ext - - filename, filefilter = QFileDialog.getSaveFileName( - self.parent, - "Save As", - filename, - ';;'.join(self.namefilters.values()), - self.namefilters[ext]) + filename = self._get_new_save_filename(filename) if filename: - # Make sur the filename has the right extension. - ext = dict(map(reversed, self.namefilters.items()))[filefilter] - if not filename.endswith(ext): - filename += ext - return self.save_file(filename, *args, **kwargs) diff --git a/qtapputils/managers/tests/test_fileio.py b/qtapputils/managers/tests/test_fileio.py index ee6dc30..990711d 100644 --- a/qtapputils/managers/tests/test_fileio.py +++ b/qtapputils/managers/tests/test_fileio.py @@ -12,11 +12,14 @@ """ # ---- Standard imports +import os import os.path as osp +import stat +from unittest.mock import MagicMock, patch # ---- Third party imports import pytest -from qtpy.QtWidgets import QFileDialog, QMessageBox +from qtpy.QtWidgets import QFileDialog, QMessageBox, QWidget # ---- Local imports from qtapputils.managers import SaveFileManager @@ -27,96 +30,219 @@ # ============================================================================= # ---- Fixtures # ============================================================================= -@pytest.fixture -def savefile_manager(qtbot): - def onsave(filename, filecontent): - if 'blocked' in filename: - raise PermissionError +# Dummy onsave function for success +def dummy_onsave_success(filename, *args, **kwargs): + with open(filename, "w") as f: + f.write("data") - with open(filename, 'w') as f: - f.write(filecontent) - manager = SaveFileManager( - namefilters={ - '.txt': 'Text File (*.txt)', - '.docx': 'Microsoft Word Document (*.docx)' - }, - onsave=onsave - ) - return manager +# Dummy onsave function for permission error +def dummy_onsave_permission_error(filename, *args, **kwargs): + raise PermissionError("No write permission") + + +# Dummy onsave function for generic error +def dummy_onsave_generic_error(filename, *args, **kwargs): + raise RuntimeError("Unexpected error") + + +NAMEFILTERS = { + ".txt": "Text Files (*.txt)", + ".csv": "CSV Files (*.csv)" + } + + +@pytest.fixture +def parent(qtbot): + parent = QWidget() + qtbot.addWidget(parent) + return parent # ============================================================================= # ---- Tests # ============================================================================= -def test_save_file(savefile_manager, qtbot, tmp_path): - """ - Test that saving a file is working as expected. - """ - filename = osp.join(tmp_path, 'test_savefile.txt') - assert not osp.exists(filename) +@pytest.mark.parametrize('atomic', [True, False]) +def test_save_file_success(tmp_path, atomic, parent): + """Test successful file save in atomic and non-atomic modes.""" + manager = SaveFileManager( + namefilters=NAMEFILTERS, + onsave=dummy_onsave_success, + parent=parent, + atomic=atomic + ) + + filename = osp.join(tmp_path, "output.txt") + result = manager.save_file(filename) + assert result == filename + assert osp.exists(filename) + with open(filename) as f: + assert f.read() == "data" - returned_filename = savefile_manager.save_file(filename, FILECONTENT) +@pytest.mark.parametrize('atomic', [True, False]) +def test_save_file_as_success(tmp_path, mocker, atomic, parent): + """Test successful file save using the 'Save As' dialog.""" + manager = SaveFileManager( + namefilters=NAMEFILTERS, + onsave=dummy_onsave_success, + parent=parent, + atomic=atomic + ) + + filename = osp.join(tmp_path, "newfile.csv") + qfdialog_patcher = mocker.patch.object( + QFileDialog, + 'getSaveFileName', + return_value=(filename, "CSV Files (*.csv)") + ) + + suggested = osp.join(tmp_path, "suggested.txt") + result = manager.save_file_as(suggested) + + assert qfdialog_patcher.call_count == 1 + assert result == filename assert osp.exists(filename) - assert filename == returned_filename - with open(filename, 'r') as f: - assert FILECONTENT == f.read() + assert not osp.exists(suggested) + + +@pytest.mark.parametrize('atomic', [True, False]) +def test_save_file_permission_error(tmp_path, mocker, atomic, parent): + """Test handling of PermissionError during save with user cancel.""" + manager = SaveFileManager( + namefilters=NAMEFILTERS, + onsave=dummy_onsave_permission_error, + parent=parent, + atomic=atomic + ) + filename = osp.join(tmp_path, "fail.txt") -def test_save_file_error(savefile_manager, qtbot, tmp_path, mocker): - """ - Test that selecting a new file when an error is raised is - working as expected. - """ + # Mock QMessageBox so no GUI appears. qmsgbox_patcher = mocker.patch.object( QMessageBox, 'warning', return_value=QMessageBox.Ok ) - filename = osp.join(tmp_path, 'test_savefile') - assert not osp.exists(filename) - + # Mock file dialog to simulate cancel. qfdialog_patcher = mocker.patch.object( QFileDialog, 'getSaveFileName', - return_value=(filename, 'Text File (*.txt)') + return_value=(None, None) ) - returned_filename = savefile_manager.save_file( - filename + '_blocked', FILECONTENT) + result = manager.save_file(str(filename)) + assert result is None assert qfdialog_patcher.call_count == 1 assert qmsgbox_patcher.call_count == 1 - assert returned_filename == filename + '.txt' - assert osp.exists(filename + '.txt') + assert not osp.exists(filename) -def test_save_file_error_cancelled(savefile_manager, qtbot, tmp_path, mocker): - """ - Test that cancelling the saving process when an error is raised is - working as expected. - """ +@pytest.mark.parametrize('atomic', [True, False]) +def test_save_file_generic_exception(tmp_path, mocker, atomic, parent): + """Test handling of generic exception during save.""" + manager = SaveFileManager( + namefilters=NAMEFILTERS, + onsave=dummy_onsave_generic_error, + parent=parent, + atomic=atomic + ) + + filename = osp.join(tmp_path, "fail.txt") + + # Mock QMessageBox so no GUI appears. qmsgbox_patcher = mocker.patch.object( - QMessageBox, 'warning', return_value=QMessageBox.Ok + QMessageBox, 'critical', return_value=QMessageBox.Ok ) - # Test that cancelling the saving process when an error is raised is - # working as expected. + result = manager.save_file(str(filename)) + assert result is None + assert not osp.exists(filename) + assert qmsgbox_patcher.call_count == 1 + + +@pytest.mark.parametrize('atomic', [True, False]) +def test_extension_added_if_missing(tmp_path, mocker, atomic, parent): + """Test automatic extension addition when missing in file name.""" + manager = SaveFileManager( + namefilters=NAMEFILTERS, + onsave=dummy_onsave_success, + parent=parent, + atomic=atomic + ) + + # Simulate user picking CSV filter, but filename without extension. + filename = osp.join(tmp_path, "nofile") + + qfdialog_patcher = mocker.patch.object( + QFileDialog, + 'getSaveFileName', + return_value=(filename, "CSV Files (*.csv)") + ) + + result = manager.save_file_as(filename) + + assert result == filename + '.csv' + assert osp.exists(filename + '.csv') + assert qfdialog_patcher.call_count == 1 + + +@pytest.mark.parametrize('atomic', [True, False]) +def test_save_file_as_cancel(tmp_path, mocker, atomic, parent): + """Test 'Save As' dialog cancel returns None and does not save.""" + manager = SaveFileManager( + namefilters=NAMEFILTERS, + onsave=dummy_onsave_success, + parent=parent, + atomic=atomic + ) - filename = osp.join(tmp_path, 'test_savefile_blocked.txt') + qfdialog_patcher = mocker.patch.object( + QFileDialog, + 'getSaveFileName', + return_value=("", "") + ) + + filename = osp.join(tmp_path, "suggested.txt") + result = manager.save_file_as(filename) + + assert result is None + assert qfdialog_patcher.call_count == 1 assert not osp.exists(filename) + +def test_atomic_save_replace_permission_error(tmp_path, mocker, parent): + """Test atomic save when os.replace raises PermissionError.""" + manager = SaveFileManager( + namefilters=NAMEFILTERS, + onsave=dummy_onsave_success, + parent=parent, + atomic=True + ) + filename = osp.join(tmp_path, "file.txt") + + # Patch os.replace to raise PermissionError. + mocker.patch( + "os.replace", side_effect=PermissionError("Permission denied") + ) + + # Patch QMessageBox.warning so no GUI appears + qmsgbox_patcher = mocker.patch.object( + QMessageBox, 'warning', return_value=QMessageBox.Ok + ) + + # Patch the file dialog to simulate cancel. qfdialog_patcher = mocker.patch.object( QFileDialog, 'getSaveFileName', - return_value=(None, None) + return_value=("", "") ) - returned_filename = savefile_manager.save_file(filename, FILECONTENT) + result = manager.save_file(filename) + + assert result is None assert qfdialog_patcher.call_count == 1 assert qmsgbox_patcher.call_count == 1 - assert returned_filename is None - assert not osp.exists(filename) if __name__ == "__main__":