Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
2f7de9b
Gestion des acheteurs absents de SIRENE et nettoyage des SIRET titula…
imanuch Dec 15, 2025
c61d505
Timeout pour la tâche process_batch à 30min
ColinMaudry Dec 19, 2025
01f9d10
Merge branch 'hotfix/2.6.4' into dev
ColinMaudry Dec 23, 2025
b0bfba2
Merge branch 'main' into feat/acheteurs_null
ColinMaudry Dec 24, 2025
b909c4c
Merge pull request #160 from imanuch/feat/acheteurs_null
ColinMaudry Dec 24, 2025
58cc1f9
Changelog 2.6.4
ColinMaudry Dec 23, 2025
a6be682
Pas de publication si SOLO_DATASET
ColinMaudry Dec 24, 2025
c60dfdd
Correction des args logging #94
ColinMaudry Dec 24, 2025
63827e3
Merci @imanuch pour PR #160 !
ColinMaudry Dec 24, 2025
1bda5e3
Rempacement des guillemets simples par des apostrophes dans objet
ColinMaudry Dec 24, 2025
b065fdd
Gestion des variables d'entrée du scrap, réduction des temps de laten…
ColinMaudry Dec 27, 2025
11f03fb
Typo
ColinMaudry Dec 27, 2025
d721e9b
Plus de contrôle sur le scrap via env
ColinMaudry Dec 27, 2025
806bf1b
Scrap des données AIFE #144
ColinMaudry Dec 27, 2025
fe7ff68
Source et deployment #144
ColinMaudry Dec 27, 2025
69b1c30
Meilleure gestion titulaires DUME #144
ColinMaudry Dec 27, 2025
c2df72c
Ajout du dataset scrap dume #144
ColinMaudry Dec 27, 2025
d5e7d72
Changelog
ColinMaudry Dec 29, 2025
eaec3e9
Merge branch 'feature/dume_download' into dev
ColinMaudry Dec 29, 2025
e4c1f83
Refacto fonctions scrap dans des fichiers dédiés
ColinMaudry Dec 29, 2025
63ebfe6
Ajout du scrap klekoon #71
ColinMaudry Dec 29, 2025
aa01173
Ajout du scrap klekoon #71
ColinMaudry Dec 29, 2025
5aa670a
Fix log level, ajout du log level à ALL_CONFIG
ColinMaudry Dec 29, 2025
4c57278
Ajout du dataset scrap klekoon #71
ColinMaudry Dec 29, 2025
7b9d767
Merge branch 'feature/71_klekoon' into dev
ColinMaudry Dec 29, 2025
812b99d
Changelog #71
ColinMaudry Dec 29, 2025
706ffcd
Bump version
ColinMaudry Dec 29, 2025
94e4dad
Bump polars version
ColinMaudry Dec 29, 2025
550065a
Màj tests (objet)
ColinMaudry Dec 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
### 2.7.0

- Remplacement des guillemets simples par des apostrophes dans "objet"
- Ajout des données de l'API DUME (code source `scrap_aife_dume`) ([#144](https://github.com/ColinMaudry/decp-processing/issues/144))
- Ajout des données du profil d'acheteur Klekoon (code source `scrap_klekoon`) ([#71](https://github.com/ColinMaudry/decp-processing/issues/71))

#### 2.6.4 2025-12-19

- Tri et numérotation des modifications après la concaténation plutôt que par ressource, pour réduire le nombre de doublons ([#156](https://github.com/ColinMaudry/decp-processing/issues/156))
- Utilisation du logger de prefect plûtot que `log_prints=True`
- Utilisation du logger de prefect plûtot que `log_prints=True` ([#94](https://github.com/ColinMaudry/decp-processing/issues/94))

#### 2.6.3 2025-12-16

Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# DECP processing

> version 2.6.3 ([notes de version](https://github.com/ColinMaudry/decp-processing/blob/main/CHANGELOG.md))
> version 2.7.0 ([notes de version](https://github.com/ColinMaudry/decp-processing/blob/main/CHANGELOG.md))

Projet de traitement et de publication de meilleures données sur les marchés publics attribués en France. Vous pouvez consulter, filtrer et télécharger
ces données sur le site [decp.info](https://decp.info). Enfin la section [À propos](https://decp.info/a-propos) décrit les objectifs du projet et regroupe toutes les informations clés.
Expand Down Expand Up @@ -163,3 +163,4 @@ pytest tests/test_main.py
- [FranckMaseo](https://github.com/frankmaseo)
- [Thomas Louf](https://github.com/tlouf)
- [vico4445](https://github.com/vico4445)
- [imanuch](https://github.com/imanuch)
7 changes: 4 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
[project]
name = "decp-processing"
description = "Traitement des données des marchés publics français."
version = "2.6.3"
version = "2.7.0"
requires-python = ">= 3.9"
authors = [
{ name = "Colin Maudry", email = "colin+decp@maudry.com" }
]
dependencies = [
"python-dotenv",
"pandas", # nécessaire pour l'écriture en base de données
"polars==1.35.2",
"polars==1.36.1",
"pyarrow",
"frictionless",
"ipykernel",
Expand All @@ -23,7 +23,8 @@ dependencies = [
"selenium",
"polars_ds",
"scikit-learn",
"tenacity"
"tenacity",
"dume_api"
]

[project.optional-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion reference/schema_base.json
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
"type": "string",
"name": "procedure",
"title": "Procédure",
"description": "Le type de procédure utilisé pour le marché public. Prodcédure négociée ouverte, Procédure non négociée ouverte, Procédure négociée restreinte, Procédure non négociée restreinte.",
"description": "Le type de procédure utilisé pour le marché public. Procédure négociée ouverte, Procédure non négociée ouverte, Procédure négociée restreinte, Procédure non négociée restreinte.",
"short_title": null
},
{
Expand Down
12 changes: 12 additions & 0 deletions reference/source_datasets.json
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@
"owner_org_name": "Agence pour l'Informatique Financière de l'Etat",
"code": "aife_dume"
},
{
"id": "694ff7a98210456475f98aca",
"name": "Données essentielles de la commande publique (DECP) de l'API DUME (AIFE)",
"owner_org_name": "Colin Maudry",
"code": "scrap_aife_dume"
},
{
"id": "68ebb48dd708fdb2d7c15bff",
"name": "Données des marchés publics de marches-securises.fr",
Expand Down Expand Up @@ -310,5 +316,11 @@
"name": "Les Personnes placées sous main de justice - IDF 2024",
"code": "ppsmj",
"owner_org_name": "Yael Siksik"
},
{
"id": "6952899077f982c9a2373ede",
"name": "Données essentielles de la commande publique (DECP) de Klekoon",
"code": "scrap_klekoon",
"owner_org_name": "Colin Maudry"
}
]
8 changes: 7 additions & 1 deletion run_flow.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import sys

from src.config import SCRAPING_MODE, SCRAPING_MONTH, SCRAPING_TARGET, SCRAPING_YEAR
from src.flows.decp_processing import decp_processing
from src.flows.get_cog import get_cog
from src.flows.scrap import scrap
Expand Down Expand Up @@ -28,4 +29,9 @@
if func_name != "scrap":
FUNCTIONS[func_name]()
else:
scrap(target="aws", mode="all")
scrap(
mode=SCRAPING_MODE,
target=SCRAPING_TARGET,
month=SCRAPING_MONTH,
year=SCRAPING_YEAR,
)
16 changes: 16 additions & 0 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path:

# Niveau des logs
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
ALL_CONFIG["LOG_LEVEL"] = LOG_LEVEL

# Nombre maximal de workers utilisables par Prefect. Défaut : 16
MAX_PREFECT_WORKERS = int(os.getenv("MAX_PREFECT_WORKERS", 4))
Expand Down Expand Up @@ -132,6 +133,14 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path:
SCRAPING_TARGET = os.getenv("SCRAPING_TARGET")
ALL_CONFIG["SCRAPING_TARGET"] = SCRAPING_TARGET

# Year (année cible pour le scraping)
SCRAPING_YEAR = os.getenv("SCRAPING_YEAR")
ALL_CONFIG["SCRAPING_YEAR"] = SCRAPING_YEAR

# Month (mois cible pour le scraping)
SCRAPING_MONTH = os.getenv("SCRAPING_MONTH")
ALL_CONFIG["SCRAPING_MONTH"] = SCRAPING_MONTH

# Lecture ou non des ressource en cache
DECP_USE_CACHE = os.getenv("DECP_USE_CACHE", "false").lower() == "true"

Expand Down Expand Up @@ -196,6 +205,13 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path:
SOLO_DATASET = os.getenv("SOLO_DATASET", "")
ALL_CONFIG["SOLO_DATASET"] = SOLO_DATASET

# Acheteurs absents de la base SIRENE (pour raisons de sécurité ou autre)
# Format: SIRET -> {"nom": "...", ...}
# Ces données sont utilisées en fallback si l'acheteur n'est pas trouvé dans SIRENE
ACHETEURS_NON_SIRENE = {
"13001536500013": {"nom": "Ministère des Armées"},
}

with open(
make_path_from_env(
"DATASETS_REFERENCE_FILEPATH", REFERENCE_DIR / "source_datasets.json"
Expand Down
22 changes: 22 additions & 0 deletions src/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,28 @@
},
)

flow.from_source(
source=GitRepository(
url="https://github.com/ColinMaudry/decp-processing.git", branch="main"
),
entrypoint="src/flows/scrap.py:scrap",
).deploy(
name="scrap-dume",
description="Scraping des données de l'API DUME.",
ignore_warnings=True,
work_pool_name="local",
cron="0 0 * * 2",
job_variables={
"env": {
"DECP_PROCESSING_PUBLISH": "True",
"DECP_DIST_DIR": "/srv/shared/decp/prod/dist",
"PREFECT_TASKS_REFRESH_CACHE": "False",
"SCRAPING_MODE": "month",
"SCRAPING_TARGET": "dume",
}
},
)

flow.from_source(
source=GitRepository(
url="https://github.com/ColinMaudry/decp-processing.git", branch="main"
Expand Down
12 changes: 8 additions & 4 deletions src/flows/decp_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from prefect_email import EmailServerCredentials, email_send_message

from src.config import (
ALL_CONFIG,
BASE_DF_COLUMNS,
DATE_NOW,
DECP_PROCESSING_PUBLISH,
Expand All @@ -20,6 +19,7 @@
PREFECT_API_URL,
RESOURCE_CACHE_DIR,
SIRENE_DATA_DIR,
SOLO_DATASET,
TRACKED_DATASETS,
)
from src.flows.sirene_preprocess import sirene_preprocess
Expand Down Expand Up @@ -50,7 +50,7 @@ def decp_processing(enable_cache_removal: bool = True):

logger.info("🚀 Début du flow decp-processing")

print_all_config(ALL_CONFIG)
print_all_config()

logger.info("Liste de toutes les ressources des datasets...")
resources: list[dict] = list_resources(TRACKED_DATASETS)
Expand Down Expand Up @@ -79,7 +79,11 @@ def decp_processing(enable_cache_removal: bool = True):
)

# Afin d'être sûr que je ne publie pas par erreur un jeu de données de test
decp_publish = DECP_PROCESSING_PUBLISH and len(resources_to_process) > 5000
decp_publish = (
DECP_PROCESSING_PUBLISH
and len(resources_to_process) > 5000
and SOLO_DATASET in ["", None]
)

if decp_publish:
create_table_artifact(
Expand Down Expand Up @@ -146,7 +150,7 @@ def decp_processing(enable_cache_removal: bool = True):
logger.info("☑️ Fin du flow principal decp_processing.")


@task(retries=2)
@task(retries=2, timeout_seconds=1800)
def process_batch(
available_parquet_files,
batch_size,
Expand Down
39 changes: 28 additions & 11 deletions src/flows/scrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,39 @@
SCRAPING_MODE,
SCRAPING_TARGET,
)
from src.tasks.scrap import scrap_aws_month, scrap_marches_securises_month
from src.tasks.utils import get_logger
from src.tasks.utils import get_logger, print_all_config
from tasks.scrap.aws import scrap_aws_month
from tasks.scrap.dume import scrap_dume_month
from tasks.scrap.klekoon import scrap_klekoon
from tasks.scrap.marches_securises import scrap_marches_securises_month


@flow(log_prints=True)
def scrap(target: str = None, mode: str = None, month=None, year=None):
def scrap(target: str, mode: str, month=None, year=None):
logger = get_logger(level=LOG_LEVEL)

print_all_config()

# Remise à zéro du dossier dist
dist_dir: Path = DIST_DIR / target
if dist_dir.exists():
logger.debug(f"Suppression de {dist_dir}...")
logger.info(f"Suppression de {dist_dir}...")
rmtree(dist_dir)
else:
dist_dir.mkdir(parents=True)

# Sélection du target
target = target or SCRAPING_TARGET
dist_dir.mkdir(parents=True, exist_ok=True)

# Sélection de la fonction de scraping en fonction de target
if target == "aws":
scrap_target_month = scrap_aws_month
elif target == "marches-securises.fr":
scrap_target_month = scrap_marches_securises_month
elif target == "dume":
scrap_target_month = scrap_dume_month
elif target == "klekoon":
# Klekoon présent ses données par acheteur et non de manière temporelle
# donc on télécharge tout à chaque fois
scrap_klekoon(dist_dir)
return
else:
logger.error("Quel target ?")
raise ValueError
Expand All @@ -43,8 +53,15 @@ def scrap(target: str = None, mode: str = None, month=None, year=None):
month = month or current_month
year = year or current_year

# Sélection du mode
mode = mode or SCRAPING_MODE
# Récapitulatif de la config
# mode et target doivent être passés en paramètre
# les éventuelles env sont injectées via /run_flow.py
# en prod les paramètres sont spécifiées dans le deployment Prefect
logger.info(f"""
Target: {target} (env {SCRAPING_TARGET})
Mode: {mode} (env {SCRAPING_MODE})
Year: {year}
Month: {month}""")

# Sélection de la plage temporelle
if mode == "month":
Expand All @@ -57,7 +74,7 @@ def scrap(target: str = None, mode: str = None, month=None, year=None):

elif mode == "all":
current_year = int(current_year)
for year in reversed(range(2018, current_year + 2)):
for year in reversed(range(2018, current_year + 1)):
scrap(target=target, mode="year", year=str(year))

else:
Expand Down
10 changes: 9 additions & 1 deletion src/tasks/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ def clean_decp(lf: pl.LazyFrame, decp_format: DecpFormat) -> pl.LazyFrame:
.name.keep()
)

# Nettoyage des espaces dans titulaire_id (ex: " 33487372600239")
lf = lf.with_columns(pl.col("titulaire_id").str.strip_chars())

# Type identifiant = SIRET si vide (marches-securises.fr)
lf = lf.with_columns(
pl.when(
Expand All @@ -138,7 +141,12 @@ def clean_decp(lf: pl.LazyFrame, decp_format: DecpFormat) -> pl.LazyFrame:
)

# NC
lf = lf.with_columns(pl.col(pl.Utf8).replace("NC", None))
lf = lf.with_columns(pl.col(pl.Utf8).replace("NC", None).name.keep())

# Remplacement des single quotes qui servent d'apostrophes
lf = lf.with_columns(
pl.col("objet").str.replace_all(r"(\w)'(\w)", "$1’$2").alias("objet")
)

# Correction des datatypes
lf = fix_data_types(lf)
Expand Down
31 changes: 30 additions & 1 deletion src/tasks/enrich.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import polars as pl
import polars.selectors as cs

from src.config import LOG_LEVEL, SIRENE_DATA_DIR
from src.config import LOG_LEVEL, SIRENE_DATA_DIR, ACHETEURS_NON_SIRENE
from src.tasks.transform import (
extract_unique_acheteurs_siret,
extract_unique_titulaires_siret,
Expand Down Expand Up @@ -139,6 +139,9 @@ def enrich_from_sirene(lf: pl.LazyFrame):

lf = lf.join(lf_sirets_acheteurs, how="left", on="acheteur_id")

# Fallback pour les acheteurs absents de SIRENE (ex: Ministère des Armées)
lf = apply_acheteurs_non_sirene_fallback(lf)

# En joignant en utilisant à la fois le SIRET et le typeIdentifiant, on s'assure qu'on ne joint pas sur
# des id de titulaires non-SIRET
lf = lf.join(
Expand All @@ -158,6 +161,32 @@ def enrich_from_sirene(lf: pl.LazyFrame):
return lf


def apply_acheteurs_non_sirene_fallback(lf: pl.LazyFrame) -> pl.LazyFrame:
"""Applique les données d'acheteurs non présents dans SIRENE en fallback.

Les acheteurs absents de SIRENE ET du fallback conservent acheteur_nom = NULL.
"""
if not ACHETEURS_NON_SIRENE:
return lf

# Créer un DataFrame de fallback à partir du dictionnaire
fallback_data = [
{"acheteur_id": siret, "acheteur_nom_fallback": data["nom"]}
for siret, data in ACHETEURS_NON_SIRENE.items()
]
lf_fallback = pl.LazyFrame(fallback_data)

# Joindre avec les données de fallback
lf = lf.join(lf_fallback, on="acheteur_id", how="left")

# Remplacer acheteur_nom NULL par la valeur de fallback (si disponible)
lf = lf.with_columns(
pl.coalesce("acheteur_nom", "acheteur_nom_fallback").alias("acheteur_nom")
).drop("acheteur_nom_fallback")

return lf


def calculate_distance(lf: pl.LazyFrame) -> pl.LazyFrame:
# Utilisation de polars_ds.haversine
# https://polars-ds-extension.readthedocs.io/en/latest/num.html#polars_ds.exprs.num.haversine
Expand Down
2 changes: 2 additions & 0 deletions src/tasks/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ def publish_scrap_to_datagouv(year: str, month: str, file_path, target):
dataset_ids = {
"aws": "68caf6b135f19236a4f37a32",
"marches-securises.fr": "68ebb48dd708fdb2d7c15bff",
"dume": "694ff7a98210456475f98aca",
"klekoon": "6952899077f982c9a2373ede",
}
logger = get_logger(level=LOG_LEVEL)

Expand Down
Empty file added src/tasks/scrap/__init__.py
Empty file.
Loading