From 7c5148b38a7f87608e9047f9b35c81bc1f0cc4ce Mon Sep 17 00:00:00 2001 From: Gerrod Ubben Date: Tue, 25 Nov 2025 00:25:56 -0500 Subject: [PATCH] Add integrity API for serving PEP 740 Provenance objects --- pulp_python/app/pypi/views.py | 60 ++++++++++++- pulp_python/app/urls.py | 13 ++- pulp_python/app/utils.py | 6 +- .../tests/functional/api/test_attestations.py | 85 ++++++++++++++----- .../api/test_pypi_simple_json_api.py | 1 + 5 files changed, 137 insertions(+), 28 deletions(-) diff --git a/pulp_python/app/pypi/views.py b/pulp_python/app/pypi/views.py index 3fc965f3..e2d76ec9 100644 --- a/pulp_python/app/pypi/views.py +++ b/pulp_python/app/pypi/views.py @@ -37,6 +37,7 @@ PythonDistribution, PythonPackageContent, PythonPublication, + PackageProvenance, ) from pulp_python.app.pypi.serializers import ( SummarySerializer, @@ -61,6 +62,7 @@ ORIGIN_HOST = settings.CONTENT_ORIGIN if settings.CONTENT_ORIGIN else settings.PYPI_API_HOSTNAME BASE_CONTENT_URL = urljoin(ORIGIN_HOST, settings.CONTENT_PATH_PREFIX) +BASE_API_URL = urljoin(settings.PYPI_API_HOSTNAME, "pypi/") PYPI_SIMPLE_V1_HTML = "application/vnd.pypi.simple.v1+html" PYPI_SIMPLE_V1_JSON = "application/vnd.pypi.simple.v1+json" @@ -120,6 +122,11 @@ def get_content(repository_version): """Returns queryset of the content in this repository version.""" return PythonPackageContent.objects.filter(pk__in=repository_version.content) + @staticmethod + def get_provenances(repository_version): + """Returns queryset of the provenance for this repository version.""" + return PackageProvenance.objects.filter(pk__in=repository_version.content) + def should_redirect(self, repo_version=None): """Checks if there is a publication the content app can serve.""" if self.distribution.publication: @@ -139,10 +146,13 @@ def get_rvc(self): def initial(self, request, *args, **kwargs): """Perform common initialization tasks for PyPI endpoints.""" super().initial(request, *args, **kwargs) + domain_name = get_domain().name if settings.DOMAIN_ENABLED: - self.base_content_url = urljoin(BASE_CONTENT_URL, f"{get_domain().name}/") + self.base_content_url = urljoin(BASE_CONTENT_URL, f"{domain_name}/") + self.base_api_url = urljoin(BASE_API_URL, f"{domain_name}/") else: self.base_content_url = BASE_CONTENT_URL + self.base_api_url = BASE_API_URL @classmethod def urlpattern(cls): @@ -273,6 +283,13 @@ def get_renderers(self): else: return [JSONRenderer(), BrowsableAPIRenderer()] + def get_provenance_url(self, package, version, filename): + """Gets the provenance url for a package.""" + base_path = self.distribution.base_path + return urljoin( + self.base_api_url, f"{base_path}/integrity/{package}/{version}/{filename}/provenance/" + ) + @extend_schema(summary="Get index simple page") def list(self, request, path): """Gets the simple api html page for the index.""" @@ -308,6 +325,7 @@ def parse_package(release_package): "size": release_package.size, "upload_time": release_package.upload_time, "version": release_package.version, + "provenance": release_package.provenance_url, } rfilter = get_remote_package_filter(remote) @@ -348,7 +366,8 @@ def retrieve(self, request, path, package): elif self.should_redirect(repo_version=repo_ver): return redirect(urljoin(self.base_content_url, f"{path}/simple/{normalized}/")) if content: - packages = content.filter(name__normalize=normalized).values( + local_packages = content.filter(name__normalize=normalized) + packages = local_packages.values( "filename", "sha256", "metadata_sha256", @@ -357,11 +376,19 @@ def retrieve(self, request, path, package): "pulp_created", "version", ) + provenances = PackageProvenance.objects.filter(package__in=local_packages).values_list( + "package__filename", flat=True + ) local_releases = { p["filename"]: { **p, "url": urljoin(self.base_content_url, f"{path}/{p['filename']}"), "upload_time": p["pulp_created"], + "provenance": ( + self.get_provenance_url(normalized, p["version"], p["filename"]) + if p["filename"] in provenances + else None + ), } for p in packages } @@ -497,3 +524,32 @@ def create(self, request, path): This is the endpoint that tools like Twine and Poetry use for their upload commands. """ return self.upload(request, path) + + +class ProvenanceView(PyPIMixin, ViewSet): + """View for the PyPI provenance endpoint.""" + + endpoint_name = "integrity" + DEFAULT_ACCESS_POLICY = { + "statements": [ + { + "action": ["retrieve"], + "principal": "*", + "effect": "allow", + }, + ], + } + + @extend_schema(summary="Get package provenance") + def retrieve(self, request, path, package, version, filename): + """Gets the provenance for a package.""" + repo_ver, content = self.get_rvc() + if content: + package_content = content.filter( + name__normalize=package, version=version, filename=filename + ).first() + if package_content: + provenance = PackageProvenance.objects.filter(package=package_content).first() + if provenance: + return Response(data=provenance.provenance) + return HttpResponseNotFound(f"{package} {version} {filename} provenance does not exist.") diff --git a/pulp_python/app/urls.py b/pulp_python/app/urls.py index 405ab119..513b6932 100644 --- a/pulp_python/app/urls.py +++ b/pulp_python/app/urls.py @@ -1,7 +1,13 @@ from django.conf import settings from django.urls import path -from pulp_python.app.pypi.views import SimpleView, MetadataView, PyPIView, UploadView +from pulp_python.app.pypi.views import ( + SimpleView, + MetadataView, + PyPIView, + UploadView, + ProvenanceView, +) if settings.DOMAIN_ENABLED: PYPI_API_URL = "pypi///" @@ -13,6 +19,11 @@ urlpatterns = [ path(PYPI_API_URL + "legacy/", UploadView.as_view({"post": "create"}), name="upload"), + path( + PYPI_API_URL + "integrity////provenance/", + ProvenanceView.as_view({"get": "retrieve"}), + name="integrity-provenance", + ), path( PYPI_API_URL + "pypi//", MetadataView.as_view({"get": "retrieve"}), diff --git a/pulp_python/app/utils.py b/pulp_python/app/utils.py index 0fb6ddbf..cb918001 100644 --- a/pulp_python/app/utils.py +++ b/pulp_python/app/utils.py @@ -44,7 +44,8 @@

Links for {{ project_name }}

{% for pkg in project_packages %} - {{ pkg.filename }}
+ {{ pkg.filename }}
{% endfor %} @@ -478,7 +479,8 @@ def write_simple_detail_json(project_name, project_packages): "upload-time": format_upload_time(package["upload_time"]), # TODO in the future: # core-metadata (PEP 7.14) - # provenance (v1.3, PEP 740) + # (v1.3, PEP 740) + "provenance": package.get("provenance", None), } for package in project_packages ], diff --git a/pulp_python/tests/functional/api/test_attestations.py b/pulp_python/tests/functional/api/test_attestations.py index 4695e293..9200c8e6 100644 --- a/pulp_python/tests/functional/api/test_attestations.py +++ b/pulp_python/tests/functional/api/test_attestations.py @@ -6,57 +6,96 @@ from pulpcore.tests.functional.utils import PulpTaskError -@pytest.mark.parallel -def test_crd_provenance(python_bindings, python_content_factory, monitor_task): - """ - Test creating and reading a provenance. - """ - filename = "twine-6.2.0-py3-none-any.whl" +@pytest.fixture(scope="session") +def twine_package(): + """Returns the twine package.""" + filename = "twine-6.2.0.tar.gz" with PyPISimple() as client: page = client.get_project_page("twine") for package in page.packages: if package.filename == filename: - content = python_content_factory(filename, url=package.url) - break + return package + + raise ValueError("Twine package not found") + + +@pytest.mark.parallel +def test_crd_provenance(python_bindings, twine_package, python_content_factory, monitor_task): + """ + Test creating and reading a provenance. + """ + content = python_content_factory(relative_path=twine_package.filename, url=twine_package.url) + provenance = python_bindings.ContentProvenanceApi.create( package=content.pulp_href, - file_url=package.provenance_url, + file_url=twine_package.provenance_url, ) task = monitor_task(provenance.task) - provenance = python_bindings.ContentProvenanceApi.read(task.created_resources[0]) + provenance = python_bindings.ContentProvenanceApi.read(task.created_resources[-1]) assert provenance.package == content.pulp_href - r = requests.get(package.provenance_url) + r = requests.get(twine_package.provenance_url) assert r.status_code == 200 assert r.json() == provenance.provenance @pytest.mark.parallel -def test_verify_provenance(python_bindings, python_content_factory, monitor_task): +def test_verify_provenance(python_bindings, twine_package, python_content_factory, monitor_task): """ Test verifying a provenance. """ - filename = "twine-6.2.0.tar.gz" - with PyPISimple() as client: - page = client.get_project_page("twine") - for package in page.packages: - if package.filename == filename: - break - wrong_content = python_content_factory() # shelf-reader-0.1.tar.gz + wrong_content = python_content_factory( + relative_path=twine_package.filename, url=twine_package.url + ) + prov_url = twine_package.provenance_url.replace( + "twine-6.2.0.tar.gz", "twine-6.2.0-py3-none-any.whl" + ) provenance = python_bindings.ContentProvenanceApi.create( package=wrong_content.pulp_href, - file_url=package.provenance_url, + file_url=prov_url, ) with pytest.raises(PulpTaskError) as e: monitor_task(provenance.task) assert e.value.task.state == "failed" - assert "twine-6.2.0.tar.gz != shelf-reader-0.1.tar.gz" in e.value.task.error["description"] + assert "twine-6.2.0-py3-none-any.whl != twine-6.2.0.tar.gz" in e.value.task.error["description"] # Test creating a provenance without verifying provenance = python_bindings.ContentProvenanceApi.create( package=wrong_content.pulp_href, - file_url=package.provenance_url, + file_url=twine_package.provenance_url, verify=False, ) task = monitor_task(provenance.task) - provenance = python_bindings.ContentProvenanceApi.read(task.created_resources[0]) + provenance = python_bindings.ContentProvenanceApi.read(task.created_resources[-1]) assert provenance.package == wrong_content.pulp_href + + +@pytest.mark.parallel +def test_integrity_api( + python_bindings, + python_repo, + python_distribution_factory, + twine_package, + python_content_factory, + monitor_task, +): + """ + Test the integrity API. + """ + content = python_content_factory( + relative_path=twine_package.filename, + repository=python_repo.pulp_href, + url=twine_package.url, + ) + provenance = python_bindings.ContentProvenanceApi.create( + package=content.pulp_href, + file_url=twine_package.provenance_url, + repository=python_repo.pulp_href, + ) + task = monitor_task(provenance.task) + provenance = python_bindings.ContentProvenanceApi.read(task.created_resources[-1]) + + distro = python_distribution_factory(repository=python_repo.pulp_href) + url = f"{distro.base_url}integrity/twine/6.2.0/{twine_package.filename}/provenance/" + r = requests.get(url) + assert r.status_code == 200 + assert r.json() == provenance.provenance diff --git a/pulp_python/tests/functional/api/test_pypi_simple_json_api.py b/pulp_python/tests/functional/api/test_pypi_simple_json_api.py index 6a0bfb9f..e2c70896 100644 --- a/pulp_python/tests/functional/api/test_pypi_simple_json_api.py +++ b/pulp_python/tests/functional/api/test_pypi_simple_json_api.py @@ -97,6 +97,7 @@ def test_simple_json_detail_api( assert file_tar["data-dist-info-metadata"] is False assert file_tar["size"] == 19097 assert file_tar["upload-time"] is not None + assert file_tar["provenance"] is None @pytest.mark.parallel