From e7c9554fecc31f66c891e7ee205684819b635747 Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Sun, 14 Dec 2025 15:01:54 +0100 Subject: [PATCH 1/6] =?UTF-8?q?M=C3=A9nage=20dans=20les=20env,=20les=20che?= =?UTF-8?q?mins?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- {data => reference}/schema_base.json | 0 {data => reference}/source_datasets.json | 0 src/config.py | 30 ++++++++++-------------- src/deploy.py | 11 +++++---- src/flows/decp_processing.py | 2 +- src/tasks/output.py | 4 ++-- 6 files changed, 23 insertions(+), 24 deletions(-) rename {data => reference}/schema_base.json (100%) rename {data => reference}/source_datasets.json (100%) diff --git a/data/schema_base.json b/reference/schema_base.json similarity index 100% rename from data/schema_base.json rename to reference/schema_base.json diff --git a/data/source_datasets.json b/reference/source_datasets.json similarity index 100% rename from data/source_datasets.json rename to reference/source_datasets.json diff --git a/src/config.py b/src/config.py index c8a38ed..5d29c52 100644 --- a/src/config.py +++ b/src/config.py @@ -9,6 +9,7 @@ import httpx from dotenv import find_dotenv, load_dotenv from ijson import sendable_list +from prefect.variables import Variable dotenv_path = find_dotenv() if dotenv_path == "": @@ -33,7 +34,7 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path: # Nombre maximal de workers utilisables par Prefect. Défaut : 16 -MAX_PREFECT_WORKERS = int(os.getenv("MAX_PREFECT_WORKERS", 16)) +MAX_PREFECT_WORKERS = int(os.getenv("MAX_PREFECT_WORKERS", 4)) print(f"{'MAX_PREFECT_WORKERS':<40}", MAX_PREFECT_WORKERS) # Durée avant l'expiration du cache des ressources (en heure). Défaut : 168 (7 jours) @@ -67,7 +68,9 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path: DATAGOUVFR_API = "https://www.data.gouv.fr/api/1" # Clé d'API data.gouv.fr -DATAGOUVFR_API_KEY = os.getenv("DATAGOUVFR_API_KEY", "") +DATAGOUVFR_API_KEY = ( + Variable.get("datagouvfr_api_key") or os.getenv("DATAGOUVFR_API_KEY") or None +) # URL API Prefect PREFECT_API_URL = os.getenv("PREFECT_API_URL") @@ -81,7 +84,9 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path: DATA_DIR.mkdir(exist_ok=True, parents=True) print(f"{'DATA_DIR':<40}", DATA_DIR) -RESOURCE_CACHE_DIR = DATA_DIR / "resource_cache" +RESOURCE_CACHE_DIR = make_path_from_env( + "RESOURCE_CACHE_DIR", DATA_DIR / "resource_cache" +) RESOURCE_CACHE_DIR.mkdir(exist_ok=True, parents=True) DIST_DIR = make_path_from_env("DECP_DIST_DIR", BASE_DIR / "dist") @@ -128,21 +133,15 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path: # Lecture ou non des ressource en cache DECP_USE_CACHE = os.getenv("DECP_USE_CACHE", "false").lower() == "true" -# Dossier de stockage des résultats de tâches et du cache -# https://docs.prefect.io/v3/advanced/results#default-persistence-configuration -PREFECT_LOCAL_STORAGE_PATH = make_path_from_env( - "PREFECT_LOCAL_STORAGE_PATH", DATA_DIR / "prefect_storage" -) -print(f"{'PREFECT_LOCAL_STORAGE_PATH':<40}", PREFECT_LOCAL_STORAGE_PATH) - -PREFECT_LOCAL_STORAGE_PATH.mkdir(exist_ok=True, parents=True) - # POSTGRESQL POSTGRESQL_DB_URI = os.getenv("POSTGRESQL_DB_URI") +# Données de référence +REFERENCE_DIR = BASE_DIR / "reference" + # Liste et ordre des colonnes pour le mono dataframe de base (avant normalisation et spécialisation) # Sert aussi à vérifier qu'au moins ces colonnes sont présentes (d'autres peuvent être présentes en plus, les colonnes "innatendues") -schema_fields = json.load(open(DATA_DIR / "schema_base.json", "r"))["fields"] +schema_fields = json.load(open(REFERENCE_DIR / "schema_base.json", "r"))["fields"] BASE_DF_COLUMNS = [field["name"] for field in schema_fields] COLUMNS_TO_DROP = [ @@ -196,7 +195,7 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path: with open( make_path_from_env( - "DATASETS_REFERENCE_FILEPATH", DATA_DIR / "source_datasets.json" + "DATASETS_REFERENCE_FILEPATH", REFERENCE_DIR / "source_datasets.json" ), "r", ) as f: @@ -205,9 +204,6 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path: if dataset["id"] == SOLO_DATASET: TRACKED_DATASETS = [dataset] -# Ces marchés ont des montants invalides, donc on les met à 1 euro. -MARCHES_BAD_MONTANT = ["221300015002472020F00075"] - @dataclass class DecpFormat: diff --git a/src/deploy.py b/src/deploy.py index 2f32751..5fd3f81 100644 --- a/src/deploy.py +++ b/src/deploy.py @@ -15,13 +15,15 @@ description="Tous les jours du lundi au vendredi à 6h00", work_pool_name="local", ignore_warnings=True, - cron="0 6 * * 1-5", + cron="0 5 * * 1-5", job_variables={ "env": { "DECP_PROCESSING_PUBLISH": "True", "DECP_DIST_DIR": "/srv/shared/decp/prod/dist", - "PREFECT_TASKS_REFRESH_CACHE": "False", - "EXCLUDED_RESOURCES": "501b5201-bc83-4b59-b335-449b34043446,bd33e98f-f8e3-49ba-9f26-51c95fe57234", + # "MAX_PREFECT_WORKERS": "4", + # "EXCLUDED_RESOURCES": None, + "DECP_DATA_DIR": "/home/yunohost.app/prefect", + "DECP_USE_CACHE": "True", } }, ) @@ -41,7 +43,8 @@ "env": { "DECP_PROCESSING_PUBLISH": "False", "DECP_DIST_DIR": "/srv/shared/decp/dev/dist", - "PREFECT_TASKS_REFRESH_CACHE": "True", + "DECP_DATA_DIR": "/home/yunohost.app/prefect", + "DECP_USE_CACHE": "False", } }, ) diff --git a/src/flows/decp_processing.py b/src/flows/decp_processing.py index 56db3dd..b101620 100644 --- a/src/flows/decp_processing.py +++ b/src/flows/decp_processing.py @@ -9,7 +9,6 @@ from prefect.task_runners import ConcurrentTaskRunner from prefect_email import EmailServerCredentials, email_send_message -from config import PREFECT_API_URL from src.config import ( BASE_DF_COLUMNS, BASE_DIR, @@ -17,6 +16,7 @@ DECP_PROCESSING_PUBLISH, DIST_DIR, MAX_PREFECT_WORKERS, + PREFECT_API_URL, RESOURCE_CACHE_DIR, SIRENE_DATA_DIR, TRACKED_DATASETS, diff --git a/src/tasks/output.py b/src/tasks/output.py index 4e22f9d..29b7937 100644 --- a/src/tasks/output.py +++ b/src/tasks/output.py @@ -9,7 +9,7 @@ from polars import selectors as cs from prefect import task -from src.config import DATA_DIR, DIST_DIR, POSTGRESQL_DB_URI +from src.config import DIST_DIR, POSTGRESQL_DB_URI, REFERENCE_DIR def save_to_files(df: pl.DataFrame, path: Path, file_format=None): @@ -152,7 +152,7 @@ def generate_final_schema(lf): ) # récupération de data/data_fields.json - with open(DATA_DIR / "schema_base.json", "r", encoding="utf-8") as file: + with open(REFERENCE_DIR / "schema_base.json", "r", encoding="utf-8") as file: base_json = json.load(file) # fusion des deux From 2cd9524c44f3d55645a3482be14cfe9adc0751b6 Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Sun, 14 Dec 2025 15:13:41 +0100 Subject: [PATCH 2/6] =?UTF-8?q?M=C3=A0j=20de=20template.env?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- template.env | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/template.env b/template.env index 1efb815..fce1f62 100644 --- a/template.env +++ b/template.env @@ -1,23 +1,20 @@ # Les variable qui ne sont pas explicitement obligatoires sont optionnelles -# Répertoire racine du projet (généralement le répertoire de decp-processing) -# Cette variable est obligatoire, à moins de configurer toutes les autres variables *_DIR -DECP_BASE_DIR= +# Répertoire racine du projet (par défaut : le répertoire de decp-processing) +# "DECP_BASE_DIR= # URI de connexion à POSGRESQL -# obligatoire pour permettre la parallélisation -# ou alors mettre MAX_PREFECT_WORKERS="1" pour désactiver la parallélisation # PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://postgres:yourTopSecretPassword@localhost:5432/prefect -# Liste des datasets à traiter par decp-processing -DATASETS_REFERENCE_FILEPATH="data/source_datasets.json" +# Liste des datasets à traiter par decp-processing (défaut : reference/source_datasets.json) +# DATASETS_REFERENCE_FILEPATH= # Le dossier où seront stockées les données téléchargées. Défaut : DECP_BASE_DIR/data # Si vide, laissé commenté # DECP_DATA_DIR= # Chemin de dossier local où enregistrer les données produites en local -# Par défaut DECP_ASE_DIR/dist +# Par défaut DECP_BASE_DIR/dist # Si vide, laissé commenté # DECP_DIST_DIR= @@ -28,7 +25,7 @@ DECP_PROCESSING_PUBLISH=False # La clé API se trouve dans votre compte data.gouv.fr. Utilisée pour publier les données produites. DATAGOUVFR_API_KEY= -# Le dossier où seront stockées les données SIRENE téléchargées (un dossier par mois). Défaut: DATA_DIR +# Le dossier où seront stockées les données SIRENE téléchargées (un dossier par mois). Défaut: DECP_DATA_DIR SIRENE_DATA_PARENT_DIR= # L'URL à laquelle télécharger le parquet des unités légales et établissements (Stock pas historique) @@ -45,9 +42,8 @@ PREFECT_API_AUTH_STRING= # Défaut : $PREFECT_HOME/storage, soit /home/$USER/.prefect/storage # PREFECT_LOCAL_STORAGE_PATH= -# Nombre maximal de workers utilisables par Prefect. Défaut : 16 -# Actuellement le multithreading n'est pas supporté -MAX_PREFECT_WORKERS="1" +# Nombre maximal de workers utilisables par Prefect. Défaut : 4 +MAX_PREFECT_WORKERS= # Durée avant l'expiration du cache des ressources (en heure). Défaut : 168 (7 jours) # CACHE_EXPIRATION_TIME_HOURS="168" @@ -57,7 +53,7 @@ MAX_PREFECT_WORKERS="1" # La base de données doit être créée auparavant POSTGRESQL_DB_URI="postgresql://user:pass@server:port/database" -# Pour ignorer le cache Prefect mettre "false" +# Pour ignorer le cache mettre "false" DECP_USE_CACHE="true" # Timeout pour la publication de chaque ressource sur data.gouv.fr. Défaut : 300 (5 minutes) From 0ee707b1a35c44d4673e9726f72e878f839842e7 Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Sun, 14 Dec 2025 15:20:55 +0100 Subject: [PATCH 3/6] =?UTF-8?q?M=C3=A0j=20de=20template.env?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- template.env | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/template.env b/template.env index fce1f62..24fb989 100644 --- a/template.env +++ b/template.env @@ -23,10 +23,10 @@ DECP_PROCESSING_PUBLISH=False # La clé API se trouve dans votre compte data.gouv.fr. Utilisée pour publier les données produites. -DATAGOUVFR_API_KEY= +# DATAGOUVFR_API_KEY= # Le dossier où seront stockées les données SIRENE téléchargées (un dossier par mois). Défaut: DECP_DATA_DIR -SIRENE_DATA_PARENT_DIR= +# SIRENE_DATA_PARENT_DIR= # L'URL à laquelle télécharger le parquet des unités légales et établissements (Stock pas historique) SIRENE_UNITES_LEGALES_URL=https://www.data.gouv.fr/api/1/datasets/r/350182c9-148a-46e0-8389-76c2ec1374a3 @@ -43,7 +43,7 @@ PREFECT_API_AUTH_STRING= # PREFECT_LOCAL_STORAGE_PATH= # Nombre maximal de workers utilisables par Prefect. Défaut : 4 -MAX_PREFECT_WORKERS= +# MAX_PREFECT_WORKERS= # Durée avant l'expiration du cache des ressources (en heure). Défaut : 168 (7 jours) # CACHE_EXPIRATION_TIME_HOURS="168" @@ -63,11 +63,11 @@ DECP_USE_CACHE="true" #- all : toutes les années de 2018 à aujourd'hui #- month : juste les données du mois en cours (valeur par défaut) #- year juste les données du mois en cours -SCRAPING_MODE= +# SCRAPING_MODE= # Identifiants de ressources data.gouv.fr exclues du traitement # séparés par des virgules -EXCLUDED_RESOURCES= +# EXCLUDED_RESOURCES= # Ne traiter que ce dataset (par son ID) -SOLO_DATASET= +# SOLO_DATASET= From a75912e8e982e27b215d31961d0118c8815236c5 Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Sun, 14 Dec 2025 15:22:06 +0100 Subject: [PATCH 4/6] Coquille --- src/tasks/get.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tasks/get.py b/src/tasks/get.py index e32c60d..8cb705f 100644 --- a/src/tasks/get.py +++ b/src/tasks/get.py @@ -14,7 +14,6 @@ from prefect import task from prefect.transactions import transaction -from config import SIRENE_UNITES_LEGALES_URL from src.config import ( DATA_DIR, DECP_PROCESSING_PUBLISH, @@ -22,6 +21,7 @@ HTTP_CLIENT, HTTP_HEADERS, RESOURCE_CACHE_DIR, + SIRENE_UNITES_LEGALES_URL, DecpFormat, ) from src.schemas import SCHEMA_MARCHE_2019, SCHEMA_MARCHE_2022 From 4cee7608afcd1bbd4d028dbaf5c8eea35f4b9995 Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Sun, 14 Dec 2025 15:24:34 +0100 Subject: [PATCH 5/6] Fixed imports --- src/tasks/get.py | 2 +- tests/test_main.py | 2 +- tests/test_transform.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/tasks/get.py b/src/tasks/get.py index 8cb705f..8530cc3 100644 --- a/src/tasks/get.py +++ b/src/tasks/get.py @@ -31,12 +31,12 @@ extract_innermost_struct, ) from src.tasks.output import sink_to_files +from src.tasks.transform import prepare_unites_legales from src.tasks.utils import ( full_resource_name, gen_artifact_row, stream_replace_bytestring, ) -from tasks.transform import prepare_unites_legales @task(retries=3, retry_delay_seconds=3) diff --git a/tests/test_main.py b/tests/test_main.py index 991b0a1..81cc81d 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -1,4 +1,4 @@ -from flows.decp_processing import decp_processing +from src.flows.decp_processing import decp_processing from tests.fixtures import prefect_test_harness diff --git a/tests/test_transform.py b/tests/test_transform.py index e0fffe6..e7f91f6 100644 --- a/tests/test_transform.py +++ b/tests/test_transform.py @@ -1,7 +1,7 @@ import polars as pl from polars.testing import assert_frame_equal -from tasks.transform import ( +from src.tasks.transform import ( prepare_etablissements, prepare_unites_legales, replace_with_modification_data, From 48c1ae38b40e199ffda460f19ab3abd5a9ac7d22 Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Sun, 14 Dec 2025 16:17:32 +0100 Subject: [PATCH 6/6] Changelog --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7192a6c..322bf53 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +### 2.6.1 2025-12-14 + +- Séparation des fichiers de référence et des fichiers de données +- Réorganisation des variables d'environnement +- Correction de certains imports de modules + ### 2.6.0 2025-12-12 - Abandon des données consolidées par le MINEF, récupération des données à la source ([#151](https://github.com/ColinMaudry/decp-processing/issues/151))