diff --git a/qtapputils/managers/fileio.py b/qtapputils/managers/fileio.py
index 83acfa9..6e17dab 100644
--- a/qtapputils/managers/fileio.py
+++ b/qtapputils/managers/fileio.py
@@ -11,7 +11,9 @@
from typing import Callable
# ---- Standard imports
+import os
import os.path as osp
+import uuid
# ---- Third party imports
from qtpy.QtCore import QObject
@@ -20,7 +22,7 @@
class SaveFileManager(QObject):
def __init__(self, namefilters: dict, onsave: Callable,
- parent: QWidget = None):
+ parent: QWidget = None, atomic: bool = False):
"""
A manager to save files.
@@ -28,8 +30,7 @@ def __init__(self, namefilters: dict, onsave: Callable,
----------
namefilters : dict
A dictionary containing the file filters to use in the
- 'Save As' file dialog. Here is an example of a correctly
- formated namefilters dictionary:
+ 'Save As' file dialog. For example:
namefilters = {
'.pdf': 'Portable Document Format (*.pdf)',
@@ -39,43 +40,143 @@ def __init__(self, namefilters: dict, onsave: Callable,
}
Note that the first entry in the dictionary will be used as the
- default name filter to use in the 'Save As' dialog.
+ default name filter in the 'Save As' dialog.
onsave : Callable
- The callable that is used to save the file.
+ The callable that is used to save the file. This should be a
+ function that takes the output filename as its first argument,
+ and writes the file contents to disk.
parent: QWidget, optional
The parent widget to use for the 'Save As' file dialog.
+ atomic: bool, optional
+ Whether to save files atomically (write to a temp file then move).
+ Defaults to False for backward compatibility. For better data
+ integrity, consider setting atomic=True.
"""
super().__init__()
self.parent = parent
self.namefilters = namefilters
self.onsave = onsave
+ self.atomic = atomic
+
+ def _get_valid_tempname(self, filename):
+ destdir = osp.dirname(filename)
+ while True:
+ tempname = osp.join(
+ destdir,
+ f'.temp_{str(uuid.uuid4())[:8]}_'
+ f'{osp.basename(filename)}'
+ )
+ if not osp.exists(tempname):
+ return tempname
+
+ def _get_new_save_filename(self, filename):
+ root, ext = osp.splitext(filename)
+ if ext not in self.namefilters:
+ ext = next(iter(self.namefilters))
+ filename += ext
+
+ filename, filefilter = QFileDialog.getSaveFileName(
+ self.parent,
+ "Save As",
+ filename,
+ ';;'.join(self.namefilters.values()),
+ self.namefilters[ext])
+
+ if filename:
+ # Make sure the filename has the right extension.
+ ext = dict(map(reversed, self.namefilters.items()))[filefilter]
+ if not filename.endswith(ext):
+ filename += ext
+
+ return filename
+ # ---- Public methods
def save_file(self, filename: str, *args, **kwargs) -> str:
"""
- Save in provided filename.
+ ave file to the provided filename, with atomic write option.
Parameters
----------
filename : str
- The abosulte path where to save the file.
+ The absolute path where to save the file.
Returns
-------
filename : str
- The absolute path where the file was successfully saved. Returns
- 'None' if the saving operation was cancelled or was unsuccessfull.
+ The absolute path where the file was successfully saved.
+ Returns None if save was cancelled or unsuccessful.
"""
- try:
- self.onsave(filename, *args, **kwargs)
- except PermissionError:
+ def _show_warning(message: str):
QMessageBox.warning(
- self.parent,
- 'File in Use',
- ("The save file operation cannot be completed because the "
- "file is in use by another application or user."),
- QMessageBox.Ok)
- filename = self.save_file_as(filename, *args, **kwargs)
- return filename
+ self.parent, 'Save Error', message, QMessageBox.Ok
+ )
+
+ def _show_critical(error: Exception):
+ msg = (f'An unexpected error occurred while saving the file:'
+ f'
'
+ f'{type(error).__name__}: '
+ f'{error}')
+ QMessageBox.critical(
+ self.parent, 'Save Error', msg, QMessageBox.Ok
+ )
+
+ write_permission_msg = (
+ "You do not have write permission for this location.\n\n"
+ "Please choose a different location and try again."
+ )
+ overwrite_error_msg = (
+ "The save operation could not be completed because:\n\n"
+ "- You do not have write permission for the selected location"
+ ", or\n"
+ "- The file is currently in use by another application.\n\n"
+ "Please choose a different location or ensure the file is not "
+ "open in another program and try again."
+ )
+
+ while True:
+ file_exists = osp.exists(filename)
+ tempname = None
+
+ try:
+ if self.atomic:
+ tempname = self._get_valid_tempname(filename)
+ self.onsave(tempname, *args, **kwargs)
+ try:
+ os.replace(tempname, filename)
+ return filename
+ except PermissionError:
+ if file_exists:
+ _show_warning(overwrite_error_msg)
+ else:
+ _show_warning(write_permission_msg)
+
+ filename = self._get_new_save_filename(filename)
+ if not filename:
+ return None
+ else:
+ self.onsave(filename, *args, **kwargs)
+ return filename
+
+ except PermissionError:
+ if self.atomic or not file_exists:
+ _show_warning(write_permission_msg)
+ else:
+ _show_warning(overwrite_error_msg)
+
+ filename = self._get_new_save_filename(filename)
+ if not filename:
+ return None
+
+ except Exception as error:
+ _show_critical(error)
+ return None
+
+ finally:
+ if self.atomic and osp.exists(tempname):
+ try:
+ os.remove(tempname)
+ except Exception:
+ pass
def save_file_as(self, filename: str, *args, **kwargs) -> str:
"""
@@ -90,23 +191,8 @@ def save_file_as(self, filename: str, *args, **kwargs) -> str:
-------
filename : str
The absolute path where the file was successfully saved. Returns
- 'None' if the saving operation was cancelled or was unsuccessfull.
+ 'None' if the saving operation was cancelled or was unsuccessful.
"""
- root, ext = osp.splitext(filename)
- if ext not in self.namefilters:
- ext = next(iter(self.namefilters))
- filename += ext
-
- filename, filefilter = QFileDialog.getSaveFileName(
- self.parent,
- "Save As",
- filename,
- ';;'.join(self.namefilters.values()),
- self.namefilters[ext])
+ filename = self._get_new_save_filename(filename)
if filename:
- # Make sur the filename has the right extension.
- ext = dict(map(reversed, self.namefilters.items()))[filefilter]
- if not filename.endswith(ext):
- filename += ext
-
return self.save_file(filename, *args, **kwargs)
diff --git a/qtapputils/managers/tests/test_fileio.py b/qtapputils/managers/tests/test_fileio.py
index ee6dc30..990711d 100644
--- a/qtapputils/managers/tests/test_fileio.py
+++ b/qtapputils/managers/tests/test_fileio.py
@@ -12,11 +12,14 @@
"""
# ---- Standard imports
+import os
import os.path as osp
+import stat
+from unittest.mock import MagicMock, patch
# ---- Third party imports
import pytest
-from qtpy.QtWidgets import QFileDialog, QMessageBox
+from qtpy.QtWidgets import QFileDialog, QMessageBox, QWidget
# ---- Local imports
from qtapputils.managers import SaveFileManager
@@ -27,96 +30,219 @@
# =============================================================================
# ---- Fixtures
# =============================================================================
-@pytest.fixture
-def savefile_manager(qtbot):
- def onsave(filename, filecontent):
- if 'blocked' in filename:
- raise PermissionError
+# Dummy onsave function for success
+def dummy_onsave_success(filename, *args, **kwargs):
+ with open(filename, "w") as f:
+ f.write("data")
- with open(filename, 'w') as f:
- f.write(filecontent)
- manager = SaveFileManager(
- namefilters={
- '.txt': 'Text File (*.txt)',
- '.docx': 'Microsoft Word Document (*.docx)'
- },
- onsave=onsave
- )
- return manager
+# Dummy onsave function for permission error
+def dummy_onsave_permission_error(filename, *args, **kwargs):
+ raise PermissionError("No write permission")
+
+
+# Dummy onsave function for generic error
+def dummy_onsave_generic_error(filename, *args, **kwargs):
+ raise RuntimeError("Unexpected error")
+
+
+NAMEFILTERS = {
+ ".txt": "Text Files (*.txt)",
+ ".csv": "CSV Files (*.csv)"
+ }
+
+
+@pytest.fixture
+def parent(qtbot):
+ parent = QWidget()
+ qtbot.addWidget(parent)
+ return parent
# =============================================================================
# ---- Tests
# =============================================================================
-def test_save_file(savefile_manager, qtbot, tmp_path):
- """
- Test that saving a file is working as expected.
- """
- filename = osp.join(tmp_path, 'test_savefile.txt')
- assert not osp.exists(filename)
+@pytest.mark.parametrize('atomic', [True, False])
+def test_save_file_success(tmp_path, atomic, parent):
+ """Test successful file save in atomic and non-atomic modes."""
+ manager = SaveFileManager(
+ namefilters=NAMEFILTERS,
+ onsave=dummy_onsave_success,
+ parent=parent,
+ atomic=atomic
+ )
+
+ filename = osp.join(tmp_path, "output.txt")
+ result = manager.save_file(filename)
+ assert result == filename
+ assert osp.exists(filename)
+ with open(filename) as f:
+ assert f.read() == "data"
- returned_filename = savefile_manager.save_file(filename, FILECONTENT)
+@pytest.mark.parametrize('atomic', [True, False])
+def test_save_file_as_success(tmp_path, mocker, atomic, parent):
+ """Test successful file save using the 'Save As' dialog."""
+ manager = SaveFileManager(
+ namefilters=NAMEFILTERS,
+ onsave=dummy_onsave_success,
+ parent=parent,
+ atomic=atomic
+ )
+
+ filename = osp.join(tmp_path, "newfile.csv")
+ qfdialog_patcher = mocker.patch.object(
+ QFileDialog,
+ 'getSaveFileName',
+ return_value=(filename, "CSV Files (*.csv)")
+ )
+
+ suggested = osp.join(tmp_path, "suggested.txt")
+ result = manager.save_file_as(suggested)
+
+ assert qfdialog_patcher.call_count == 1
+ assert result == filename
assert osp.exists(filename)
- assert filename == returned_filename
- with open(filename, 'r') as f:
- assert FILECONTENT == f.read()
+ assert not osp.exists(suggested)
+
+
+@pytest.mark.parametrize('atomic', [True, False])
+def test_save_file_permission_error(tmp_path, mocker, atomic, parent):
+ """Test handling of PermissionError during save with user cancel."""
+ manager = SaveFileManager(
+ namefilters=NAMEFILTERS,
+ onsave=dummy_onsave_permission_error,
+ parent=parent,
+ atomic=atomic
+ )
+ filename = osp.join(tmp_path, "fail.txt")
-def test_save_file_error(savefile_manager, qtbot, tmp_path, mocker):
- """
- Test that selecting a new file when an error is raised is
- working as expected.
- """
+ # Mock QMessageBox so no GUI appears.
qmsgbox_patcher = mocker.patch.object(
QMessageBox, 'warning', return_value=QMessageBox.Ok
)
- filename = osp.join(tmp_path, 'test_savefile')
- assert not osp.exists(filename)
-
+ # Mock file dialog to simulate cancel.
qfdialog_patcher = mocker.patch.object(
QFileDialog,
'getSaveFileName',
- return_value=(filename, 'Text File (*.txt)')
+ return_value=(None, None)
)
- returned_filename = savefile_manager.save_file(
- filename + '_blocked', FILECONTENT)
+ result = manager.save_file(str(filename))
+ assert result is None
assert qfdialog_patcher.call_count == 1
assert qmsgbox_patcher.call_count == 1
- assert returned_filename == filename + '.txt'
- assert osp.exists(filename + '.txt')
+ assert not osp.exists(filename)
-def test_save_file_error_cancelled(savefile_manager, qtbot, tmp_path, mocker):
- """
- Test that cancelling the saving process when an error is raised is
- working as expected.
- """
+@pytest.mark.parametrize('atomic', [True, False])
+def test_save_file_generic_exception(tmp_path, mocker, atomic, parent):
+ """Test handling of generic exception during save."""
+ manager = SaveFileManager(
+ namefilters=NAMEFILTERS,
+ onsave=dummy_onsave_generic_error,
+ parent=parent,
+ atomic=atomic
+ )
+
+ filename = osp.join(tmp_path, "fail.txt")
+
+ # Mock QMessageBox so no GUI appears.
qmsgbox_patcher = mocker.patch.object(
- QMessageBox, 'warning', return_value=QMessageBox.Ok
+ QMessageBox, 'critical', return_value=QMessageBox.Ok
)
- # Test that cancelling the saving process when an error is raised is
- # working as expected.
+ result = manager.save_file(str(filename))
+ assert result is None
+ assert not osp.exists(filename)
+ assert qmsgbox_patcher.call_count == 1
+
+
+@pytest.mark.parametrize('atomic', [True, False])
+def test_extension_added_if_missing(tmp_path, mocker, atomic, parent):
+ """Test automatic extension addition when missing in file name."""
+ manager = SaveFileManager(
+ namefilters=NAMEFILTERS,
+ onsave=dummy_onsave_success,
+ parent=parent,
+ atomic=atomic
+ )
+
+ # Simulate user picking CSV filter, but filename without extension.
+ filename = osp.join(tmp_path, "nofile")
+
+ qfdialog_patcher = mocker.patch.object(
+ QFileDialog,
+ 'getSaveFileName',
+ return_value=(filename, "CSV Files (*.csv)")
+ )
+
+ result = manager.save_file_as(filename)
+
+ assert result == filename + '.csv'
+ assert osp.exists(filename + '.csv')
+ assert qfdialog_patcher.call_count == 1
+
+
+@pytest.mark.parametrize('atomic', [True, False])
+def test_save_file_as_cancel(tmp_path, mocker, atomic, parent):
+ """Test 'Save As' dialog cancel returns None and does not save."""
+ manager = SaveFileManager(
+ namefilters=NAMEFILTERS,
+ onsave=dummy_onsave_success,
+ parent=parent,
+ atomic=atomic
+ )
- filename = osp.join(tmp_path, 'test_savefile_blocked.txt')
+ qfdialog_patcher = mocker.patch.object(
+ QFileDialog,
+ 'getSaveFileName',
+ return_value=("", "")
+ )
+
+ filename = osp.join(tmp_path, "suggested.txt")
+ result = manager.save_file_as(filename)
+
+ assert result is None
+ assert qfdialog_patcher.call_count == 1
assert not osp.exists(filename)
+
+def test_atomic_save_replace_permission_error(tmp_path, mocker, parent):
+ """Test atomic save when os.replace raises PermissionError."""
+ manager = SaveFileManager(
+ namefilters=NAMEFILTERS,
+ onsave=dummy_onsave_success,
+ parent=parent,
+ atomic=True
+ )
+ filename = osp.join(tmp_path, "file.txt")
+
+ # Patch os.replace to raise PermissionError.
+ mocker.patch(
+ "os.replace", side_effect=PermissionError("Permission denied")
+ )
+
+ # Patch QMessageBox.warning so no GUI appears
+ qmsgbox_patcher = mocker.patch.object(
+ QMessageBox, 'warning', return_value=QMessageBox.Ok
+ )
+
+ # Patch the file dialog to simulate cancel.
qfdialog_patcher = mocker.patch.object(
QFileDialog,
'getSaveFileName',
- return_value=(None, None)
+ return_value=("", "")
)
- returned_filename = savefile_manager.save_file(filename, FILECONTENT)
+ result = manager.save_file(filename)
+
+ assert result is None
assert qfdialog_patcher.call_count == 1
assert qmsgbox_patcher.call_count == 1
- assert returned_filename is None
- assert not osp.exists(filename)
if __name__ == "__main__":