Skip to content

Commit 71cb5ce

Browse files
committed
Add a chunk_size setting
Users experiencing issues when uploading huge files can now define a default chunk size to use per server profile.
1 parent 1cc25cd commit 71cb5ce

File tree

12 files changed

+103
-71
lines changed

12 files changed

+103
-71
lines changed

CHANGES/1305.feature

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
Change the default chunking value to "infinite". If users wants to, they can still specify a value based on their needs.
1+
Removed the default `chunk_size` value. If necessary, a suitable value should be provided with the call or configured for the profile.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Allow specifying `None` for the `chunk_size` of content upload commands to disable chunking.

pulp-glue/src/pulp_glue/ansible/context.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ class PulpAnsibleCollectionVersionContext(PulpContentContext):
2929
def upload(self, file: t.IO[bytes], **kwargs: t.Any) -> t.Any: # type:ignore
3030
repository: PulpRepositoryContext | None = kwargs.pop("repository", None)
3131
if self.capable("upload"):
32-
chunk_size: int = kwargs.pop("chunk_size", 1000000)
32+
chunk_size: int | None = kwargs.pop("chunk_size", None)
3333
return super().upload(file, chunk_size, repository, **kwargs)
3434
else:
3535
result = self.call("upload", body={"file": file})

pulp-glue/src/pulp_glue/common/context.py

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import typing as t
77
import warnings
88
from contextlib import ExitStack
9+
from pathlib import Path
910

1011
from packaging.specifiers import SpecifierSet
1112

@@ -248,6 +249,7 @@ def __init__(
248249
fake_mode: bool = False,
249250
verify_ssl: bool | str | None = None,
250251
verify: bool | str | None = None, # Deprecated
252+
chunk_size: int | None = None,
251253
) -> None:
252254
self._api: OpenAPI | None = None
253255
self._api_root: str = api_root
@@ -273,6 +275,7 @@ def __init__(
273275
self.fake_mode: bool = fake_mode
274276
if self.fake_mode:
275277
self._api_kwargs["dry_run"] = True
278+
self.chunk_size = chunk_size
276279

277280
@classmethod
278281
def from_config_files(
@@ -1565,6 +1568,7 @@ def __init__(
15651568
repository_ctx: PulpRepositoryContext | None = None,
15661569
):
15671570
super().__init__(pulp_ctx, pulp_href=pulp_href, entity=entity)
1571+
assert (repository_ctx is None) or (repository_ctx.pulp_ctx is pulp_ctx)
15681572
self.repository_ctx = repository_ctx
15691573

15701574
def list(self, limit: int, offset: int, parameters: dict[str, t.Any]) -> list[t.Any]:
@@ -1578,6 +1582,29 @@ def find(self, **kwargs: t.Any) -> t.Any:
15781582
kwargs["repository_version"] = self.repository_ctx.entity["latest_version_href"]
15791583
return super().find(**kwargs)
15801584

1585+
def _prepare_upload(
1586+
self,
1587+
body: EntityDefinition,
1588+
file: t.IO[bytes],
1589+
chunk_size: int | None,
1590+
) -> None:
1591+
_chunk_size: int | None = chunk_size or self.pulp_ctx.chunk_size
1592+
size = os.path.getsize(file.name)
1593+
if not self.pulp_ctx.fake_mode: # Skip the uploading part in fake_mode
1594+
if _chunk_size is None or _chunk_size > size:
1595+
body["file"] = file
1596+
elif self.pulp_ctx.has_plugin(PluginRequirement("core", specifier=">=3.20.0")):
1597+
self.needs_capability("upload")
1598+
from pulp_glue.core.context import PulpUploadContext
1599+
1600+
upload_href = PulpUploadContext(self.pulp_ctx).upload_file(file, _chunk_size)
1601+
body["upload"] = upload_href
1602+
else:
1603+
from pulp_glue.core.context import PulpArtifactContext
1604+
1605+
artifact_href = PulpArtifactContext(self.pulp_ctx).upload(file, _chunk_size)
1606+
body["artifact"] = artifact_href
1607+
15811608
def create(
15821609
self,
15831610
body: EntityDefinition,
@@ -1589,24 +1616,10 @@ def create(
15891616
file = body.pop("file", None)
15901617
chunk_size: int | None = body.pop("chunk_size", None)
15911618
if file:
1592-
if isinstance(file, str):
1619+
if isinstance(file, str | Path):
15931620
file = open(file, "rb")
15941621
cleanup.enter_context(file)
1595-
size = os.path.getsize(file.name)
1596-
if not self.pulp_ctx.fake_mode: # Skip the uploading part in fake_mode
1597-
if chunk_size is None or chunk_size > size:
1598-
body["file"] = file
1599-
elif self.pulp_ctx.has_plugin(PluginRequirement("core", specifier=">=3.20.0")):
1600-
self.needs_capability("upload")
1601-
from pulp_glue.core.context import PulpUploadContext
1602-
1603-
upload_href = PulpUploadContext(self.pulp_ctx).upload_file(file, chunk_size)
1604-
body["upload"] = upload_href
1605-
else:
1606-
from pulp_glue.core.context import PulpArtifactContext
1607-
1608-
artifact_href = PulpArtifactContext(self.pulp_ctx).upload(file, chunk_size)
1609-
body["artifact"] = artifact_href
1622+
self._prepare_upload(body, file, chunk_size)
16101623
if self.repository_ctx is not None:
16111624
body["repository"] = self.repository_ctx
16121625
return super().create(body=body, parameters=parameters, non_blocking=non_blocking)
@@ -1618,7 +1631,7 @@ def delete(self, non_blocking: bool = False) -> None:
16181631
def upload(
16191632
self,
16201633
file: t.IO[bytes],
1621-
chunk_size: int,
1634+
chunk_size: int | None,
16221635
repository: PulpRepositoryContext | None,
16231636
**kwargs: t.Any,
16241637
) -> t.Any:
@@ -1629,31 +1642,18 @@ def upload(
16291642
16301643
Parameters:
16311644
file: A file like object that supports `os.path.getsize`.
1632-
chunk_size: Size of the chunks to upload independently.
1645+
chunk_size: Size of the chunks to upload independently. `None` to disable chunking.
16331646
repository: Repository context to add the newly created content to.
16341647
kwargs: Extra args specific to the content type, passed to the create call.
16351648
16361649
Returns:
16371650
The result of the create task.
16381651
"""
16391652
self.needs_capability("upload")
1640-
size = os.path.getsize(file.name)
16411653
body: dict[str, t.Any] = {**kwargs}
1642-
if not self.pulp_ctx.fake_mode: # Skip the uploading part in fake_mode
1643-
if chunk_size > size:
1644-
body["file"] = file
1645-
elif self.pulp_ctx.has_plugin(PluginRequirement("core", specifier=">=3.20.0")):
1646-
from pulp_glue.core.context import PulpUploadContext
1647-
1648-
upload_href = PulpUploadContext(self.pulp_ctx).upload_file(file, chunk_size)
1649-
body["upload"] = upload_href
1650-
else:
1651-
from pulp_glue.core.context import PulpArtifactContext
1652-
1653-
artifact_href = PulpArtifactContext(self.pulp_ctx).upload(file, chunk_size)
1654-
body["artifact"] = artifact_href
1655-
if repository:
1656-
body["repository"] = repository
1654+
self._prepare_upload(body, file, chunk_size)
1655+
if repository is not None:
1656+
body["repository"] = repository
16571657
return self.create(body=body)
16581658

16591659

pulp-glue/src/pulp_glue/core/context.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,15 @@ class PulpArtifactContext(PulpEntityContext):
4545
ID_PREFIX = "artifacts"
4646

4747
def upload(
48-
self, file: t.IO[bytes], chunk_size: int = 1000000, sha256: str | None = None
48+
self,
49+
file: t.IO[bytes],
50+
chunk_size: int | None = None,
51+
sha256: str | None = None,
4952
) -> t.Any:
5053
size = os.path.getsize(file.name)
5154

5255
sha256_hasher = hashlib.sha256()
53-
for chunk in iter(lambda: file.read(chunk_size), b""):
56+
for chunk in iter(lambda: file.read(10_000_000), b""):
5457
sha256_hasher.update(chunk)
5558
sha256_digest = sha256_hasher.hexdigest()
5659
file.seek(0)
@@ -71,8 +74,8 @@ def upload(
7174
self._entity = {"pulp_href": "<FAKE_ENTITY>", "sha256": sha256, "size": size}
7275
self._entity_lookup = {}
7376
return self._entity["pulp_href"]
74-
if chunk_size > size:
75-
# if chunk_size is bigger than the file size, just upload it directly
77+
if chunk_size is None or chunk_size > size:
78+
# upload it directly
7679
artifact: dict[str, t.Any] = self.create({"sha256": sha256_digest, "file": file})
7780
self.pulp_href = artifact["pulp_href"]
7881
return artifact["pulp_href"]

pulp-glue/src/pulp_glue/rpm/context.py

Lines changed: 4 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import os
21
import typing as t
32

43
from pulp_glue.common.context import (
@@ -156,7 +155,7 @@ def list_iterator(
156155
def upload(
157156
self,
158157
file: t.IO[bytes],
159-
chunk_size: int,
158+
chunk_size: int | None,
160159
repository: PulpRepositoryContext | None,
161160
**kwargs: t.Any,
162161
) -> t.Any:
@@ -167,42 +166,24 @@ def upload(
167166
168167
Parameters:
169168
file: A file like object that supports `os.path.getsize`.
170-
chunk_size: Size of the chunks to upload independently.
169+
chunk_size: Size of the chunks to upload independently. `None` to disable chunking.
171170
repository: Repository context to add the newly created content to.
172171
kwargs: Extra args specific to the content type, passed to the create call.
173172
174173
Returns:
175174
The result of the create task.
176175
"""
177176
self.needs_capability("upload")
178-
size = os.path.getsize(file.name)
179177
body: dict[str, t.Any] = {**kwargs}
180178

181-
if not self.pulp_ctx.fake_mode:
182-
if chunk_size > size:
183-
# Small file: direct upload
184-
body["file"] = file
185-
else:
186-
# Large file: chunked upload
187-
if self.pulp_ctx.has_plugin(PluginRequirement("core", specifier=">=3.20.0")):
188-
from pulp_glue.core.context import PulpUploadContext
189-
190-
upload_href = PulpUploadContext(self.pulp_ctx).upload_file(file, chunk_size)
191-
body["upload"] = upload_href
192-
else:
193-
from pulp_glue.core.context import PulpArtifactContext
194-
195-
artifact_href = PulpArtifactContext(self.pulp_ctx).upload(file, chunk_size)
196-
body["artifact"] = artifact_href
179+
self._prepare_upload(body, file, chunk_size)
197180

198-
# For rpm plugin >= 3.32.5, use synchronous upload endpoint when no repository is provided
199-
# For older versions, always use the create endpoint (backward compatibility)
200181
if repository is None and self.pulp_ctx.has_plugin(
201182
PluginRequirement("rpm", specifier=">=3.32.5")
202183
):
184+
# "Synchronous upload"
203185
return self.call("upload", body=body)
204186

205-
# Repository is specified or older rpm version: use create endpoint (async path)
206187
if repository is not None:
207188
body["repository"] = repository
208189
return self.create(body=body)

src/pulp_cli/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ def main(
202202
verbose: int,
203203
background: bool,
204204
refresh_api: bool,
205+
chunk_size: int | None,
205206
dry_run: bool,
206207
timeout: int,
207208
cid: str,
@@ -231,6 +232,7 @@ def main(
231232
password=password,
232233
oauth2_client_id=client_id,
233234
oauth2_client_secret=client_secret,
235+
chunk_size=chunk_size,
234236
)
235237

236238

src/pulp_cli/config.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,14 @@
99

1010
from pulp_glue.common.i18n import get_translation
1111

12-
from pulp_cli.generic import HEADER_REGEX, REGISTERED_OUTPUT_FORMATTERS, _unset, pulp_group
12+
from pulp_cli.generic import (
13+
HEADER_REGEX,
14+
REGISTERED_OUTPUT_FORMATTERS,
15+
_unset,
16+
chunk_size_callback,
17+
parse_size,
18+
pulp_group,
19+
)
1320

1421
if sys.version_info >= (3, 11):
1522
import tomllib
@@ -43,6 +50,7 @@
4350
"key",
4451
"verify_ssl",
4552
"format",
53+
"chunk_size",
4654
"dry_run",
4755
"timeout",
4856
"verbose",
@@ -101,6 +109,12 @@ def headers_callback(
101109
default="json",
102110
help=_("Format of the response"),
103111
),
112+
click.option(
113+
"--chunk-size",
114+
help=_("Chunk size to break up {entity} into. Defaults to not chunking at all."),
115+
default=None,
116+
callback=chunk_size_callback,
117+
),
104118
click.option(
105119
"--dry-run/--force",
106120
default=False,
@@ -158,6 +172,11 @@ def validate_config(config: dict[str, t.Any], strict: bool = False) -> None:
158172
errors.append(_("'format' is not one of {choices}").format(choices=FORMAT_CHOICES))
159173
if "verify_ssl" in config and not isinstance(config["verify_ssl"], bool):
160174
errors.append(_("'verify_ssl' is not a bool"))
175+
if "chunk_size" in config:
176+
try:
177+
parse_size(config["chunk_size"])
178+
except click.ClickException as e:
179+
errors.append(e.message)
161180
if "dry_run" in config and not isinstance(config["dry_run"], bool):
162181
errors.append(_("'dry_run' is not a bool"))
163182
if "timeout" in config and not isinstance(config["timeout"], int):

src/pulp_cli/generic.py

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,18 @@
3333
from pulp_glue.common.i18n import get_translation
3434
from pulp_glue.common.openapi import AuthProviderBase
3535

36+
if sys.version_info >= (3, 13):
37+
from warnings import deprecated
38+
else:
39+
T = t.TypeVar("T")
40+
41+
def deprecated(s: str) -> t.Callable[[T], T]:
42+
def _inner(f: T) -> T:
43+
return f
44+
45+
return _inner
46+
47+
3648
try:
3749
from pygments import highlight
3850
from pygments.formatters import Terminal256Formatter
@@ -67,7 +79,6 @@ def _unset(value: t.Any) -> bool:
6779
translation = get_translation(__package__)
6880
_ = translation.gettext
6981

70-
7182
_AnyCallable = t.Callable[..., t.Any]
7283
FC = t.TypeVar("FC", bound=_AnyCallable | click.Command)
7384

@@ -156,6 +167,7 @@ def __init__(
156167
password: str | None = None,
157168
oauth2_client_id: str | None = None,
158169
oauth2_client_secret: str | None = None,
170+
chunk_size: int | None = None,
159171
) -> None:
160172
self.username = username
161173
self.password = password
@@ -172,6 +184,7 @@ def __init__(
172184
background_tasks=background_tasks,
173185
timeout=timeout,
174186
domain=domain,
187+
chunk_size=chunk_size,
175188
)
176189
self.format = format
177190

@@ -717,9 +730,9 @@ def _callback(
717730
units = {"B": 1, "KB": 10**3, "MB": 10**6, "GB": 10**9, "TB": 10**12}
718731

719732

720-
def parse_size_callback(ctx: click.Context, param: click.Parameter, value: str | None) -> int:
733+
def parse_size(value: str | None) -> int | None:
721734
if value is None:
722-
return 8 * 10**9
735+
return None
723736
size = value.strip().upper()
724737
match = re.match(r"^([0-9]+)\s*([KMGT]?B)?$", size)
725738
if not match:
@@ -728,6 +741,18 @@ def parse_size_callback(ctx: click.Context, param: click.Parameter, value: str |
728741
return int(float(number) * units[unit])
729742

730743

744+
def chunk_size_callback(
745+
ctx: click.Context, param: click.Parameter, value: str | None
746+
) -> int | None:
747+
if value == "":
748+
# Actually override the default.
749+
return None
750+
return parse_size(value)
751+
752+
753+
parse_size_callback = deprecated("Use 'chunk_size_callback' instead.")(chunk_size_callback)
754+
755+
731756
def null_callback(ctx: click.Context, param: click.Parameter, value: str | None) -> str | None:
732757
if value == "":
733758
return "null"
@@ -1220,7 +1245,7 @@ def _type_callback(ctx: click.Context, param: click.Parameter, value: str | None
12201245
"--chunk-size",
12211246
help=_("Chunk size to break up {entity} into. Defaults to not chunking at all."),
12221247
default=None,
1223-
callback=parse_size_callback,
1248+
callback=chunk_size_callback,
12241249
)
12251250

12261251
pulp_created_gte_option = pulp_option(

0 commit comments

Comments
 (0)