Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
### 2.6.1 2025-12-14

- Séparation des fichiers de référence et des fichiers de données
- Réorganisation des variables d'environnement
- Correction de certains imports de modules

### 2.6.0 2025-12-12

- Abandon des données consolidées par le MINEF, récupération des données à la source ([#151](https://github.com/ColinMaudry/decp-processing/issues/151))
Expand Down
File renamed without changes.
File renamed without changes.
30 changes: 13 additions & 17 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import httpx
from dotenv import find_dotenv, load_dotenv
from ijson import sendable_list
from prefect.variables import Variable

dotenv_path = find_dotenv()
if dotenv_path == "":
Expand All @@ -33,7 +34,7 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path:


# Nombre maximal de workers utilisables par Prefect. Défaut : 16
MAX_PREFECT_WORKERS = int(os.getenv("MAX_PREFECT_WORKERS", 16))
MAX_PREFECT_WORKERS = int(os.getenv("MAX_PREFECT_WORKERS", 4))
print(f"{'MAX_PREFECT_WORKERS':<40}", MAX_PREFECT_WORKERS)

# Durée avant l'expiration du cache des ressources (en heure). Défaut : 168 (7 jours)
Expand Down Expand Up @@ -67,7 +68,9 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path:
DATAGOUVFR_API = "https://www.data.gouv.fr/api/1"

# Clé d'API data.gouv.fr
DATAGOUVFR_API_KEY = os.getenv("DATAGOUVFR_API_KEY", "")
DATAGOUVFR_API_KEY = (
Variable.get("datagouvfr_api_key") or os.getenv("DATAGOUVFR_API_KEY") or None
)

# URL API Prefect
PREFECT_API_URL = os.getenv("PREFECT_API_URL")
Expand All @@ -81,7 +84,9 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path:
DATA_DIR.mkdir(exist_ok=True, parents=True)
print(f"{'DATA_DIR':<40}", DATA_DIR)

RESOURCE_CACHE_DIR = DATA_DIR / "resource_cache"
RESOURCE_CACHE_DIR = make_path_from_env(
"RESOURCE_CACHE_DIR", DATA_DIR / "resource_cache"
)
RESOURCE_CACHE_DIR.mkdir(exist_ok=True, parents=True)

DIST_DIR = make_path_from_env("DECP_DIST_DIR", BASE_DIR / "dist")
Expand Down Expand Up @@ -128,21 +133,15 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path:
# Lecture ou non des ressource en cache
DECP_USE_CACHE = os.getenv("DECP_USE_CACHE", "false").lower() == "true"

# Dossier de stockage des résultats de tâches et du cache
# https://docs.prefect.io/v3/advanced/results#default-persistence-configuration
PREFECT_LOCAL_STORAGE_PATH = make_path_from_env(
"PREFECT_LOCAL_STORAGE_PATH", DATA_DIR / "prefect_storage"
)
print(f"{'PREFECT_LOCAL_STORAGE_PATH':<40}", PREFECT_LOCAL_STORAGE_PATH)

PREFECT_LOCAL_STORAGE_PATH.mkdir(exist_ok=True, parents=True)

# POSTGRESQL
POSTGRESQL_DB_URI = os.getenv("POSTGRESQL_DB_URI")

# Données de référence
REFERENCE_DIR = BASE_DIR / "reference"

# Liste et ordre des colonnes pour le mono dataframe de base (avant normalisation et spécialisation)
# Sert aussi à vérifier qu'au moins ces colonnes sont présentes (d'autres peuvent être présentes en plus, les colonnes "innatendues")
schema_fields = json.load(open(DATA_DIR / "schema_base.json", "r"))["fields"]
schema_fields = json.load(open(REFERENCE_DIR / "schema_base.json", "r"))["fields"]
BASE_DF_COLUMNS = [field["name"] for field in schema_fields]

COLUMNS_TO_DROP = [
Expand Down Expand Up @@ -196,7 +195,7 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path:

with open(
make_path_from_env(
"DATASETS_REFERENCE_FILEPATH", DATA_DIR / "source_datasets.json"
"DATASETS_REFERENCE_FILEPATH", REFERENCE_DIR / "source_datasets.json"
),
"r",
) as f:
Expand All @@ -205,9 +204,6 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path:
if dataset["id"] == SOLO_DATASET:
TRACKED_DATASETS = [dataset]

# Ces marchés ont des montants invalides, donc on les met à 1 euro.
MARCHES_BAD_MONTANT = ["221300015002472020F00075"]


@dataclass
class DecpFormat:
Expand Down
11 changes: 7 additions & 4 deletions src/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
description="Tous les jours du lundi au vendredi à 6h00",
work_pool_name="local",
ignore_warnings=True,
cron="0 6 * * 1-5",
cron="0 5 * * 1-5",
job_variables={
"env": {
"DECP_PROCESSING_PUBLISH": "True",
"DECP_DIST_DIR": "/srv/shared/decp/prod/dist",
"PREFECT_TASKS_REFRESH_CACHE": "False",
"EXCLUDED_RESOURCES": "501b5201-bc83-4b59-b335-449b34043446,bd33e98f-f8e3-49ba-9f26-51c95fe57234",
# "MAX_PREFECT_WORKERS": "4",
# "EXCLUDED_RESOURCES": None,
"DECP_DATA_DIR": "/home/yunohost.app/prefect",
"DECP_USE_CACHE": "True",
}
},
)
Expand All @@ -41,7 +43,8 @@
"env": {
"DECP_PROCESSING_PUBLISH": "False",
"DECP_DIST_DIR": "/srv/shared/decp/dev/dist",
"PREFECT_TASKS_REFRESH_CACHE": "True",
"DECP_DATA_DIR": "/home/yunohost.app/prefect",
"DECP_USE_CACHE": "False",
}
},
)
Expand Down
2 changes: 1 addition & 1 deletion src/flows/decp_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
from prefect.task_runners import ConcurrentTaskRunner
from prefect_email import EmailServerCredentials, email_send_message

from config import PREFECT_API_URL
from src.config import (
BASE_DF_COLUMNS,
BASE_DIR,
DATE_NOW,
DECP_PROCESSING_PUBLISH,
DIST_DIR,
MAX_PREFECT_WORKERS,
PREFECT_API_URL,
RESOURCE_CACHE_DIR,
SIRENE_DATA_DIR,
TRACKED_DATASETS,
Expand Down
4 changes: 2 additions & 2 deletions src/tasks/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
from prefect import task
from prefect.transactions import transaction

from config import SIRENE_UNITES_LEGALES_URL
from src.config import (
DATA_DIR,
DECP_PROCESSING_PUBLISH,
DECP_USE_CACHE,
HTTP_CLIENT,
HTTP_HEADERS,
RESOURCE_CACHE_DIR,
SIRENE_UNITES_LEGALES_URL,
DecpFormat,
)
from src.schemas import SCHEMA_MARCHE_2019, SCHEMA_MARCHE_2022
Expand All @@ -31,12 +31,12 @@
extract_innermost_struct,
)
from src.tasks.output import sink_to_files
from src.tasks.transform import prepare_unites_legales
from src.tasks.utils import (
full_resource_name,
gen_artifact_row,
stream_replace_bytestring,
)
from tasks.transform import prepare_unites_legales


@task(retries=3, retry_delay_seconds=3)
Expand Down
4 changes: 2 additions & 2 deletions src/tasks/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from polars import selectors as cs
from prefect import task

from src.config import DATA_DIR, DIST_DIR, POSTGRESQL_DB_URI
from src.config import DIST_DIR, POSTGRESQL_DB_URI, REFERENCE_DIR


def save_to_files(df: pl.DataFrame, path: Path, file_format=None):
Expand Down Expand Up @@ -152,7 +152,7 @@ def generate_final_schema(lf):
)

# récupération de data/data_fields.json
with open(DATA_DIR / "schema_base.json", "r", encoding="utf-8") as file:
with open(REFERENCE_DIR / "schema_base.json", "r", encoding="utf-8") as file:
base_json = json.load(file)

# fusion des deux
Expand Down
32 changes: 14 additions & 18 deletions template.env
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
# Les variable qui ne sont pas explicitement obligatoires sont optionnelles

# Répertoire racine du projet (généralement le répertoire de decp-processing)
# Cette variable est obligatoire, à moins de configurer toutes les autres variables *_DIR
DECP_BASE_DIR=
# Répertoire racine du projet (par défaut : le répertoire de decp-processing)
# "DECP_BASE_DIR=

# URI de connexion à POSGRESQL
# obligatoire pour permettre la parallélisation
# ou alors mettre MAX_PREFECT_WORKERS="1" pour désactiver la parallélisation
# PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://postgres:yourTopSecretPassword@localhost:5432/prefect

# Liste des datasets à traiter par decp-processing
DATASETS_REFERENCE_FILEPATH="data/source_datasets.json"
# Liste des datasets à traiter par decp-processing (défaut : reference/source_datasets.json)
# DATASETS_REFERENCE_FILEPATH=

# Le dossier où seront stockées les données téléchargées. Défaut : DECP_BASE_DIR/data
# Si vide, laissé commenté
# DECP_DATA_DIR=

# Chemin de dossier local où enregistrer les données produites en local
# Par défaut DECP_ASE_DIR/dist
# Par défaut DECP_BASE_DIR/dist
# Si vide, laissé commenté
# DECP_DIST_DIR=

Expand All @@ -26,10 +23,10 @@ DATASETS_REFERENCE_FILEPATH="data/source_datasets.json"
DECP_PROCESSING_PUBLISH=False

# La clé API se trouve dans votre compte data.gouv.fr. Utilisée pour publier les données produites.
DATAGOUVFR_API_KEY=
# DATAGOUVFR_API_KEY=

# Le dossier où seront stockées les données SIRENE téléchargées (un dossier par mois). Défaut: DATA_DIR
SIRENE_DATA_PARENT_DIR=
# Le dossier où seront stockées les données SIRENE téléchargées (un dossier par mois). Défaut: DECP_DATA_DIR
# SIRENE_DATA_PARENT_DIR=

# L'URL à laquelle télécharger le parquet des unités légales et établissements (Stock pas historique)
SIRENE_UNITES_LEGALES_URL=https://www.data.gouv.fr/api/1/datasets/r/350182c9-148a-46e0-8389-76c2ec1374a3
Expand All @@ -45,9 +42,8 @@ PREFECT_API_AUTH_STRING=
# Défaut : $PREFECT_HOME/storage, soit /home/$USER/.prefect/storage
# PREFECT_LOCAL_STORAGE_PATH=

# Nombre maximal de workers utilisables par Prefect. Défaut : 16
# Actuellement le multithreading n'est pas supporté
MAX_PREFECT_WORKERS="1"
# Nombre maximal de workers utilisables par Prefect. Défaut : 4
# MAX_PREFECT_WORKERS=

# Durée avant l'expiration du cache des ressources (en heure). Défaut : 168 (7 jours)
# CACHE_EXPIRATION_TIME_HOURS="168"
Expand All @@ -57,7 +53,7 @@ MAX_PREFECT_WORKERS="1"
# La base de données doit être créée auparavant
POSTGRESQL_DB_URI="postgresql://user:pass@server:port/database"

# Pour ignorer le cache Prefect mettre "false"
# Pour ignorer le cache mettre "false"
DECP_USE_CACHE="true"

# Timeout pour la publication de chaque ressource sur data.gouv.fr. Défaut : 300 (5 minutes)
Expand All @@ -67,11 +63,11 @@ DECP_USE_CACHE="true"
#- all : toutes les années de 2018 à aujourd'hui
#- month : juste les données du mois en cours (valeur par défaut)
#- year juste les données du mois en cours
SCRAPING_MODE=
# SCRAPING_MODE=

# Identifiants de ressources data.gouv.fr exclues du traitement
# séparés par des virgules
EXCLUDED_RESOURCES=
# EXCLUDED_RESOURCES=

# Ne traiter que ce dataset (par son ID)
SOLO_DATASET=
# SOLO_DATASET=
2 changes: 1 addition & 1 deletion tests/test_main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from flows.decp_processing import decp_processing
from src.flows.decp_processing import decp_processing
from tests.fixtures import prefect_test_harness


Expand Down
2 changes: 1 addition & 1 deletion tests/test_transform.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import polars as pl
from polars.testing import assert_frame_equal

from tasks.transform import (
from src.tasks.transform import (
prepare_etablissements,
prepare_unites_legales,
replace_with_modification_data,
Expand Down