|
1 | 1 | import pytest |
2 | 2 |
|
3 | 3 | from pulp_smash.pulp3.utils import gen_distribution, gen_repo |
| 4 | +from pulp_python.tests.functional.constants import PYTHON_EGG_FILENAME, PYTHON_URL |
4 | 5 | from pulp_python.tests.functional.utils import gen_python_remote |
5 | 6 |
|
6 | 7 | from pulpcore.client.pulp_python import ( |
@@ -92,3 +93,35 @@ def _gen_python_remote(**kwargs): |
92 | 93 | return gen_object_with_cleanup(python_remote_api_client, body) |
93 | 94 |
|
94 | 95 | yield _gen_python_remote |
| 96 | + |
| 97 | + |
| 98 | +@pytest.fixture |
| 99 | +def download_python_file(tmp_path, http_get): |
| 100 | + """Download a Python file and return its path.""" |
| 101 | + def _download_python_file(relative_path, url): |
| 102 | + file_path = tmp_path / relative_path |
| 103 | + with open(file_path, mode="wb") as f: |
| 104 | + f.write(http_get(url)) |
| 105 | + return file_path |
| 106 | + |
| 107 | + yield _download_python_file |
| 108 | + |
| 109 | + |
| 110 | +@pytest.fixture |
| 111 | +def python_content_factory(python_content_api_client, download_python_file, monitor_task): |
| 112 | + """A factory to create a Python Package Content.""" |
| 113 | + def _gen_python_content(relative_path=PYTHON_EGG_FILENAME, url=None, **body): |
| 114 | + body["relative_path"] = relative_path |
| 115 | + if url: |
| 116 | + body["file"] = download_python_file(relative_path, url) |
| 117 | + elif not any(x in body for x in ("artifact", "file", "upload")): |
| 118 | + body["file"] = download_python_file(PYTHON_EGG_FILENAME, PYTHON_URL) |
| 119 | + if repo := body.get("repository"): |
| 120 | + repo_href = repo if isinstance(repo, str) else repo.pulp_href |
| 121 | + body["repository"] = repo_href |
| 122 | + |
| 123 | + task = python_content_api_client.create(**body).task |
| 124 | + response = monitor_task(task) |
| 125 | + return python_content_api_client.read(response.created_resources[0]) |
| 126 | + |
| 127 | + yield _gen_python_content |
0 commit comments