From b796221d604c6a811b8feaf88e27466125923f93 Mon Sep 17 00:00:00 2001 From: Meir Komet Date: Sat, 3 Feb 2024 13:14:12 +0200 Subject: [PATCH] gh-114953: Support for multithreaded XZ compression in lzma.py Signed-off-by: Meir Komet --- Lib/lzma.py | 18 +++-- Lib/test/test_lzma.py | 20 +++++ ...-02-03-13-22-59.gh-issue-114953.o6HA1w.rst | 6 ++ Modules/_lzmamodule.c | 73 ++++++++++++++++--- 4 files changed, 100 insertions(+), 17 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2024-02-03-13-22-59.gh-issue-114953.o6HA1w.rst diff --git a/Lib/lzma.py b/Lib/lzma.py index 800f52198fbb79..249a94430d0136 100644 --- a/Lib/lzma.py +++ b/Lib/lzma.py @@ -47,7 +47,8 @@ class LZMAFile(_compression.BaseStream): """ def __init__(self, filename=None, mode="r", *, - format=None, check=-1, preset=None, filters=None): + format=None, check=-1, preset=None, filters=None, + threads=1): """Open an LZMA-compressed file in binary mode. filename can be either an actual file name (given as a str, @@ -72,7 +73,8 @@ def __init__(self, filename=None, mode="r", *, When opening a file for reading, the *preset* argument is not meaningful, and should be omitted. The *filters* argument should also be omitted, except when format is FORMAT_RAW (in which case - it is required). + it is required). The *threads* argument is only useful when writing + using FORMAT_XZ. When opening a file for writing, the settings used by the compressor can be specified either as a preset compression @@ -89,6 +91,11 @@ def __init__(self, filename=None, mode="r", *, filters (if provided) should be a sequence of dicts. Each dict should have an entry for "id" indicating ID of the filter, plus additional entries for options to the filter. + + threads (if provided) should be a non-negative integer indicating how + many background threads to create for the compressor (only when using + FORMAT_XZ, otherwise the compression will be single-threaded). + If 0, the number of threads is set to the number of CPU cores. """ self._fp = None self._closefp = False @@ -109,7 +116,8 @@ def __init__(self, filename=None, mode="r", *, format = FORMAT_XZ mode_code = _MODE_WRITE self._compressor = LZMACompressor(format=format, check=check, - preset=preset, filters=filters) + preset=preset, filters=filters, + threads=threads) self._pos = 0 else: raise ValueError("Invalid mode: {!r}".format(mode)) @@ -316,7 +324,7 @@ def open(filename, mode="rb", *, return binary_file -def compress(data, format=FORMAT_XZ, check=-1, preset=None, filters=None): +def compress(data, format=FORMAT_XZ, check=-1, preset=None, filters=None, threads=1): """Compress a block of data. Refer to LZMACompressor's docstring for a description of the @@ -324,7 +332,7 @@ def compress(data, format=FORMAT_XZ, check=-1, preset=None, filters=None): For incremental compression, use an LZMACompressor instead. """ - comp = LZMACompressor(format, check, preset, filters) + comp = LZMACompressor(format, check, preset, filters, threads) return comp.compress(data) + comp.flush() diff --git a/Lib/test/test_lzma.py b/Lib/test/test_lzma.py index 65e6488c5d7b10..7cefe07c1fdd54 100644 --- a/Lib/test/test_lzma.py +++ b/Lib/test/test_lzma.py @@ -282,6 +282,18 @@ def test_roundtrip_xz(self): lzd = LZMADecompressor() self._test_decompressor(lzd, cdata, lzma.CHECK_CRC64) + def test_roundtrip_xz_mt(self): + lzc = LZMACompressor(threads=0) + cdata = lzc.compress(INPUT) + lzc.flush() + lzd = LZMADecompressor() + self._test_decompressor(lzd, cdata, lzma.CHECK_CRC64) + + def test_roundtrip_xz_mt_preset_6(self): + lzc = LZMACompressor(preset=6, threads=8) + cdata = lzc.compress(INPUT) + lzc.flush() + lzd = LZMADecompressor() + self._test_decompressor(lzd, cdata, lzma.CHECK_CRC64) + def test_roundtrip_alone(self): lzc = LZMACompressor(lzma.FORMAT_ALONE) cdata = lzc.compress(INPUT) + lzc.flush() @@ -540,6 +552,8 @@ class FileTestCase(unittest.TestCase): def test_init(self): with LZMAFile(BytesIO(COMPRESSED_XZ)) as f: pass + with LZMAFile(BytesIO(COMPRESSED_XZ), threads=4) as f: + pass with LZMAFile(BytesIO(), "w") as f: pass with LZMAFile(BytesIO(), "x") as f: @@ -657,6 +671,8 @@ def test_init_bad_filter_spec(self): LZMAFile(BytesIO(), "w", filters=[b"wobsite"]) with self.assertRaises(ValueError): LZMAFile(BytesIO(), "w", filters=[{"xyzzy": 3}]) + with self.assertRaises(ValueError): + LZMAFile(BytesIO(), "w", filters=[{"xyzzy": 3}], threads=4) with self.assertRaises(ValueError): LZMAFile(BytesIO(), "w", filters=[{"id": 98765}]) with self.assertRaises(ValueError): @@ -669,6 +685,10 @@ def test_init_bad_filter_spec(self): LZMAFile(BytesIO(), "w", filters=[{"id": lzma.FILTER_X86, "foo": 0}]) + def test_init_bad_threads(self): + with self.assertRaises(ValueError): + LZMAFile(BytesIO(), "w", threads=-1) + def test_init_with_preset_and_filters(self): with self.assertRaises(ValueError): LZMAFile(BytesIO(), "w", format=lzma.FORMAT_RAW, diff --git a/Misc/NEWS.d/next/Library/2024-02-03-13-22-59.gh-issue-114953.o6HA1w.rst b/Misc/NEWS.d/next/Library/2024-02-03-13-22-59.gh-issue-114953.o6HA1w.rst new file mode 100644 index 00000000000000..997ff923e1166e --- /dev/null +++ b/Misc/NEWS.d/next/Library/2024-02-03-13-22-59.gh-issue-114953.o6HA1w.rst @@ -0,0 +1,6 @@ +Added support for multi-threaded XZ compression by passing in new optional +parameter ``threads`` into :mod:`lzma` API. Supported both in the +:func:`lzma.compress()` function and :class:`lzma.LZMAFile` class. + +This is supported by using the underlying ``lzma_stream_encoder_mt()`` API +provided by liblzma. diff --git a/Modules/_lzmamodule.c b/Modules/_lzmamodule.c index f6bfbfa62687b8..6fc8d292602de7 100644 --- a/Modules/_lzmamodule.c +++ b/Modules/_lzmamodule.c @@ -650,20 +650,63 @@ _lzma_LZMACompressor_flush_impl(Compressor *self) static int Compressor_init_xz(_lzma_state *state, lzma_stream *lzs, - int check, uint32_t preset, PyObject *filterspecs) + int check, uint32_t preset, PyObject *filterspecs, + int threads) { lzma_ret lzret; - if (filterspecs == Py_None) { - lzret = lzma_easy_encoder(lzs, preset, check); - } else { - lzma_filter filters[LZMA_FILTERS_MAX + 1]; + if (threads == 1) { + if (filterspecs == Py_None) { + lzret = lzma_easy_encoder(lzs, preset, check); + } else { + lzma_filter filters[LZMA_FILTERS_MAX + 1]; - if (parse_filter_chain_spec(state, filters, filterspecs) == -1) + if (parse_filter_chain_spec(state, filters, filterspecs) == -1) { + return -1; + } + + lzret = lzma_stream_encoder(lzs, filters, check); + free_filter_chain(filters); + } + } else { + if (threads < 0) { + PyErr_SetString(PyExc_ValueError, + "threads must be a non-negative integer"); return -1; - lzret = lzma_stream_encoder(lzs, filters, check); - free_filter_chain(filters); + } + + if (threads == 0) { + threads = lzma_cputhreads(); + } + if (threads == 0) { + threads = 1; + } + + lzma_mt mt = { + .flags = 0, + .block_size = 0, + .timeout = 0, + .preset = preset, + .check = check, + .filters = NULL, + .threads = threads, + }; + + if (filterspecs == Py_None) { + lzret = lzma_stream_encoder_mt(lzs, &mt); + } else { + lzma_filter filters[LZMA_FILTERS_MAX + 1]; + + if (parse_filter_chain_spec(state, filters, filterspecs) == -1) { + return -1; + } + + mt.filters = filters; + lzret = lzma_stream_encoder_mt(lzs, &mt); + free_filter_chain(filters); + } } + if (catch_lzma_error(state, lzret)) { return -1; } @@ -755,6 +798,9 @@ _lzma.LZMACompressor.__new__ have an entry for "id" indicating the ID of the filter, plus additional entries for options to the filter. + threads: int(c_default="1") = 1 + Number of threads to use for compression (only relevant with FORMAT_XZ). + Create a compressor object for compressing data incrementally. The settings used by the compressor can be specified either as a @@ -769,9 +815,11 @@ For one-shot compression, use the compress() function instead. static PyObject * Compressor_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) { - static char *arg_names[] = {"format", "check", "preset", "filters", NULL}; + static char *arg_names[] = {"format", "check", "preset", "filters", + "threads", NULL}; int format = FORMAT_XZ; int check = -1; + int threads = 1; uint32_t preset = LZMA_PRESET_DEFAULT; PyObject *preset_obj = Py_None; PyObject *filterspecs = Py_None; @@ -780,9 +828,9 @@ Compressor_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) _lzma_state *state = PyType_GetModuleState(type); assert(state != NULL); if (!PyArg_ParseTupleAndKeywords(args, kwargs, - "|iiOO:LZMACompressor", arg_names, + "|iiOOi:LZMACompressor", arg_names, &format, &check, &preset_obj, - &filterspecs)) { + &filterspecs, &threads)) { return NULL; } @@ -826,7 +874,8 @@ Compressor_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) if (check == -1) { check = LZMA_CHECK_CRC64; } - if (Compressor_init_xz(state, &self->lzs, check, preset, filterspecs) != 0) { + if (Compressor_init_xz(state, &self->lzs, check, preset, + filterspecs, threads) != 0) { goto error; } break;