|
6 | 6 | importlib_util = util.import_importlib('importlib.util') |
7 | 7 |
|
8 | 8 | import importlib.util |
9 | | -from importlib import _bootstrap_external |
10 | 9 | import os |
11 | 10 | import pathlib |
12 | 11 | import string |
@@ -788,74 +787,5 @@ def test_complete_multi_phase_init_module(self): |
788 | 787 | self.run_with_own_gil(script) |
789 | 788 |
|
790 | 789 |
|
791 | | -class PatchAtomicWrites(): |
792 | | - def __init__(self, truncate_at_length=100, never_complete=False): |
793 | | - self.truncate_at_length = truncate_at_length |
794 | | - self.never_complete = never_complete |
795 | | - self.seen_write = False |
796 | | - self._children = [] |
797 | | - |
798 | | - def __enter__(self): |
799 | | - import _pyio |
800 | | - |
801 | | - oldwrite = os.write |
802 | | - |
803 | | - # Emulate an os.write that only writes partial data. |
804 | | - def write(fd, data): |
805 | | - if self.seen_write and self.never_complete: |
806 | | - return None |
807 | | - self.seen_write = True |
808 | | - return oldwrite(fd, data[:self.truncate_at_length]) |
809 | | - |
810 | | - # Need to patch _io to be _pyio, so that io.FileIO is affected by the |
811 | | - # os.write patch. |
812 | | - self.children = [ |
813 | | - support.swap_attr(_bootstrap_external, '_io', _pyio), |
814 | | - support.swap_attr(os, 'write', write) |
815 | | - ] |
816 | | - for child in self.children: |
817 | | - child.__enter__() |
818 | | - return self |
819 | | - |
820 | | - def __exit__(self, exc_type, exc_val, exc_tb): |
821 | | - for child in self.children: |
822 | | - child.__exit__(exc_type, exc_val, exc_tb) |
823 | | - |
824 | | - |
825 | | -class MiscTests(unittest.TestCase): |
826 | | - |
827 | | - def test_atomic_write_retries_incomplete_writes(self): |
828 | | - truncate_at_length = 100 |
829 | | - length = truncate_at_length * 2 |
830 | | - |
831 | | - with PatchAtomicWrites(truncate_at_length=truncate_at_length) as cm: |
832 | | - # Make sure we write something longer than the point where we |
833 | | - # truncate. |
834 | | - content = b'x' * length |
835 | | - _bootstrap_external._write_atomic(os_helper.TESTFN, content) |
836 | | - assert cm.seen_write |
837 | | - |
838 | | - assert os.stat(support.os_helper.TESTFN).st_size == length |
839 | | - os.unlink(support.os_helper.TESTFN) |
840 | | - |
841 | | - def test_atomic_write_errors_if_unable_to_complete(self): |
842 | | - truncate_at_length = 100 |
843 | | - |
844 | | - with ( |
845 | | - PatchAtomicWrites( |
846 | | - truncate_at_length=truncate_at_length, never_complete=True, |
847 | | - ) as cm, |
848 | | - self.assertRaises(OSError) |
849 | | - ): |
850 | | - # Make sure we write something longer than the point where we |
851 | | - # truncate. |
852 | | - content = b'x' * (truncate_at_length * 2) |
853 | | - _bootstrap_external._write_atomic(os_helper.TESTFN, content) |
854 | | - assert cm.seen_write |
855 | | - |
856 | | - with self.assertRaises(OSError): |
857 | | - os.stat(support.os_helper.TESTFN) # Check that the file did not get written. |
858 | | - |
859 | | - |
860 | 790 | if __name__ == '__main__': |
861 | 791 | unittest.main() |
0 commit comments