Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 58 additions & 2 deletions pulp_python/app/pypi/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
PythonDistribution,
PythonPackageContent,
PythonPublication,
PackageProvenance,
)
from pulp_python.app.pypi.serializers import (
SummarySerializer,
Expand All @@ -61,6 +62,7 @@

ORIGIN_HOST = settings.CONTENT_ORIGIN if settings.CONTENT_ORIGIN else settings.PYPI_API_HOSTNAME
BASE_CONTENT_URL = urljoin(ORIGIN_HOST, settings.CONTENT_PATH_PREFIX)
BASE_API_URL = urljoin(settings.PYPI_API_HOSTNAME, "pypi/")

PYPI_SIMPLE_V1_HTML = "application/vnd.pypi.simple.v1+html"
PYPI_SIMPLE_V1_JSON = "application/vnd.pypi.simple.v1+json"
Expand Down Expand Up @@ -120,6 +122,11 @@ def get_content(repository_version):
"""Returns queryset of the content in this repository version."""
return PythonPackageContent.objects.filter(pk__in=repository_version.content)

@staticmethod
def get_provenances(repository_version):
"""Returns queryset of the provenance for this repository version."""
return PackageProvenance.objects.filter(pk__in=repository_version.content)

def should_redirect(self, repo_version=None):
"""Checks if there is a publication the content app can serve."""
if self.distribution.publication:
Expand All @@ -139,10 +146,13 @@ def get_rvc(self):
def initial(self, request, *args, **kwargs):
"""Perform common initialization tasks for PyPI endpoints."""
super().initial(request, *args, **kwargs)
domain_name = get_domain().name
if settings.DOMAIN_ENABLED:
self.base_content_url = urljoin(BASE_CONTENT_URL, f"{get_domain().name}/")
self.base_content_url = urljoin(BASE_CONTENT_URL, f"{domain_name}/")
self.base_api_url = urljoin(BASE_API_URL, f"{domain_name}/")
else:
self.base_content_url = BASE_CONTENT_URL
self.base_api_url = BASE_API_URL

@classmethod
def urlpattern(cls):
Expand Down Expand Up @@ -273,6 +283,13 @@ def get_renderers(self):
else:
return [JSONRenderer(), BrowsableAPIRenderer()]

def get_provenance_url(self, package, version, filename):
"""Gets the provenance url for a package."""
base_path = self.distribution.base_path
return urljoin(
self.base_api_url, f"{base_path}/integrity/{package}/{version}/{filename}/provenance/"
)

@extend_schema(summary="Get index simple page")
def list(self, request, path):
"""Gets the simple api html page for the index."""
Expand Down Expand Up @@ -308,6 +325,7 @@ def parse_package(release_package):
"size": release_package.size,
"upload_time": release_package.upload_time,
"version": release_package.version,
"provenance": release_package.provenance_url,
}

rfilter = get_remote_package_filter(remote)
Expand Down Expand Up @@ -348,7 +366,8 @@ def retrieve(self, request, path, package):
elif self.should_redirect(repo_version=repo_ver):
return redirect(urljoin(self.base_content_url, f"{path}/simple/{normalized}/"))
if content:
packages = content.filter(name__normalize=normalized).values(
local_packages = content.filter(name__normalize=normalized)
packages = local_packages.values(
"filename",
"sha256",
"metadata_sha256",
Expand All @@ -357,11 +376,19 @@ def retrieve(self, request, path, package):
"pulp_created",
"version",
)
provenances = PackageProvenance.objects.filter(package__in=local_packages).values_list(
"package__filename", flat=True
)
local_releases = {
p["filename"]: {
**p,
"url": urljoin(self.base_content_url, f"{path}/{p['filename']}"),
"upload_time": p["pulp_created"],
"provenance": (
self.get_provenance_url(normalized, p["version"], p["filename"])
if p["filename"] in provenances
else None
),
}
for p in packages
}
Expand Down Expand Up @@ -497,3 +524,32 @@ def create(self, request, path):
This is the endpoint that tools like Twine and Poetry use for their upload commands.
"""
return self.upload(request, path)


class ProvenanceView(PyPIMixin, ViewSet):
"""View for the PyPI provenance endpoint."""

endpoint_name = "integrity"
DEFAULT_ACCESS_POLICY = {
"statements": [
{
"action": ["retrieve"],
"principal": "*",
"effect": "allow",
},
],
}

@extend_schema(summary="Get package provenance")
def retrieve(self, request, path, package, version, filename):
"""Gets the provenance for a package."""
repo_ver, content = self.get_rvc()
if content:
package_content = content.filter(
name__normalize=package, version=version, filename=filename
).first()
if package_content:
provenance = PackageProvenance.objects.filter(package=package_content).first()
if provenance:
return Response(data=provenance.provenance)
return HttpResponseNotFound(f"{package} {version} {filename} provenance does not exist.")
13 changes: 12 additions & 1 deletion pulp_python/app/urls.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
from django.conf import settings
from django.urls import path

from pulp_python.app.pypi.views import SimpleView, MetadataView, PyPIView, UploadView
from pulp_python.app.pypi.views import (
SimpleView,
MetadataView,
PyPIView,
UploadView,
ProvenanceView,
)

if settings.DOMAIN_ENABLED:
PYPI_API_URL = "pypi/<slug:pulp_domain>/<path:path>/"
Expand All @@ -13,6 +19,11 @@

urlpatterns = [
path(PYPI_API_URL + "legacy/", UploadView.as_view({"post": "create"}), name="upload"),
path(
PYPI_API_URL + "integrity/<str:package>/<str:version>/<str:filename>/provenance/",
ProvenanceView.as_view({"get": "retrieve"}),
name="integrity-provenance",
),
path(
PYPI_API_URL + "pypi/<path:meta>/",
MetadataView.as_view({"get": "retrieve"}),
Expand Down
6 changes: 4 additions & 2 deletions pulp_python/app/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
<body>
<h1>Links for {{ project_name }}</h1>
{% for pkg in project_packages %}
<a href="{{ pkg.url }}#sha256={{ pkg.sha256 }}" rel="internal">{{ pkg.filename }}</a><br/>
<a href="{{ pkg.url }}#sha256={{ pkg.sha256 }}" rel="internal" {% if pkg.provenance -%}
data-provenance="{{ pkg.provenance }}"{% endif %}>{{ pkg.filename }}</a><br/>
{% endfor %}
</body>
</html>
Expand Down Expand Up @@ -478,7 +479,8 @@ def write_simple_detail_json(project_name, project_packages):
"upload-time": format_upload_time(package["upload_time"]),
# TODO in the future:
# core-metadata (PEP 7.14)
# provenance (v1.3, PEP 740)
# (v1.3, PEP 740)
"provenance": package.get("provenance", None),
}
for package in project_packages
],
Expand Down
85 changes: 62 additions & 23 deletions pulp_python/tests/functional/api/test_attestations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,57 +6,96 @@
from pulpcore.tests.functional.utils import PulpTaskError


@pytest.mark.parallel
def test_crd_provenance(python_bindings, python_content_factory, monitor_task):
"""
Test creating and reading a provenance.
"""
filename = "twine-6.2.0-py3-none-any.whl"
@pytest.fixture(scope="session")
def twine_package():
"""Returns the twine package."""
filename = "twine-6.2.0.tar.gz"
with PyPISimple() as client:
page = client.get_project_page("twine")
for package in page.packages:
if package.filename == filename:
content = python_content_factory(filename, url=package.url)
break
return package

raise ValueError("Twine package not found")


@pytest.mark.parallel
def test_crd_provenance(python_bindings, twine_package, python_content_factory, monitor_task):
"""
Test creating and reading a provenance.
"""
content = python_content_factory(relative_path=twine_package.filename, url=twine_package.url)

provenance = python_bindings.ContentProvenanceApi.create(
package=content.pulp_href,
file_url=package.provenance_url,
file_url=twine_package.provenance_url,
)
task = monitor_task(provenance.task)
provenance = python_bindings.ContentProvenanceApi.read(task.created_resources[0])
provenance = python_bindings.ContentProvenanceApi.read(task.created_resources[-1])
assert provenance.package == content.pulp_href
r = requests.get(package.provenance_url)
r = requests.get(twine_package.provenance_url)
assert r.status_code == 200
assert r.json() == provenance.provenance


@pytest.mark.parallel
def test_verify_provenance(python_bindings, python_content_factory, monitor_task):
def test_verify_provenance(python_bindings, twine_package, python_content_factory, monitor_task):
"""
Test verifying a provenance.
"""
filename = "twine-6.2.0.tar.gz"
with PyPISimple() as client:
page = client.get_project_page("twine")
for package in page.packages:
if package.filename == filename:
break
wrong_content = python_content_factory() # shelf-reader-0.1.tar.gz
wrong_content = python_content_factory(
relative_path=twine_package.filename, url=twine_package.url
)
prov_url = twine_package.provenance_url.replace(
"twine-6.2.0.tar.gz", "twine-6.2.0-py3-none-any.whl"
)
provenance = python_bindings.ContentProvenanceApi.create(
package=wrong_content.pulp_href,
file_url=package.provenance_url,
file_url=prov_url,
)
with pytest.raises(PulpTaskError) as e:
monitor_task(provenance.task)
assert e.value.task.state == "failed"
assert "twine-6.2.0.tar.gz != shelf-reader-0.1.tar.gz" in e.value.task.error["description"]
assert "twine-6.2.0-py3-none-any.whl != twine-6.2.0.tar.gz" in e.value.task.error["description"]

# Test creating a provenance without verifying
provenance = python_bindings.ContentProvenanceApi.create(
package=wrong_content.pulp_href,
file_url=package.provenance_url,
file_url=twine_package.provenance_url,
verify=False,
)
task = monitor_task(provenance.task)
provenance = python_bindings.ContentProvenanceApi.read(task.created_resources[0])
provenance = python_bindings.ContentProvenanceApi.read(task.created_resources[-1])
assert provenance.package == wrong_content.pulp_href


@pytest.mark.parallel
def test_integrity_api(
python_bindings,
python_repo,
python_distribution_factory,
twine_package,
python_content_factory,
monitor_task,
):
"""
Test the integrity API.
"""
content = python_content_factory(
relative_path=twine_package.filename,
repository=python_repo.pulp_href,
url=twine_package.url,
)
provenance = python_bindings.ContentProvenanceApi.create(
package=content.pulp_href,
file_url=twine_package.provenance_url,
repository=python_repo.pulp_href,
)
task = monitor_task(provenance.task)
provenance = python_bindings.ContentProvenanceApi.read(task.created_resources[-1])

distro = python_distribution_factory(repository=python_repo.pulp_href)
url = f"{distro.base_url}integrity/twine/6.2.0/{twine_package.filename}/provenance/"
r = requests.get(url)
assert r.status_code == 200
assert r.json() == provenance.provenance
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def test_simple_json_detail_api(
assert file_tar["data-dist-info-metadata"] is False
assert file_tar["size"] == 19097
assert file_tar["upload-time"] is not None
assert file_tar["provenance"] is None


@pytest.mark.parallel
Expand Down