Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions Lib/lzma.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ class LZMAFile(_compression.BaseStream):
"""

def __init__(self, filename=None, mode="r", *,
format=None, check=-1, preset=None, filters=None):
format=None, check=-1, preset=None, filters=None,
threads=1):
"""Open an LZMA-compressed file in binary mode.

filename can be either an actual file name (given as a str,
Expand All @@ -72,7 +73,8 @@ def __init__(self, filename=None, mode="r", *,
When opening a file for reading, the *preset* argument is not
meaningful, and should be omitted. The *filters* argument should
also be omitted, except when format is FORMAT_RAW (in which case
it is required).
it is required). The *threads* argument is only useful when writing
using FORMAT_XZ.

When opening a file for writing, the settings used by the
compressor can be specified either as a preset compression
Expand All @@ -89,6 +91,11 @@ def __init__(self, filename=None, mode="r", *,
filters (if provided) should be a sequence of dicts. Each dict
should have an entry for "id" indicating ID of the filter, plus
additional entries for options to the filter.

threads (if provided) should be a non-negative integer indicating how
many background threads to create for the compressor (only when using
FORMAT_XZ, otherwise the compression will be single-threaded).
If 0, the number of threads is set to the number of CPU cores.
"""
self._fp = None
self._closefp = False
Expand All @@ -109,7 +116,8 @@ def __init__(self, filename=None, mode="r", *,
format = FORMAT_XZ
mode_code = _MODE_WRITE
self._compressor = LZMACompressor(format=format, check=check,
preset=preset, filters=filters)
preset=preset, filters=filters,
threads=threads)
self._pos = 0
else:
raise ValueError("Invalid mode: {!r}".format(mode))
Expand Down Expand Up @@ -316,15 +324,15 @@ def open(filename, mode="rb", *,
return binary_file


def compress(data, format=FORMAT_XZ, check=-1, preset=None, filters=None):
def compress(data, format=FORMAT_XZ, check=-1, preset=None, filters=None, threads=1):
"""Compress a block of data.

Refer to LZMACompressor's docstring for a description of the
optional arguments *format*, *check*, *preset* and *filters*.

For incremental compression, use an LZMACompressor instead.
"""
comp = LZMACompressor(format, check, preset, filters)
comp = LZMACompressor(format, check, preset, filters, threads)
return comp.compress(data) + comp.flush()


Expand Down
20 changes: 20 additions & 0 deletions Lib/test/test_lzma.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,18 @@ def test_roundtrip_xz(self):
lzd = LZMADecompressor()
self._test_decompressor(lzd, cdata, lzma.CHECK_CRC64)

def test_roundtrip_xz_mt(self):
lzc = LZMACompressor(threads=0)
cdata = lzc.compress(INPUT) + lzc.flush()
lzd = LZMADecompressor()
self._test_decompressor(lzd, cdata, lzma.CHECK_CRC64)

def test_roundtrip_xz_mt_preset_6(self):
lzc = LZMACompressor(preset=6, threads=8)
cdata = lzc.compress(INPUT) + lzc.flush()
lzd = LZMADecompressor()
self._test_decompressor(lzd, cdata, lzma.CHECK_CRC64)

def test_roundtrip_alone(self):
lzc = LZMACompressor(lzma.FORMAT_ALONE)
cdata = lzc.compress(INPUT) + lzc.flush()
Expand Down Expand Up @@ -540,6 +552,8 @@ class FileTestCase(unittest.TestCase):
def test_init(self):
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
pass
with LZMAFile(BytesIO(COMPRESSED_XZ), threads=4) as f:
pass
with LZMAFile(BytesIO(), "w") as f:
pass
with LZMAFile(BytesIO(), "x") as f:
Expand Down Expand Up @@ -657,6 +671,8 @@ def test_init_bad_filter_spec(self):
LZMAFile(BytesIO(), "w", filters=[b"wobsite"])
with self.assertRaises(ValueError):
LZMAFile(BytesIO(), "w", filters=[{"xyzzy": 3}])
with self.assertRaises(ValueError):
LZMAFile(BytesIO(), "w", filters=[{"xyzzy": 3}], threads=4)
with self.assertRaises(ValueError):
LZMAFile(BytesIO(), "w", filters=[{"id": 98765}])
with self.assertRaises(ValueError):
Expand All @@ -669,6 +685,10 @@ def test_init_bad_filter_spec(self):
LZMAFile(BytesIO(), "w",
filters=[{"id": lzma.FILTER_X86, "foo": 0}])

def test_init_bad_threads(self):
with self.assertRaises(ValueError):
LZMAFile(BytesIO(), "w", threads=-1)

def test_init_with_preset_and_filters(self):
with self.assertRaises(ValueError):
LZMAFile(BytesIO(), "w", format=lzma.FORMAT_RAW,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Added support for multi-threaded XZ compression via a new optional
``threads`` parameter in the :mod:`lzma` API. It is supported by both the
:func:`lzma.compress` function and the :class:`lzma.LZMAFile` class.

This is supported by using the underlying ``lzma_stream_encoder_mt()`` API
provided by liblzma.
73 changes: 61 additions & 12 deletions Modules/_lzmamodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -650,20 +650,63 @@ _lzma_LZMACompressor_flush_impl(Compressor *self)

static int
Compressor_init_xz(_lzma_state *state, lzma_stream *lzs,
int check, uint32_t preset, PyObject *filterspecs)
int check, uint32_t preset, PyObject *filterspecs,
int threads)
{
lzma_ret lzret;

if (filterspecs == Py_None) {
lzret = lzma_easy_encoder(lzs, preset, check);
} else {
lzma_filter filters[LZMA_FILTERS_MAX + 1];
if (threads == 1) {
if (filterspecs == Py_None) {
lzret = lzma_easy_encoder(lzs, preset, check);
} else {
lzma_filter filters[LZMA_FILTERS_MAX + 1];

if (parse_filter_chain_spec(state, filters, filterspecs) == -1)
if (parse_filter_chain_spec(state, filters, filterspecs) == -1) {
return -1;
}

lzret = lzma_stream_encoder(lzs, filters, check);
free_filter_chain(filters);
}
} else {
if (threads < 0) {
PyErr_SetString(PyExc_ValueError,
"threads must be a non-negative integer");
return -1;
lzret = lzma_stream_encoder(lzs, filters, check);
free_filter_chain(filters);
}

if (threads == 0) {
threads = lzma_cputhreads();
}
if (threads == 0) {
threads = 1;
}

lzma_mt mt = {
.flags = 0,
.block_size = 0,
.timeout = 0,
.preset = preset,
.check = check,
.filters = NULL,
.threads = threads,
};

if (filterspecs == Py_None) {
lzret = lzma_stream_encoder_mt(lzs, &mt);
} else {
lzma_filter filters[LZMA_FILTERS_MAX + 1];

if (parse_filter_chain_spec(state, filters, filterspecs) == -1) {
return -1;
}

mt.filters = filters;
lzret = lzma_stream_encoder_mt(lzs, &mt);
free_filter_chain(filters);
}
}

if (catch_lzma_error(state, lzret)) {
return -1;
}
Expand Down Expand Up @@ -755,6 +798,9 @@ _lzma.LZMACompressor.__new__
have an entry for "id" indicating the ID of the filter, plus
additional entries for options to the filter.

threads: int(c_default="1") = 1
Number of threads to use for compression (only relevant with FORMAT_XZ).
Comment on lines +801 to +802
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to keep the Python API unchanged, and implicitly use the cores available?

Copy link

@Artoria2e5 Artoria2e5 Feb 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. This would usually be an improvement. There is a little bit of risk, though, if someone runs multiple instances in parallel.

(Also, is it possible to do chunked compression (which allows parallel decompression) with just one thread? What happens when you tell ``lzma_stream_encoder_mt()`` to use 1 thread? Do we want to expose that potential difference?)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, yes, it's possible to use the chunked/parallel compressor with one thread. xz 5.6.0 will default to this for its command line interface at least, because it enables decompression in parallel for the archives.

Copy link

@Artoria2e5 Artoria2e5 Feb 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case it may be beneficial to default to 1 thread with chunks. If you still want to have a "use the non-chunked thing" option, maybe we can pass some silly value like -1 threads to Compressor_init_xz.

Something along the lines of Artoria2e5@40029fb. (Treat as a long comment, not as a serious commit.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


Create a compressor object for compressing data incrementally.

The settings used by the compressor can be specified either as a
Expand All @@ -769,9 +815,11 @@ For one-shot compression, use the compress() function instead.
static PyObject *
Compressor_new(PyTypeObject *type, PyObject *args, PyObject *kwargs)
{
static char *arg_names[] = {"format", "check", "preset", "filters", NULL};
static char *arg_names[] = {"format", "check", "preset", "filters",
"threads", NULL};
int format = FORMAT_XZ;
int check = -1;
int threads = 1;
uint32_t preset = LZMA_PRESET_DEFAULT;
PyObject *preset_obj = Py_None;
PyObject *filterspecs = Py_None;
Expand All @@ -780,9 +828,9 @@ Compressor_new(PyTypeObject *type, PyObject *args, PyObject *kwargs)
_lzma_state *state = PyType_GetModuleState(type);
assert(state != NULL);
if (!PyArg_ParseTupleAndKeywords(args, kwargs,
"|iiOO:LZMACompressor", arg_names,
"|iiOOi:LZMACompressor", arg_names,
&format, &check, &preset_obj,
&filterspecs)) {
&filterspecs, &threads)) {
return NULL;
}

Expand Down Expand Up @@ -826,7 +874,8 @@ Compressor_new(PyTypeObject *type, PyObject *args, PyObject *kwargs)
if (check == -1) {
check = LZMA_CHECK_CRC64;
}
if (Compressor_init_xz(state, &self->lzs, check, preset, filterspecs) != 0) {
if (Compressor_init_xz(state, &self->lzs, check, preset,
filterspecs, threads) != 0) {
goto error;
}
break;
Expand Down