Commit 56c8a1c

Merge pull request #848 from jobselko/repair_on_demand_content
Support on-demand content in repair_metadata
2 parents ebf7b2c + 154f96a commit 56c8a1c

4 files changed: +294 -50 lines changed

CHANGES/849.feature

Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+Added support for on-demand content to `repair_metadata` endpoint.
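
For context, the repair appears to be exposed as an action endpoint on Python repositories. A minimal client-side sketch of dispatching it follows; the host, credentials, repository href, and exact route are assumptions (only the `repair_metadata` action name comes from this PR), so check the pulp_python API docs before relying on it.

# Minimal sketch of dispatching the repair over the REST API. The host,
# credentials, repository href, and exact route are assumptions -- only the
# "repair_metadata" action name comes from this PR.
import requests

PULP_URL = "https://pulp.example.com"  # hypothetical Pulp server
# Hypothetical href of the Python repository to repair
REPO_HREF = "/pulp/api/v3/repositories/python/python/0190a2f0-1234/"

response = requests.post(
    f"{PULP_URL}{REPO_HREF}repair_metadata/",
    auth=("admin", "password"),  # placeholder credentials
)
response.raise_for_status()
# Pulp answers with an asynchronous task href that can be polled
print("dispatched:", response.json()["task"])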

pulp_python/app/tasks/repair.py

Lines changed: 137 additions & 28 deletions

@@ -1,23 +1,32 @@
 import logging
-import uuid
+from collections import defaultdict
 from gettext import gettext as _
+from itertools import groupby
+from uuid import UUID
 
+from django.db.models import Prefetch
 from django.db.models.query import QuerySet
-from pulpcore.plugin.models import ProgressReport
-from pulpcore.plugin.util import get_domain
-
 from pulp_python.app.models import PythonPackageContent, PythonRepository
-from pulp_python.app.utils import artifact_to_python_content_data
+from pulp_python.app.utils import (
+    artifact_to_python_content_data,
+    fetch_json_release_metadata,
+    parse_metadata,
+)
+from pulpcore.plugin.models import ContentArtifact, ProgressReport
+from pulpcore.plugin.util import get_domain
 
 log = logging.getLogger(__name__)
 
 
-def repair(repository_pk: uuid.UUID) -> None:
+BULK_SIZE = 1000
+
+
+def repair(repository_pk: UUID) -> None:
     """
     Repairs metadata of all packages for the specified repository.
 
     Args:
-        repository_pk (uuid.UUID): The primary key of the repository to repair.
+        repository_pk (UUID): The primary key of the repository to repair.
 
     Returns:
         None
@@ -32,11 +41,16 @@ def repair(repository_pk: UUID) -> None:
     content_set = repository.latest_version().content.values_list("pk", flat=True)
     content = PythonPackageContent.objects.filter(pk__in=content_set)
 
-    num_repaired = repair_metadata(content)
-    log.info(_("{} packages' metadata repaired.").format(num_repaired))
+    num_repaired, pkgs_not_repaired = repair_metadata(content)
+    log.info(
+        _(
+            "{} packages' metadata repaired. Packages not repaired due to an "
+            "inaccessible URL or a mismatched sha256: {}."
+        ).format(num_repaired, pkgs_not_repaired)
+    )
 
 
-def repair_metadata(content: QuerySet[PythonPackageContent]) -> int:
+def repair_metadata(content: QuerySet[PythonPackageContent]) -> tuple[int, set[str]]:
     """
     Repairs metadata for a queryset of PythonPackageContent objects
     and updates the progress report.
@@ -45,45 +59,140 @@ def repair_metadata(content: QuerySet[PythonPackageContent]) -> int:
         content (QuerySet[PythonPackageContent]): The queryset of items to repair.
 
     Returns:
-        int: The number of packages that were repaired.
+        tuple[int, set[str]]: A tuple containing:
+            - The number of packages that were repaired.
+            - A set of packages' PKs that were not repaired.
     """
-    # TODO: Add on_demand content repair
-    immediate_content = content.filter(contentartifact__artifact__isnull=False)
+    immediate_content = (
+        content.filter(contentartifact__artifact__isnull=False)
+        .distinct()
+        .prefetch_related("_artifacts")
+    )
+    on_demand_content = (
+        content.filter(contentartifact__artifact__isnull=True)
+        .distinct()
+        .prefetch_related(
+            Prefetch(
+                "contentartifact_set",
+                queryset=ContentArtifact.objects.prefetch_related("remoteartifact_set"),
+            )
+        )
+        .order_by("name", "version")
+    )
     domain = get_domain()
 
     batch = []
     set_of_update_fields = set()
     total_repaired = 0
+    # Keep track of on-demand packages that were not repaired
+    pkgs_not_repaired = set()
 
     progress_report = ProgressReport(
         message="Repairing packages' metadata",
         code="repair.metadata",
-        total=immediate_content.count(),
+        total=content.count(),
     )
     progress_report.save()
     with progress_report:
         for package in progress_report.iter(
-            immediate_content.prefetch_related("_artifacts").iterator(chunk_size=1000)
+            immediate_content.iterator(chunk_size=BULK_SIZE)
         ):
             new_data = artifact_to_python_content_data(
                 package.filename, package._artifacts.get(), domain
             )
-            changed = False
-            for field, value in new_data.items():
-                if getattr(package, field) != value:
-                    setattr(package, field, value)
-                    set_of_update_fields.add(field)
-                    changed = True
-            if changed:
-                batch.append(package)
-            if len(batch) == 1000:
-                total_repaired += len(batch)
-                PythonPackageContent.objects.bulk_update(batch, set_of_update_fields)
-                batch = []
-                set_of_update_fields.clear()
+            total_repaired += update_package_if_needed(
+                package, new_data, batch, set_of_update_fields
+            )
+
+        # For on-demand content, we expect that:
+        # 1. PythonPackageContent always has a correct name and version
+        # 2. RemoteArtifact always has a correct sha256
+        for (name, version), group in groupby(
+            on_demand_content.iterator(chunk_size=BULK_SIZE),
+            key=lambda x: (x.name, x.version),
+        ):
+            group_set = set(group)
+            grouped_by_url = defaultdict(list)
+
+            for package in group_set:
+                for ra in package.contentartifact_set.get().remoteartifact_set.all():
+                    grouped_by_url[ra.remote.url].append((package, ra))
+
+            # Prioritize the URL that can serve the most packages
+            for url, pkg_ra_pairs in sorted(
+                grouped_by_url.items(), key=lambda x: len(x[1]), reverse=True
+            ):
+                if not group_set:
+                    break  # No packages left to repair, move on to the next group
+                remotes = set([pkg_ra[1].remote for pkg_ra in pkg_ra_pairs])
+                try:
+                    json_data = fetch_json_release_metadata(name, version, remotes)
+                except Exception:
+                    continue
+
+                for package, ra in pkg_ra_pairs:
+                    if package not in group_set:
+                        continue  # Package was already repaired
+                    # Extract data only for the specific distribution being checked
+                    dist_data = None
+                    for dist in json_data["urls"]:
+                        if ra.sha256 == dist["digests"]["sha256"]:
+                            dist_data = dist
+                            break
+                    if not dist_data:
+                        continue
+
+                    new_data = parse_metadata(json_data["info"], version, dist_data)
+                    new_data.pop("url")  # url belongs to RemoteArtifact
+                    total_repaired += update_package_if_needed(
+                        package, new_data, batch, set_of_update_fields
+                    )
+                    group_set.remove(package)
+                    progress_report.increment()
+            # Track the unrepaired packages after all URLs are processed
+            pkgs_not_repaired.update([p.pk for p in group_set])
+            progress_report.increase_by(len(group_set))
 
     if batch:
         total_repaired += len(batch)
         PythonPackageContent.objects.bulk_update(batch, set_of_update_fields)
 
+    return total_repaired, pkgs_not_repaired
+
+
+def update_package_if_needed(
+    package: PythonPackageContent,
+    new_data: dict,
+    batch: list[PythonPackageContent],
+    set_of_update_fields: set[str],
+) -> int:
+    """
+    Compares the current package data with new data and updates the package
+    if needed ("batch" and "set_of_update_fields" are updated in-place).
+
+    Args:
+        package: Package to check and update.
+        new_data: A dict of new field values to compare against the package.
+        batch: A list of packages that were updated.
+        set_of_update_fields: A set of package field names that were updated.
+
+    Returns:
+        The count of repaired packages (increments in multiples of BULK_SIZE only).
+    """
+    total_repaired = 0
+    changed = False
+    for field, value in new_data.items():
+        if getattr(package, field) != value:
+            setattr(package, field, value)
+            set_of_update_fields.add(field)
+            changed = True
+    if changed:
+        batch.append(package)
+
+    if len(batch) == BULK_SIZE:
+        PythonPackageContent.objects.bulk_update(batch, set_of_update_fields)
+        total_repaired += BULK_SIZE
+        batch.clear()
+        set_of_update_fields.clear()
+
    return total_repaired
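
The heart of the new on-demand path is the grouping strategy: packages are grouped by (name, version), their candidate remote URLs are bucketed, and the URL that can serve the most packages is queried first, so a single JSON fetch repairs as many distributions as possible. The following self-contained sketch mirrors that logic with toy stand-ins (FakePackage and plan_fetches are illustrative names, not pulp_python code):

# Toy model of the URL-prioritization strategy used for on-demand content.
# FakePackage stands in for PythonPackageContent plus its RemoteArtifact URLs.
from collections import defaultdict
from dataclasses import dataclass
from itertools import groupby


@dataclass(frozen=True)
class FakePackage:
    pk: str
    name: str
    version: str
    urls: tuple[str, ...]


def plan_fetches(packages: list[FakePackage]) -> list[tuple[str, list[str]]]:
    """Return a (url, package_pks) fetch plan, most useful URL first."""
    plan = []
    # groupby needs input sorted by the same key, which is why the real
    # queryset carries .order_by("name", "version")
    ordered = sorted(packages, key=lambda p: (p.name, p.version))
    for _, group in groupby(ordered, key=lambda p: (p.name, p.version)):
        members = list(group)
        remaining = set(members)
        by_url = defaultdict(list)
        for pkg in members:
            for url in pkg.urls:
                by_url[url].append(pkg)
        # Try the URL that can serve the most packages first
        for url, pkgs in sorted(by_url.items(), key=lambda kv: len(kv[1]), reverse=True):
            covered = [p.pk for p in pkgs if p in remaining]
            if not covered:
                continue
            plan.append((url, covered))
            remaining.difference_update(pkgs)
            if not remaining:
                break  # nothing left to repair in this group
    return plan


pkgs = [
    FakePackage("pk1", "django", "4.2", ("https://pypi.org/",)),
    FakePackage("pk2", "django", "4.2", ("https://pypi.org/", "https://mirror.example/")),
    FakePackage("pk3", "django", "4.2", ("https://mirror.example/",)),
]
print(plan_fetches(pkgs))
# e.g. [('https://pypi.org/', ['pk1', 'pk2']), ('https://mirror.example/', ['pk3'])]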

pulp_python/app/utils.py

Lines changed: 32 additions & 0 deletions

@@ -9,6 +9,7 @@
 from packaging.utils import canonicalize_name
 from packaging.requirements import Requirement
 from packaging.version import parse, InvalidVersion
+from pulpcore.plugin.models import Remote
 
 
 PYPI_LAST_SERIAL = "X-PYPI-LAST-SERIAL"
@@ -189,6 +190,37 @@ def artifact_to_python_content_data(filename, artifact, domain=None):
     return data
 
 
+def fetch_json_release_metadata(name: str, version: str, remotes: set[Remote]) -> dict:
+    """
+    Fetches metadata for a specific release from PyPI's JSON API. A release can contain
+    multiple distributions. See https://docs.pypi.org/api/json/#get-a-release for more details.
+    All remotes should have the same URL.
+
+    Returns:
+        Dict containing "info", "last_serial", "urls", and "vulnerabilities" keys.
+    Raises:
+        Exception if fetching from all remote URLs fails.
+    """
+    remote = next(iter(remotes))
+    url = remote.get_remote_artifact_url(f"pypi/{name}/{version}/json")
+
+    result = None
+    for remote in remotes:
+        downloader = remote.get_downloader(url=url, max_retries=1)
+        try:
+            result = downloader.fetch()
+            break
+        except Exception:
+            continue
+
+    if result:
+        with open(result.path, "r") as file:
+            json_data = json.load(file)
+        return json_data
+    else:
+        raise Exception(f"Failed to fetch {url} from any remote.")
+
+
 def python_content_to_json(base_path, content_query, version=None, domain=None):
     """
     Converts a QuerySet of PythonPackageContent into the PyPi JSON format
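
For reference, fetch_json_release_metadata consumes PyPI's release document (https://docs.pypi.org/api/json/#get-a-release). Below is a trimmed, illustrative payload (not a real PyPI response) together with the digest-matching step the repair loop performs to pick the right distribution:

# Trimmed, illustrative release document: the real one carries the "info",
# "last_serial", "urls", and "vulnerabilities" keys named in the docstring.
release_json = {
    "info": {"name": "example-pkg", "version": "1.0.0", "summary": "..."},
    "last_serial": 123456,
    "urls": [
        {
            "filename": "example_pkg-1.0.0-py3-none-any.whl",
            "url": "https://files.pythonhosted.org/.../example_pkg-1.0.0-py3-none-any.whl",
            "digests": {"sha256": "aaaa", "md5": "..."},
        },
        {
            "filename": "example_pkg-1.0.0.tar.gz",
            "url": "https://files.pythonhosted.org/.../example_pkg-1.0.0.tar.gz",
            "digests": {"sha256": "bbbb", "md5": "..."},
        },
    ],
    "vulnerabilities": [],
}


def find_distribution(release: dict, sha256: str) -> dict | None:
    """Return the "urls" entry whose sha256 matches, as the repair loop does."""
    for dist in release["urls"]:
        if dist["digests"]["sha256"] == sha256:
            return dist
    return None


print(find_distribution(release_json, "bbbb")["filename"])
# -> example_pkg-1.0.0.tar.gz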
