From 2f7de9b476bb81a38862366dce1223312ebb2ed1 Mon Sep 17 00:00:00 2001 From: imanuch Date: Mon, 15 Dec 2025 16:46:14 +0100 Subject: [PATCH 01/24] Gestion des acheteurs absents de SIRENE et nettoyage des SIRET titulaires MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Ajout d'un dictionnaire ACHETEURS_NON_SIRENE pour les acheteurs non répertoriés dans SIRENE (ex: Ministère des Armées) pour raisons de sécurité - Fallback appliqué après la jointure SIRENE pour remplir acheteur_nom - Strip des espaces dans titulaire_id pour corriger les SIRET malformés (ex: " 33487372600239") Co-Authored-By: Claude Opus 4.5 --- src/config.py | 7 +++++++ src/tasks/clean.py | 3 +++ src/tasks/enrich.py | 31 ++++++++++++++++++++++++++++++- 3 files changed, 40 insertions(+), 1 deletion(-) diff --git a/src/config.py b/src/config.py index 5d29c52..e10f1d4 100644 --- a/src/config.py +++ b/src/config.py @@ -193,6 +193,13 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path: SOLO_DATASET = os.getenv("SOLO_DATASET", "") print(f"{'SOLO_DATASET':<40}", SOLO_DATASET) +# Acheteurs absents de la base SIRENE (pour raisons de sécurité ou autre) +# Format: SIRET -> {"nom": "...", ...} +# Ces données sont utilisées en fallback si l'acheteur n'est pas trouvé dans SIRENE +ACHETEURS_NON_SIRENE = { + "13001536500013": {"nom": "Ministère des Armées"}, +} + with open( make_path_from_env( "DATASETS_REFERENCE_FILEPATH", REFERENCE_DIR / "source_datasets.json" diff --git a/src/tasks/clean.py b/src/tasks/clean.py index 54a567a..66334f0 100644 --- a/src/tasks/clean.py +++ b/src/tasks/clean.py @@ -125,6 +125,9 @@ def clean_decp(lf: pl.LazyFrame, decp_format: DecpFormat) -> pl.LazyFrame: .name.keep() ) + # Nettoyage des espaces dans titulaire_id (ex: " 33487372600239") + lf = lf.with_columns(pl.col("titulaire_id").str.strip_chars()) + # Type identifiant = SIRET si vide (marches-securises.fr) lf = lf.with_columns( pl.when( diff --git a/src/tasks/enrich.py b/src/tasks/enrich.py index 9fad829..78a7f92 100644 --- a/src/tasks/enrich.py +++ b/src/tasks/enrich.py @@ -3,7 +3,7 @@ from polars_ds import haversine from prefect import task -from src.config import SIRENE_DATA_DIR +from src.config import ACHETEURS_NON_SIRENE, SIRENE_DATA_DIR from src.tasks.transform import ( extract_unique_acheteurs_siret, extract_unique_titulaires_siret, @@ -136,6 +136,9 @@ def enrich_from_sirene(lf: pl.LazyFrame): lf = lf.join(lf_sirets_acheteurs, how="left", on="acheteur_id") + # Fallback pour les acheteurs absents de SIRENE (ex: Ministère des Armées) + lf = apply_acheteurs_non_sirene_fallback(lf) + # En joignant en utilisant à la fois le SIRET et le typeIdentifiant, on s'assure qu'on ne joint pas sur # des id de titulaires non-SIRET lf = lf.join( @@ -155,6 +158,32 @@ def enrich_from_sirene(lf: pl.LazyFrame): return lf +def apply_acheteurs_non_sirene_fallback(lf: pl.LazyFrame) -> pl.LazyFrame: + """Applique les données d'acheteurs non présents dans SIRENE en fallback. + + Les acheteurs absents de SIRENE ET du fallback conservent acheteur_nom = NULL. 
+ """ + if not ACHETEURS_NON_SIRENE: + return lf + + # Créer un DataFrame de fallback à partir du dictionnaire + fallback_data = [ + {"acheteur_id": siret, "acheteur_nom_fallback": data["nom"]} + for siret, data in ACHETEURS_NON_SIRENE.items() + ] + lf_fallback = pl.LazyFrame(fallback_data) + + # Joindre avec les données de fallback + lf = lf.join(lf_fallback, on="acheteur_id", how="left") + + # Remplacer acheteur_nom NULL par la valeur de fallback (si disponible) + lf = lf.with_columns( + pl.coalesce("acheteur_nom", "acheteur_nom_fallback").alias("acheteur_nom") + ).drop("acheteur_nom_fallback") + + return lf + + def calculate_distance(lf: pl.LazyFrame) -> pl.LazyFrame: # Utilisation de polars_ds.haversine # https://polars-ds-extension.readthedocs.io/en/latest/num.html#polars_ds.exprs.num.haversine From c61d505219b257003e857a09b1c5100a0808e900 Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Fri, 19 Dec 2025 12:06:40 +0100 Subject: [PATCH 02/24] =?UTF-8?q?Timeout=20pour=20la=20t=C3=A2che=20proces?= =?UTF-8?q?s=5Fbatch=20=C3=A0=2030min?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/flows/decp_processing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flows/decp_processing.py b/src/flows/decp_processing.py index b838bcc..0502f55 100644 --- a/src/flows/decp_processing.py +++ b/src/flows/decp_processing.py @@ -136,7 +136,7 @@ def decp_processing(enable_cache_removal: bool = True): print("☑️ Fin du flow principal decp_processing.") -@task(retries=2) +@task(retries=2, timeout_seconds=1800) def process_batch( available_parquet_files, batch_size, From 58cc1f9dba231260947cdf1ac4f4716b15fe73f5 Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Tue, 23 Dec 2025 19:38:20 +0100 Subject: [PATCH 03/24] Changelog 2.6.4 --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c1873c..039f956 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ #### 2.6.4 2025-12-19 - Tri et numérotation des modifications après la concaténation plutôt que par ressource, pour réduire le nombre de doublons ([#156](https://github.com/ColinMaudry/decp-processing/issues/156)) -- Utilisation du logger de prefect plûtot que `log_prints=True` +- Utilisation du logger de prefect plûtot que `log_prints=True` ([#94](https://github.com/ColinMaudry/decp-processing/issues/94)) #### 2.6.3 2025-12-16 From a6be682c0c15b0af636aadb0978649ae95a61ac7 Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Wed, 24 Dec 2025 08:47:20 +0100 Subject: [PATCH 04/24] Pas de publication si SOLO_DATASET --- src/flows/decp_processing.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/flows/decp_processing.py b/src/flows/decp_processing.py index acf3982..4f8198f 100644 --- a/src/flows/decp_processing.py +++ b/src/flows/decp_processing.py @@ -20,6 +20,7 @@ PREFECT_API_URL, RESOURCE_CACHE_DIR, SIRENE_DATA_DIR, + SOLO_DATASET, TRACKED_DATASETS, ) from src.flows.sirene_preprocess import sirene_preprocess @@ -79,7 +80,11 @@ def decp_processing(enable_cache_removal: bool = True): ) # Afin d'être sûr que je ne publie pas par erreur un jeu de données de test - decp_publish = DECP_PROCESSING_PUBLISH and len(resources_to_process) > 5000 + decp_publish = ( + DECP_PROCESSING_PUBLISH + and len(resources_to_process) > 5000 + and SOLO_DATASET in ["", None] + ) if decp_publish: create_table_artifact( From c60dfdd65a45d8d7d7acf959210a6a63d37f0661 Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Wed, 24 
Dec 2025 08:47:41 +0100 Subject: [PATCH 05/24] Correction des args logging #94 --- src/tasks/scrap.py | 2 +- src/tasks/utils.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tasks/scrap.py b/src/tasks/scrap.py index 4e3524a..b77c865 100644 --- a/src/tasks/scrap.py +++ b/src/tasks/scrap.py @@ -92,7 +92,7 @@ def parse_result_page(): return [] soup = BeautifulSoup(html_result_page, "html.parser") result_div = soup.find("div", attrs={"id": "liste_consultations"}) - logger.info("Year: ", year, "Month: ", month, "Page: ", str(page)) + logger.info(f"Year: {year}, Month: {month}, Page: {str(page)}") return result_div.find_all( "a", attrs={"title": "Télécharger au format Json"} ) diff --git a/src/tasks/utils.py b/src/tasks/utils.py index 6059602..b14fad1 100644 --- a/src/tasks/utils.py +++ b/src/tasks/utils.py @@ -80,7 +80,7 @@ def remove_unused_cache( logger.info(f"Suppression du fichier de cache: {file}") deleted_files.append(file) file.unlink() - logger.info(f"-> {len(deleted_files)} fichiers supprimés") + logger.info(f"-> {len(deleted_files)} fichiers de cache supprimés") # From 63827e36da2d938ba4e608c15a58936d90bd0064 Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Wed, 24 Dec 2025 09:24:18 +0100 Subject: [PATCH 06/24] Merci @imanuch pour PR #160 ! --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index acf4ed0..448161f 100644 --- a/README.md +++ b/README.md @@ -163,3 +163,4 @@ pytest tests/test_main.py - [FranckMaseo](https://github.com/frankmaseo) - [Thomas Louf](https://github.com/tlouf) - [vico4445](https://github.com/vico4445) +- [imanuch](https://github.com/imanuch) From 1bda5e372b93931e815163a029d37c068a40658d Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Wed, 24 Dec 2025 12:28:43 +0100 Subject: [PATCH 07/24] Remplacement des guillemets simples par des apostrophes dans objet dash.data_tables bug avec les guillemets simples --- src/tasks/clean.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/tasks/clean.py b/src/tasks/clean.py index fc1664b..2f8b941 100644 --- a/src/tasks/clean.py +++ b/src/tasks/clean.py @@ -143,6 +143,9 @@ def clean_decp(lf: pl.LazyFrame, decp_format: DecpFormat) -> pl.LazyFrame: # NC lf = lf.with_columns(pl.col(pl.Utf8).replace("NC", None)) + # Remplacement des single quotes qui servent d'apostrophes + lf = lf.with_columns(pl.col("objet").str.replace_all(r"(\w)'(\w)", "$1’$2")) + # Correction des datatypes lf = fix_data_types(lf) From b065fdd7f32c693bed6556832d54675bf97a0c54 Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Sat, 27 Dec 2025 07:20:46 +0100 Subject: [PATCH 08/24] =?UTF-8?q?Gestion=20des=20variables=20d'entr=C3=A9e?= =?UTF-8?q?=20du=20scrap,=20r=C3=A9duction=20des=20temps=20de=20latence=20?= =?UTF-8?q?(sleep)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- run_flow.py | 3 ++- src/flows/scrap.py | 16 ++++++++++------ src/tasks/scrap.py | 4 ++-- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/run_flow.py b/run_flow.py index 2cec406..04ee321 100644 --- a/run_flow.py +++ b/run_flow.py @@ -1,5 +1,6 @@ import sys +from src.config import SCRAPING_MODE, SCRAPING_TARGET from src.flows.decp_processing import decp_processing from src.flows.get_cog import get_cog from src.flows.scrap import scrap @@ -28,4 +29,4 @@ if func_name != "scrap": FUNCTIONS[func_name]() else: - scrap(target="aws", mode="all") + scrap(mode=SCRAPING_MODE, target=SCRAPING_TARGET) diff --git a/src/flows/scrap.py b/src/flows/scrap.py index dcd0cd3..02641e4
100644 --- a/src/flows/scrap.py +++ b/src/flows/scrap.py @@ -16,7 +16,7 @@ @flow(log_prints=True) -def scrap(target: str = None, mode: str = None, month=None, year=None): +def scrap(target: str, mode: str, month=None, year=None): logger = get_logger(level=LOG_LEVEL) # Remise à zéro du dossier dist dist_dir: Path = DIST_DIR / target @@ -26,9 +26,6 @@ def scrap(target: str = None, mode: str = None, month=None, year=None): else: dist_dir.mkdir(parents=True) - # Sélection du target - target = target or SCRAPING_TARGET - # Sélection de la fonction de scraping en fonction de target if target == "aws": scrap_target_month = scrap_aws_month @@ -43,8 +40,15 @@ def scrap(target: str = None, mode: str = None, month=None, year=None): month = month or current_month year = year or current_year - # Sélection du mode - mode = mode or SCRAPING_MODE + # Récapitulatif de la config + # mode et target doivent être passés en paramètre + # les éventuelles env sont injectées via /run_flow.py + # en prod les paramètres sont spécifiées dans le deployment Prefect + logger.info(f""" + Target: {target} (env {SCRAPING_TARGET}) + Mode: {mode} (env {SCRAPING_MODE}) + Year: {year}) + Month: {month})""") # Sélection de la plage temporelle if mode == "month": diff --git a/src/tasks/scrap.py b/src/tasks/scrap.py index b77c865..154aec3 100644 --- a/src/tasks/scrap.py +++ b/src/tasks/scrap.py @@ -100,7 +100,7 @@ def parse_result_page(): try: json_links = parse_result_page() except AttributeError: - sleep(3) + sleep(1) logger.info("Retrying result page download and parsing...") json_links = parse_result_page() @@ -206,7 +206,7 @@ def search_form(end_date_: date) -> tuple[date, str, int]: continue elif result_code == "timeout": # On réessaie après 10 secondes - sleep(10) + sleep(3) retry_count += 1 continue elif result_code is None: From 11f03fb4734ed1f00da9a17bd95b7995a1940792 Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Sat, 27 Dec 2025 16:16:51 +0100 Subject: [PATCH 09/24] Typo --- reference/schema_base.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reference/schema_base.json b/reference/schema_base.json index 5532ede..1a6b9e7 100644 --- a/reference/schema_base.json +++ b/reference/schema_base.json @@ -81,7 +81,7 @@ "type": "string", "name": "procedure", "title": "Procédure", - "description": "Le type de procédure utilisé pour le marché public. Prodcédure négociée ouverte, Procédure non négociée ouverte, Procédure négociée restreinte, Procédure non négociée restreinte.", + "description": "Le type de procédure utilisé pour le marché public. 
Procédure négociée ouverte, Procédure non négociée ouverte, Procédure négociée restreinte, Procédure non négociée restreinte.", "short_title": null }, { From d721e9ba086a264a248cb449463ec8c1bf64da96 Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Sat, 27 Dec 2025 16:17:40 +0100 Subject: [PATCH 10/24] =?UTF-8?q?Plus=20de=20contr=C3=B4le=20sur=20le=20sc?= =?UTF-8?q?rap=20via=20env?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- run_flow.py | 9 +++++++-- src/config.py | 8 ++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/run_flow.py b/run_flow.py index 04ee321..619c086 100644 --- a/run_flow.py +++ b/run_flow.py @@ -1,6 +1,6 @@ import sys -from src.config import SCRAPING_MODE, SCRAPING_TARGET +from src.config import SCRAPING_MODE, SCRAPING_MONTH, SCRAPING_TARGET, SCRAPING_YEAR from src.flows.decp_processing import decp_processing from src.flows.get_cog import get_cog from src.flows.scrap import scrap @@ -29,4 +29,9 @@ if func_name != "scrap": FUNCTIONS[func_name]() else: - scrap(mode=SCRAPING_MODE, target=SCRAPING_TARGET) + scrap( + mode=SCRAPING_MODE, + target=SCRAPING_TARGET, + month=SCRAPING_MONTH, + year=SCRAPING_YEAR, + ) diff --git a/src/config.py b/src/config.py index feb0d24..5530afb 100644 --- a/src/config.py +++ b/src/config.py @@ -132,6 +132,14 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path: SCRAPING_TARGET = os.getenv("SCRAPING_TARGET") ALL_CONFIG["SCRAPING_TARGET"] = SCRAPING_TARGET +# Year (année cible pour le scraping) +SCRAPING_YEAR = os.getenv("SCRAPING_YEAR") +ALL_CONFIG["SCRAPING_YEAR"] = SCRAPING_YEAR + +# Month (mois cible pour le scraping) +SCRAPING_MONTH = os.getenv("SCRAPING_MONTH") +ALL_CONFIG["SCRAPING_MONTH"] = SCRAPING_MONTH + # Lecture ou non des ressource en cache DECP_USE_CACHE = os.getenv("DECP_USE_CACHE", "false").lower() == "true" From 806bf1bc505cace73489b54214e78ad6e736c372 Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Sat, 27 Dec 2025 16:18:03 +0100 Subject: [PATCH 11/24] =?UTF-8?q?Scrap=20des=20donn=C3=A9es=20AIFE=20#144?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/flows/scrap.py | 12 ++- src/tasks/publish.py | 1 + src/tasks/scrap.py | 176 ++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 184 insertions(+), 5 deletions(-) diff --git a/src/flows/scrap.py b/src/flows/scrap.py index 02641e4..664ff93 100644 --- a/src/flows/scrap.py +++ b/src/flows/scrap.py @@ -11,7 +11,11 @@ SCRAPING_MODE, SCRAPING_TARGET, ) -from src.tasks.scrap import scrap_aws_month, scrap_marches_securises_month +from src.tasks.scrap import ( + scrap_aws_month, + scrap_dume_month, + scrap_marches_securises_month, +) from src.tasks.utils import get_logger @@ -23,14 +27,16 @@ def scrap(target: str, mode: str, month=None, year=None): if dist_dir.exists(): logger.debug(f"Suppression de {dist_dir}...") rmtree(dist_dir) - else: - dist_dir.mkdir(parents=True) + + dist_dir.mkdir(parents=True) # Sélection de la fonction de scraping en fonction de target if target == "aws": scrap_target_month = scrap_aws_month elif target == "marches-securises.fr": scrap_target_month = scrap_marches_securises_month + elif target == "dume": + scrap_target_month = scrap_dume_month else: logger.error("Quel target ?") raise ValueError diff --git a/src/tasks/publish.py b/src/tasks/publish.py index 08171f4..c6bfbf2 100644 --- a/src/tasks/publish.py +++ b/src/tasks/publish.py @@ -106,6 +106,7 @@ def publish_scrap_to_datagouv(year: str, month: str, file_path, target): 
dataset_ids = { "aws": "68caf6b135f19236a4f37a32", "marches-securises.fr": "68ebb48dd708fdb2d7c15bff", + "dume": "694ff7a98210456475f98aca", } logger = get_logger(level=LOG_LEVEL) diff --git a/src/tasks/scrap.py b/src/tasks/scrap.py index 154aec3..ceaed0b 100644 --- a/src/tasks/scrap.py +++ b/src/tasks/scrap.py @@ -6,7 +6,9 @@ from pathlib import Path from time import sleep +import dume_api import httpx +import polars as pl from bs4 import BeautifulSoup from prefect import task from selenium import webdriver @@ -15,7 +17,7 @@ from selenium.webdriver.firefox.options import Options from selenium.webdriver.support.wait import WebDriverWait -from src.config import LOG_LEVEL +from src.config import DIST_DIR, LOG_LEVEL from src.tasks.publish import publish_scrap_to_datagouv from src.tasks.utils import get_logger @@ -120,7 +122,7 @@ def parse_result_page(): publish_scrap_to_datagouv(year, month, json_path, "marches-securises.fr") -@task(log_prints=True) +@task() def scrap_aws_month(year: str = None, month: str = None, dist_dir: Path = None): logger = get_logger(level=LOG_LEVEL) @@ -298,6 +300,176 @@ def replacer(match): # End scrap AWS +def scrap_dume_month(year: str = None, month: str = None, dist_dir: Path = None): + logger = get_logger(level=LOG_LEVEL) + + end_date = start_date = date(int(year), int(month), 1) + base_duration = timedelta(days=0) + nb_days_in_month = calendar.monthrange(start_date.year, start_date.month)[1] + last_month_day = start_date + timedelta(days=nb_days_in_month - 1) + marches_month = [] + decp_uids = set( + pl.read_parquet(DIST_DIR / "decp.parquet", columns=["uid"])["uid"].to_list() + ) + + retry_count = 0 + + while end_date < last_month_day: + # On évite de boucler sans fin + if retry_count > 3: + logger.error("Trop d'essais, start_date + 1") + retry_count = 0 + start_date = end_date + timedelta(days=1) + continue + + start_date_str = start_date.isoformat() + + if retry_count == 0: + end_date = start_date + base_duration + + if end_date > last_month_day: + end_date = last_month_day + + end_date_str = end_date.isoformat() + + logger.info(f"➡️ {start_date_str} -> {end_date_str}") + rows, nb_results = get_dume_rows(start_date_str, end_date_str) + + marches = [] + if nb_results > 0: + marches = dume_to_decp(rows) + + marches_month.extend(marches) + + # On passe aux jours suivants + start_date = end_date + timedelta(days=1) + retry_count = 0 + continue + + if len(marches_month) > 0: + # Format 2022, donc double niveau + dicts = {"marches": {"marche": marches_month}} + json_path = dist_dir / f"dume_{year}-{month}.json" + with open(json_path, "w") as f: + f.write(json.dumps(dicts, indent=2)) + logger.info(str(len(marches_month)) + f" marchés pour le mois ({month}/{year})") + + get_uid_stats(marches_month, decp_uids) + publish_scrap_to_datagouv(year, month, json_path, "dume") + + +def dume_to_decp(rows): + new_rows = [] + for r in rows: + d = r.get("donneesMP") + new_row = { + "uid": d.get("idAcheteur") + r.get("id"), + "id": r.get("id"), + "objet": r.get("objet"), + "nature": r.get("nature"), + "procedure": r.get("procedure"), + "dureeMois": r.get("dureeMois"), + "datePublicationDonnees": r.get("datePublicationDonnees"), + "acheteur": { + "id": d.get("idAcheteur"), + }, + "techniques": [ + {"technique": [d.get("technique")]}, + ], + "modalitesExecution": [ + {"modaliteExecution": [d.get("modaliteExecution")]}, + ], + "idAccordCadre": d.get("idAccordCadre"), + "codeCPV": d.get("codeCPV"), + "lieuExecution": { + "code": d.get("lieuExecutionCode"), + "typeCode": 
d.get("lieuExecutionTypeCode"), + }, + "dateNotification": d.get("dateNotification"), + "marchesInnovant": d.get("marchesInnovant"), + "attributionAvance": d.get("attributionAvance"), + "tauxAvance": d.get("tauxAvance"), + "origineUE": d.get("origineUE"), + "origineFrance": d.get("origineFrance"), + "ccag": d.get("ccag"), + "offresRecues": d.get("offresRecues"), + "montant": d.get("montant"), + "formePrix": d.get("formePrix"), + "typesPrix": {"typePrix": [d.get("typePrix")]}, + "typeGroupementOperateurs": d.get("typeGroupementOperateurs"), + "sousTraitanceDeclaree": d.get("sousTraitanceDeclaree"), + "titulaires": [ + {"titulaire": titulaire} for titulaire in d.get("titulaires") + ], + "modifications": r.get("modifications"), + } + new_rows.append(new_row) + + return new_rows + + +def get_dume_rows(start_date_str: str, end_date_str: str) -> tuple[list[dict], int]: + procedures = [ + "Marché passé sans publicité ni mise en concurrence préalable", + "Appel d'offres ouvert", + "Appel d'offres restreint", + "Procédure adaptée", + "Procédure avec négociation", + "Dialogue compétitif", + ] + + logger = get_logger(level=LOG_LEVEL) + + _rows = [] + + for procedure in procedures: + try: + new_rows = dume_api.get_contracts( + date_start=start_date_str, + date_end=end_date_str, + type_de="MP", + procedure=procedure, + partition_map={ + "nature": [ + "Marché", + "Marché de partenariat", + "Accord-cadre", + "Marché subséquent", + ] + }, + ) + sleep(0.1) + _rows.extend(new_rows) + logger.debug(procedure + " " + str(len(new_rows)) + f" rows ({len(_rows)})") + except RuntimeError: + logger.error(f"Trop de lignes, on passe ({procedure})") + + _nb_results = len(_rows) + logger.debug(str(_nb_results) + " rows pour la période") + + return _rows, _nb_results + + +def get_uid_stats(marches, decp_uids): + logger = get_logger(level=LOG_LEVEL) + + dume_uids = {marche["uid"] for marche in marches} + + in_decp_uids = dume_uids.intersection(decp_uids) + + logger.info( + str(len(dume_uids)) + " identifiants uniques dans le DUME pour cette période" + ) + logger.info( + str(len(in_decp_uids)) + + " identifiants uniques dans le DUME pour cette période présents dans les DECP consolidées" + ) + logger.info( + str(round(len(in_decp_uids) / len(dume_uids) * 100, 2)) + + " % des identifiants sur cette période sont présents dans les DECP consolidées tabulaires" + ) + + def wait_for_either_element(driver, timeout=10) -> tuple[str or None, int]: """ Attend de voir si le bouton de téléchargement apparaît ou bien le message d'erreur. 
From fe7ff68d4e56bd50752579080313659ce220c121 Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Sat, 27 Dec 2025 16:25:01 +0100 Subject: [PATCH 12/24] Source et deployment #144 --- pyproject.toml | 3 ++- src/deployments.py | 22 ++++++++++++++++++++++ src/tasks/scrap.py | 1 + 3 files changed, 25 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 90135f9..60adaad 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,7 +23,8 @@ dependencies = [ "selenium", "polars_ds", "scikit-learn", - "tenacity" + "tenacity", + "dume_api" ] [project.optional-dependencies] diff --git a/src/deployments.py b/src/deployments.py index 5fd3f81..d6b0b2f 100644 --- a/src/deployments.py +++ b/src/deployments.py @@ -130,6 +130,28 @@ }, ) + flow.from_source( + source=GitRepository( + url="https://github.com/ColinMaudry/decp-processing.git", branch="main" + ), + entrypoint="src/flows/scrap.py:scrap", + ).deploy( + name="scrap-dume", + description="Scraping des données de l'API DUME.", + ignore_warnings=True, + work_pool_name="local", + cron="0 0 * * 2", + job_variables={ + "env": { + "DECP_PROCESSING_PUBLISH": "True", + "DECP_DIST_DIR": "/srv/shared/decp/prod/dist", + "PREFECT_TASKS_REFRESH_CACHE": "False", + "SCRAPING_MODE": "month", + "SCRAPING_TARGET": "dume", + } + }, + ) + flow.from_source( source=GitRepository( url="https://github.com/ColinMaudry/decp-processing.git", branch="main" diff --git a/src/tasks/scrap.py b/src/tasks/scrap.py index ceaed0b..fd44a7e 100644 --- a/src/tasks/scrap.py +++ b/src/tasks/scrap.py @@ -402,6 +402,7 @@ def dume_to_decp(rows): {"titulaire": titulaire} for titulaire in d.get("titulaires") ], "modifications": r.get("modifications"), + "source": "scrap_aife_dume", } new_rows.append(new_row) From 69b1c30c6798d2d69c4cbb7409300c2d94b90c6b Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Sat, 27 Dec 2025 21:16:37 +0100 Subject: [PATCH 13/24] Meilleure gestion titulaires DUME #144 --- src/flows/scrap.py | 6 +++--- src/tasks/scrap.py | 10 +++++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/flows/scrap.py b/src/flows/scrap.py index 664ff93..8732041 100644 --- a/src/flows/scrap.py +++ b/src/flows/scrap.py @@ -53,8 +53,8 @@ def scrap(target: str, mode: str, month=None, year=None): logger.info(f""" Target: {target} (env {SCRAPING_TARGET}) Mode: {mode} (env {SCRAPING_MODE}) - Year: {year}) - Month: {month})""") + Year: {year} + Month: {month}""") # Sélection de la plage temporelle if mode == "month": @@ -67,7 +67,7 @@ def scrap(target: str, mode: str, month=None, year=None): elif mode == "all": current_year = int(current_year) - for year in reversed(range(2018, current_year + 2)): + for year in reversed(range(2018, current_year + 1)): scrap(target=target, mode="year", year=str(year)) else: diff --git a/src/tasks/scrap.py b/src/tasks/scrap.py index fd44a7e..5806e22 100644 --- a/src/tasks/scrap.py +++ b/src/tasks/scrap.py @@ -360,6 +360,12 @@ def scrap_dume_month(year: str = None, month: str = None, dist_dir: Path = None) def dume_to_decp(rows): new_rows = [] + + def get_titulaires(titulaires): + if isinstance(titulaires, list) and len(titulaires) > 0: + return [{"titulaire": titulaire} for titulaire in d.get("titulaires")] + return [] + for r in rows: d = r.get("donneesMP") new_row = { @@ -398,9 +404,7 @@ def dume_to_decp(rows): "typesPrix": {"typePrix": [d.get("typePrix")]}, "typeGroupementOperateurs": d.get("typeGroupementOperateurs"), "sousTraitanceDeclaree": d.get("sousTraitanceDeclaree"), - "titulaires": [ - {"titulaire": titulaire} for 
titulaire in d.get("titulaires") - ], + "titulaires": get_titulaires(d.get("titulaires")), "modifications": r.get("modifications"), "source": "scrap_aife_dume", } From c2df72c07a7ad1b53c435d94733527e274f77ca0 Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Sat, 27 Dec 2025 21:16:51 +0100 Subject: [PATCH 14/24] Ajout du dataset scrap dume #144 --- reference/source_datasets.json | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/reference/source_datasets.json b/reference/source_datasets.json index 8b77f8d..4dfd212 100644 --- a/reference/source_datasets.json +++ b/reference/source_datasets.json @@ -257,6 +257,12 @@ "owner_org_name": "Agence pour l'Informatique Financière de l'Etat", "code": "aife_dume" }, + { + "id": "694ff7a98210456475f98aca", + "name": "Données essentielles de la commande publique (DECP) de l'API DUME (AIFE)", + "owner_org_name": "Colin Maudry", + "code": "scrap_aife_dume" + }, { "id": "68ebb48dd708fdb2d7c15bff", "name": "Données des marchés publics de marches-securises.fr", From d5e7d7207ebe5a16b20ac7b0b985876da520fee3 Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Mon, 29 Dec 2025 08:26:47 +0100 Subject: [PATCH 15/24] Changelog --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 039f956..dc40285 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +### 2.7.0 + +- Remplacement des guillemets simples par des apostrophes dans "objet" +- Ajout des données de l'API DUME (code source `scrap_aife_dume`) + #### 2.6.4 2025-12-19 - Tri et numérotation des modifications après la concaténation plutôt que par ressource, pour réduire le nombre de doublons ([#156](https://github.com/ColinMaudry/decp-processing/issues/156)) From e4c1f83a0ab1cdb5a0567b8be253441b6daddeea Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Mon, 29 Dec 2025 08:31:32 +0100 Subject: [PATCH 16/24] =?UTF-8?q?Refacto=20fonctions=20scrap=20dans=20des?= =?UTF-8?q?=20fichiers=20d=C3=A9di=C3=A9s?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/flows/scrap.py | 8 +- src/tasks/scrap.py | 534 --------------------------- src/tasks/scrap/__init__.py | 0 src/tasks/scrap/aws.py | 256 +++++++++++++ src/tasks/scrap/dume.py | 187 ++++++++++ src/tasks/scrap/marches_securises.py | 106 ++++++ 6 files changed, 552 insertions(+), 539 deletions(-) delete mode 100644 src/tasks/scrap.py create mode 100644 src/tasks/scrap/__init__.py create mode 100644 src/tasks/scrap/aws.py create mode 100644 src/tasks/scrap/dume.py create mode 100644 src/tasks/scrap/marches_securises.py diff --git a/src/flows/scrap.py b/src/flows/scrap.py index 8732041..487348e 100644 --- a/src/flows/scrap.py +++ b/src/flows/scrap.py @@ -11,12 +11,10 @@ SCRAPING_MODE, SCRAPING_TARGET, ) -from src.tasks.scrap import ( - scrap_aws_month, - scrap_dume_month, - scrap_marches_securises_month, -) from src.tasks.utils import get_logger +from tasks.scrap.aws import scrap_aws_month +from tasks.scrap.dume import scrap_dume_month +from tasks.scrap.marches_securises import scrap_marches_securises_month @flow(log_prints=True) diff --git a/src/tasks/scrap.py b/src/tasks/scrap.py deleted file mode 100644 index 5806e22..0000000 --- a/src/tasks/scrap.py +++ /dev/null @@ -1,534 +0,0 @@ -import calendar -import json -import re -import time -from datetime import date, timedelta -from pathlib import Path -from time import sleep - -import dume_api -import httpx -import polars as pl -from bs4 import BeautifulSoup -from prefect import task -from selenium import webdriver 
-from selenium.common import TimeoutException -from selenium.webdriver.common.by import By -from selenium.webdriver.firefox.options import Options -from selenium.webdriver.support.wait import WebDriverWait - -from src.config import DIST_DIR, LOG_LEVEL -from src.tasks.publish import publish_scrap_to_datagouv -from src.tasks.utils import get_logger - - -def get_html(url: str, client: httpx.Client) -> str or None: - headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0", - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/png,image/svg+xml,*/*;q=0.8", - "Connection": "keep-alive", - } - logger = get_logger(level=LOG_LEVEL) - - def get_response() -> httpx.Response: - return client.get(url, timeout=timeout, headers=headers).raise_for_status() - - timeout = httpx.Timeout(20.0, connect=60.0, pool=20.0, read=20.0) - try: - response = get_response() - except (httpx.ConnectError, httpx.ReadTimeout, httpx.HTTPStatusError): - logger.debug("3s break and retrying...") - sleep(3) - try: - response = get_response() - except (httpx.ConnectError, httpx.ReadTimeout, httpx.HTTPStatusError): - logger.error("Skipped") - return None - html = response.text - return html - - -# @task( -# cache_policy=INPUTS, -# persist_result=True, -# cache_expiration=datetime.timedelta(days=15), -# ) -def get_json_marches_securises(url: str, client: httpx.Client) -> dict or None: - json_html_page = get_html(url, client) - logger = get_logger(level=LOG_LEVEL) - - sleep(0.1) - if json_html_page: - json_html_page = ( - json_html_page.replace("", "") + "" - ) - else: - logger.warning("json_html_page is None, skipping...") - return None - json_html_page_soup = BeautifulSoup(json_html_page, "html.parser") - try: - decp_json = json.loads(json_html_page_soup.find("body").string) - except Exception as e: - logger.info(json_html_page) - logger.info(e) - return None - return decp_json - - -@task(log_prints=True) -def scrap_marches_securises_month(year: str, month: str, dist_dir: Path): - logger = get_logger(level=LOG_LEVEL) - - marches = [] - page = 1 - with httpx.Client() as client: - while True: - search_url = ( - f"https://www.marches-securises.fr/entreprise/?module=liste_donnees_essentielles&page={str(page)}&siret_pa=&siret_pa1=&date_deb={year}-{month}-01&date_fin={year}-{month}-31&date_deb_ms={year}-{month}-01&date_fin_ms={year}-{month}-31&ref_ume=&cpv_et=&type_procedure=&type_marche=&objet=&rs_oe=&dep_liste=&ctrl_key=aWwwS1pLUlFzejBOYitCWEZzZTEzZz09&text=&donnees_essentielles=1&search=" - f"table_ms&" - ) - - def parse_result_page(): - html_result_page = get_html(search_url, client) - if html_result_page is None: - return [] - soup = BeautifulSoup(html_result_page, "html.parser") - result_div = soup.find("div", attrs={"id": "liste_consultations"}) - logger.info(f"Year: {year}, Month: {month}, Page: {str(page)}") - return result_div.find_all( - "a", attrs={"title": "Télécharger au format Json"} - ) - - try: - json_links = parse_result_page() - except AttributeError: - sleep(1) - logger.info("Retrying result page download and parsing...") - json_links = parse_result_page() - - if not json_links: - break - else: - page += 1 - for json_link in json_links: - json_href = "https://www.marches-securises.fr" + json_link["href"] - decp_json = get_json_marches_securises(json_href, client) - marches.append(decp_json) - if len(marches) > 0: - dicts = {"marches": marches} - json_path = dist_dir / f"marches-securises_{year}-{month}.json" - with 
open(json_path, "w") as f: - f.write(json.dumps(dicts)) - publish_scrap_to_datagouv(year, month, json_path, "marches-securises.fr") - - -@task() -def scrap_aws_month(year: str = None, month: str = None, dist_dir: Path = None): - logger = get_logger(level=LOG_LEVEL) - - options = Options() - options.add_argument("--headless") - options.set_preference("browser.download.folderList", 2) - options.set_preference("browser.download.manager.showWhenStarting", False) - options.set_preference("browser.download.dir", str(dist_dir)) - - driver = webdriver.Firefox(options=options) - driver.implicitly_wait(10) # secondes - - end_date = start_date = date(int(year), int(month), 1) - base_duration = timedelta(days=3) - nb_days_in_month = calendar.monthrange(start_date.year, start_date.month)[1] - last_month_day = start_date + timedelta(days=nb_days_in_month - 1) - marches_month = [] - replacements = httpx.get( - "https://www.data.gouv.fr/api/1/datasets/r/3bdd5a64-c28e-4c6a-84fd-5a28bcaa53e9", - follow_redirects=True, - ).json() - - retry_count = 0 - - while end_date < last_month_day: - # On évite de boucler sans fin - if retry_count > 3: - retry_count = 0 - start_date = end_date + timedelta(days=1) - continue - - start_date_str = start_date.isoformat() - end_date = start_date + base_duration - - if end_date > last_month_day: - end_date = last_month_day - - driver.get("https://www.marches-publics.info/Annonces/rechercher") - - def search_form(end_date_: date) -> tuple[date, str, int]: - end_date_str_ = end_date_.isoformat() - sleep(1) - logger.info(f"➡️ {start_date_str} -> {end_date_str_}") - - # Formulaire recherche données essentielles - form = driver.find_element(By.ID, "formRech") - de_radio = form.find_element(By.ID, "typeDE") - de_radio.click() - - # Remplir le formulaire - notif_debut = form.find_element(By.ID, "dateNotifDebut") - notif_debut.clear() - notif_debut.send_keys(start_date_str) - notif_fin = form.find_element(By.ID, "dateNotifFin") - notif_fin.clear() - notif_fin.send_keys(end_date_str_) - sleep(0.1) - form.find_element(By.ID, "sub").click() - sleep(1) - - # Soit le bouton de téléchargement apparaît, soit il y a une erreur parce - # que de trop nombreux résultats sont retournés - result_code_, nb_results_ = wait_for_either_element(driver) - - # End search_form() - return end_date, result_code_, nb_results_ - - end_date, result_code, nb_results = search_form(end_date) - - if result_code == "too_many": - # On réessaie avec moins de résultats - if end_date != start_date: - logger.info("💥 Trop de résultats, on réessaie avec un jour de moins") - end_date = search_form(end_date - timedelta(days=1)) - continue - else: - logger.info("start_date == end_date et trop de résultats, on skip !") - start_date = end_date + timedelta(days=1) - continue - elif result_code == "no_result": - logger.info("👻 Aucun résultat, on skip.") - start_date = end_date + timedelta(days=1) - continue - elif result_code == "timeout": - # On réessaie après 10 secondes - sleep(3) - retry_count += 1 - continue - elif result_code is None: - logger.info("❓ Pas de téléchargement, on skip.") - start_date = end_date + timedelta(days=1) - continue - - end_date_str = end_date.isoformat() - - json_path = dist_dir / "donneesEssentielles.json" - - start_time = time.time() - last_size = 0 - timeout = 10 - downloaded = False - final_json_path = dist_dir / f"{start_date_str}_{end_date_str}.json" - - while time.time() - start_time < timeout and downloaded is False: - if json_path.exists(): - current_size = json_path.stat().st_size - if 
current_size == last_size and current_size > 0: - sleep(0.1) - json_path.rename(final_json_path) - downloaded = True - last_size = current_size - time.sleep(0.2) - - if final_json_path.exists(): - with open(final_json_path, "r") as f: - json_text = f.read() - try: - marches = json.loads(json_text)["marches"] - except json.decoder.JSONDecodeError: - logger.debug("Le décodage JSON a échoué, tentative de correction...") - - def fix_unescaped_quotes_in_objet(text): - """Générée avec l'aide de ChatGPT (GPT-4o)""" - - # Match the value of "objet" up to the "montant" key - def replacer(match): - # Escape quotes that are not already escaped - fixed_objet = re.sub(r'(? 0 and isinstance(marches_month, list): - # Format 2022, donc double niveau - dicts = {"marches": {"marche": marches_month}} - json_path = dist_dir / f"aws_{year}-{month}.json" - with open(json_path, "w") as f: - f.write(json.dumps(dicts)) - publish_scrap_to_datagouv(year, month, json_path, "aws") - - # End scrap AWS - - -def scrap_dume_month(year: str = None, month: str = None, dist_dir: Path = None): - logger = get_logger(level=LOG_LEVEL) - - end_date = start_date = date(int(year), int(month), 1) - base_duration = timedelta(days=0) - nb_days_in_month = calendar.monthrange(start_date.year, start_date.month)[1] - last_month_day = start_date + timedelta(days=nb_days_in_month - 1) - marches_month = [] - decp_uids = set( - pl.read_parquet(DIST_DIR / "decp.parquet", columns=["uid"])["uid"].to_list() - ) - - retry_count = 0 - - while end_date < last_month_day: - # On évite de boucler sans fin - if retry_count > 3: - logger.error("Trop d'essais, start_date + 1") - retry_count = 0 - start_date = end_date + timedelta(days=1) - continue - - start_date_str = start_date.isoformat() - - if retry_count == 0: - end_date = start_date + base_duration - - if end_date > last_month_day: - end_date = last_month_day - - end_date_str = end_date.isoformat() - - logger.info(f"➡️ {start_date_str} -> {end_date_str}") - rows, nb_results = get_dume_rows(start_date_str, end_date_str) - - marches = [] - if nb_results > 0: - marches = dume_to_decp(rows) - - marches_month.extend(marches) - - # On passe aux jours suivants - start_date = end_date + timedelta(days=1) - retry_count = 0 - continue - - if len(marches_month) > 0: - # Format 2022, donc double niveau - dicts = {"marches": {"marche": marches_month}} - json_path = dist_dir / f"dume_{year}-{month}.json" - with open(json_path, "w") as f: - f.write(json.dumps(dicts, indent=2)) - logger.info(str(len(marches_month)) + f" marchés pour le mois ({month}/{year})") - - get_uid_stats(marches_month, decp_uids) - publish_scrap_to_datagouv(year, month, json_path, "dume") - - -def dume_to_decp(rows): - new_rows = [] - - def get_titulaires(titulaires): - if isinstance(titulaires, list) and len(titulaires) > 0: - return [{"titulaire": titulaire} for titulaire in d.get("titulaires")] - return [] - - for r in rows: - d = r.get("donneesMP") - new_row = { - "uid": d.get("idAcheteur") + r.get("id"), - "id": r.get("id"), - "objet": r.get("objet"), - "nature": r.get("nature"), - "procedure": r.get("procedure"), - "dureeMois": r.get("dureeMois"), - "datePublicationDonnees": r.get("datePublicationDonnees"), - "acheteur": { - "id": d.get("idAcheteur"), - }, - "techniques": [ - {"technique": [d.get("technique")]}, - ], - "modalitesExecution": [ - {"modaliteExecution": [d.get("modaliteExecution")]}, - ], - "idAccordCadre": d.get("idAccordCadre"), - "codeCPV": d.get("codeCPV"), - "lieuExecution": { - "code": d.get("lieuExecutionCode"), - 
"typeCode": d.get("lieuExecutionTypeCode"), - }, - "dateNotification": d.get("dateNotification"), - "marchesInnovant": d.get("marchesInnovant"), - "attributionAvance": d.get("attributionAvance"), - "tauxAvance": d.get("tauxAvance"), - "origineUE": d.get("origineUE"), - "origineFrance": d.get("origineFrance"), - "ccag": d.get("ccag"), - "offresRecues": d.get("offresRecues"), - "montant": d.get("montant"), - "formePrix": d.get("formePrix"), - "typesPrix": {"typePrix": [d.get("typePrix")]}, - "typeGroupementOperateurs": d.get("typeGroupementOperateurs"), - "sousTraitanceDeclaree": d.get("sousTraitanceDeclaree"), - "titulaires": get_titulaires(d.get("titulaires")), - "modifications": r.get("modifications"), - "source": "scrap_aife_dume", - } - new_rows.append(new_row) - - return new_rows - - -def get_dume_rows(start_date_str: str, end_date_str: str) -> tuple[list[dict], int]: - procedures = [ - "Marché passé sans publicité ni mise en concurrence préalable", - "Appel d'offres ouvert", - "Appel d'offres restreint", - "Procédure adaptée", - "Procédure avec négociation", - "Dialogue compétitif", - ] - - logger = get_logger(level=LOG_LEVEL) - - _rows = [] - - for procedure in procedures: - try: - new_rows = dume_api.get_contracts( - date_start=start_date_str, - date_end=end_date_str, - type_de="MP", - procedure=procedure, - partition_map={ - "nature": [ - "Marché", - "Marché de partenariat", - "Accord-cadre", - "Marché subséquent", - ] - }, - ) - sleep(0.1) - _rows.extend(new_rows) - logger.debug(procedure + " " + str(len(new_rows)) + f" rows ({len(_rows)})") - except RuntimeError: - logger.error(f"Trop de lignes, on passe ({procedure})") - - _nb_results = len(_rows) - logger.debug(str(_nb_results) + " rows pour la période") - - return _rows, _nb_results - - -def get_uid_stats(marches, decp_uids): - logger = get_logger(level=LOG_LEVEL) - - dume_uids = {marche["uid"] for marche in marches} - - in_decp_uids = dume_uids.intersection(decp_uids) - - logger.info( - str(len(dume_uids)) + " identifiants uniques dans le DUME pour cette période" - ) - logger.info( - str(len(in_decp_uids)) - + " identifiants uniques dans le DUME pour cette période présents dans les DECP consolidées" - ) - logger.info( - str(round(len(in_decp_uids) / len(dume_uids) * 100, 2)) - + " % des identifiants sur cette période sont présents dans les DECP consolidées tabulaires" - ) - - -def wait_for_either_element(driver, timeout=10) -> tuple[str or None, int]: - """ - Attend de voir si le bouton de téléchargement apparaît ou bien le message d'erreur. 
- Fonction générée en grande partie avec la LLM Euria, développée par Infomaniak - """ - logger = get_logger(level=LOG_LEVEL) - - download_button_id = "downloadDonnees" - - try: - # Wait for either element to appear - wait = WebDriverWait(driver, timeout) - result = wait.until( - lambda d: ( - d.find_element(By.ID, download_button_id) - if d.find_elements(By.ID, download_button_id) - else None - ) - or ( - d.find_element(By.CLASS_NAME, "alert") - if d.find_elements(By.CLASS_NAME, "alert") - else None - ) - ) - if result.text: - logger.debug(result.text) - - # Determine which one appeared - if result.get_attribute("id") == download_button_id: - nb_results = ( - driver.find_element(By.ID, "content") - .find_element(By.CLASS_NAME, "full") - .find_element(By.TAG_NAME, "h2") - .find_element(By.TAG_NAME, "strong") - .text.strip() - ) - nb_results = int(nb_results) - # Résulats de recherche OK - result.click() - sleep(2) - return "download", nb_results - elif "préciser" in result: - logger.info("too many results") - return "too_many", 0 - elif "Aucun" in result: - logger.info("no result") - return "no_result", 0 - else: - logger.info("Ni téléchargement, ni erreur...") - return None, 0 # Should not happen - - except TimeoutException: - logger.error("[Timeout] Ni bouton ni erreur dans le temps imparti...") - return "timeout", 0 - except Exception as e: - logger.error(f"[Error] Unexpected error while waiting: {e}") - return None, 0 diff --git a/src/tasks/scrap/__init__.py b/src/tasks/scrap/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/tasks/scrap/aws.py b/src/tasks/scrap/aws.py new file mode 100644 index 0000000..187ac03 --- /dev/null +++ b/src/tasks/scrap/aws.py @@ -0,0 +1,256 @@ +import calendar +import json +import re +import time +from datetime import date, timedelta +from pathlib import Path +from time import sleep + +import httpx +from prefect import task +from selenium import webdriver +from selenium.common import TimeoutException +from selenium.webdriver.common.by import By +from selenium.webdriver.firefox.options import Options +from selenium.webdriver.support.wait import WebDriverWait + +from config import LOG_LEVEL +from tasks.publish import publish_scrap_to_datagouv +from tasks.utils import get_logger + + +@task() +def scrap_aws_month(year: str = None, month: str = None, dist_dir: Path = None): + logger = get_logger(level=LOG_LEVEL) + + options = Options() + options.add_argument("--headless") + options.set_preference("browser.download.folderList", 2) + options.set_preference("browser.download.manager.showWhenStarting", False) + options.set_preference("browser.download.dir", str(dist_dir)) + + driver = webdriver.Firefox(options=options) + driver.implicitly_wait(10) # secondes + + end_date = start_date = date(int(year), int(month), 1) + base_duration = timedelta(days=3) + nb_days_in_month = calendar.monthrange(start_date.year, start_date.month)[1] + last_month_day = start_date + timedelta(days=nb_days_in_month - 1) + marches_month = [] + replacements = httpx.get( + "https://www.data.gouv.fr/api/1/datasets/r/3bdd5a64-c28e-4c6a-84fd-5a28bcaa53e9", + follow_redirects=True, + ).json() + + retry_count = 0 + + while end_date < last_month_day: + # On évite de boucler sans fin + if retry_count > 3: + retry_count = 0 + start_date = end_date + timedelta(days=1) + continue + + start_date_str = start_date.isoformat() + end_date = start_date + base_duration + + if end_date > last_month_day: + end_date = last_month_day + + 
driver.get("https://www.marches-publics.info/Annonces/rechercher") + + def search_form(end_date_: date) -> tuple[date, str, int]: + end_date_str_ = end_date_.isoformat() + sleep(1) + logger.info(f"➡️ {start_date_str} -> {end_date_str_}") + + # Formulaire recherche données essentielles + form = driver.find_element(By.ID, "formRech") + de_radio = form.find_element(By.ID, "typeDE") + de_radio.click() + + # Remplir le formulaire + notif_debut = form.find_element(By.ID, "dateNotifDebut") + notif_debut.clear() + notif_debut.send_keys(start_date_str) + notif_fin = form.find_element(By.ID, "dateNotifFin") + notif_fin.clear() + notif_fin.send_keys(end_date_str_) + sleep(0.1) + form.find_element(By.ID, "sub").click() + sleep(1) + + # Soit le bouton de téléchargement apparaît, soit il y a une erreur parce + # que de trop nombreux résultats sont retournés + result_code_, nb_results_ = wait_for_either_element(driver) + + # End search_form() + return end_date, result_code_, nb_results_ + + end_date, result_code, nb_results = search_form(end_date) + + if result_code == "too_many": + # On réessaie avec moins de résultats + if end_date != start_date: + logger.info("💥 Trop de résultats, on réessaie avec un jour de moins") + end_date = search_form(end_date - timedelta(days=1)) + continue + else: + logger.info("start_date == end_date et trop de résultats, on skip !") + start_date = end_date + timedelta(days=1) + continue + elif result_code == "no_result": + logger.info("👻 Aucun résultat, on skip.") + start_date = end_date + timedelta(days=1) + continue + elif result_code == "timeout": + # On réessaie après 10 secondes + sleep(3) + retry_count += 1 + continue + elif result_code is None: + logger.info("❓ Pas de téléchargement, on skip.") + start_date = end_date + timedelta(days=1) + continue + + end_date_str = end_date.isoformat() + + json_path = dist_dir / "donneesEssentielles.json" + + start_time = time.time() + last_size = 0 + timeout = 10 + downloaded = False + final_json_path = dist_dir / f"{start_date_str}_{end_date_str}.json" + + while time.time() - start_time < timeout and downloaded is False: + if json_path.exists(): + current_size = json_path.stat().st_size + if current_size == last_size and current_size > 0: + sleep(0.1) + json_path.rename(final_json_path) + downloaded = True + last_size = current_size + time.sleep(0.2) + + if final_json_path.exists(): + with open(final_json_path, "r") as f: + json_text = f.read() + try: + marches = json.loads(json_text)["marches"] + except json.decoder.JSONDecodeError: + logger.debug("Le décodage JSON a échoué, tentative de correction...") + + def fix_unescaped_quotes_in_objet(text): + """Générée avec l'aide de ChatGPT (GPT-4o)""" + + # Match the value of "objet" up to the "montant" key + def replacer(match): + # Escape quotes that are not already escaped + fixed_objet = re.sub(r'(? 0 and isinstance(marches_month, list): + # Format 2022, donc double niveau + dicts = {"marches": {"marche": marches_month}} + json_path = dist_dir / f"aws_{year}-{month}.json" + with open(json_path, "w") as f: + f.write(json.dumps(dicts)) + publish_scrap_to_datagouv(year, month, json_path, "aws") + + # End scrap AWS + + +def wait_for_either_element(driver, timeout=10) -> tuple[str or None, int]: + """ + Attend de voir si le bouton de téléchargement apparaît ou bien le message d'erreur. 
+ Fonction générée en grande partie avec la LLM Euria, développée par Infomaniak + """ + logger = get_logger(level=LOG_LEVEL) + + download_button_id = "downloadDonnees" + + try: + # Wait for either element to appear + wait = WebDriverWait(driver, timeout) + result = wait.until( + lambda d: ( + d.find_element(By.ID, download_button_id) + if d.find_elements(By.ID, download_button_id) + else None + ) + or ( + d.find_element(By.CLASS_NAME, "alert") + if d.find_elements(By.CLASS_NAME, "alert") + else None + ) + ) + if result.text: + logger.debug(result.text) + + # Determine which one appeared + if result.get_attribute("id") == download_button_id: + nb_results = ( + driver.find_element(By.ID, "content") + .find_element(By.CLASS_NAME, "full") + .find_element(By.TAG_NAME, "h2") + .find_element(By.TAG_NAME, "strong") + .text.strip() + ) + nb_results = int(nb_results) + # Résulats de recherche OK + result.click() + sleep(2) + return "download", nb_results + elif "préciser" in result: + logger.info("too many results") + return "too_many", 0 + elif "Aucun" in result: + logger.info("no result") + return "no_result", 0 + else: + logger.info("Ni téléchargement, ni erreur...") + return None, 0 # Should not happen + + except TimeoutException: + logger.error("[Timeout] Ni bouton ni erreur dans le temps imparti...") + return "timeout", 0 + except Exception as e: + logger.error(f"[Error] Unexpected error while waiting: {e}") + return None, 0 diff --git a/src/tasks/scrap/dume.py b/src/tasks/scrap/dume.py new file mode 100644 index 0000000..57c3758 --- /dev/null +++ b/src/tasks/scrap/dume.py @@ -0,0 +1,187 @@ +import calendar +import json +from datetime import date, timedelta +from pathlib import Path +from time import sleep + +import dume_api +import polars as pl + +from config import DIST_DIR, LOG_LEVEL +from tasks.publish import publish_scrap_to_datagouv +from tasks.utils import get_logger + + +def scrap_dume_month(year: str = None, month: str = None, dist_dir: Path = None): + logger = get_logger(level=LOG_LEVEL) + + end_date = start_date = date(int(year), int(month), 1) + base_duration = timedelta(days=0) + nb_days_in_month = calendar.monthrange(start_date.year, start_date.month)[1] + last_month_day = start_date + timedelta(days=nb_days_in_month - 1) + marches_month = [] + decp_uids = set( + pl.read_parquet(DIST_DIR / "decp.parquet", columns=["uid"])["uid"].to_list() + ) + + retry_count = 0 + + while end_date < last_month_day: + # On évite de boucler sans fin + if retry_count > 3: + logger.error("Trop d'essais, start_date + 1") + retry_count = 0 + start_date = end_date + timedelta(days=1) + continue + + start_date_str = start_date.isoformat() + + if retry_count == 0: + end_date = start_date + base_duration + + if end_date > last_month_day: + end_date = last_month_day + + end_date_str = end_date.isoformat() + + logger.info(f"➡️ {start_date_str} -> {end_date_str}") + rows, nb_results = get_dume_rows(start_date_str, end_date_str) + + marches = [] + if nb_results > 0: + marches = dume_to_decp(rows) + + marches_month.extend(marches) + + # On passe aux jours suivants + start_date = end_date + timedelta(days=1) + retry_count = 0 + continue + + if len(marches_month) > 0: + # Format 2022, donc double niveau + dicts = {"marches": {"marche": marches_month}} + json_path = dist_dir / f"dume_{year}-{month}.json" + with open(json_path, "w") as f: + f.write(json.dumps(dicts, indent=2)) + logger.info(str(len(marches_month)) + f" marchés pour le mois ({month}/{year})") + + get_uid_stats(marches_month, decp_uids) + 
publish_scrap_to_datagouv(year, month, json_path, "dume") + + +def dume_to_decp(rows): + new_rows = [] + + def get_titulaires(titulaires): + if isinstance(titulaires, list) and len(titulaires) > 0: + return [{"titulaire": titulaire} for titulaire in d.get("titulaires")] + return [] + + for r in rows: + d = r.get("donneesMP") + new_row = { + "uid": d.get("idAcheteur") + r.get("id"), + "id": r.get("id"), + "objet": r.get("objet"), + "nature": r.get("nature"), + "procedure": r.get("procedure"), + "dureeMois": r.get("dureeMois"), + "datePublicationDonnees": r.get("datePublicationDonnees"), + "acheteur": { + "id": d.get("idAcheteur"), + }, + "techniques": [ + {"technique": [d.get("technique")]}, + ], + "modalitesExecution": [ + {"modaliteExecution": [d.get("modaliteExecution")]}, + ], + "idAccordCadre": d.get("idAccordCadre"), + "codeCPV": d.get("codeCPV"), + "lieuExecution": { + "code": d.get("lieuExecutionCode"), + "typeCode": d.get("lieuExecutionTypeCode"), + }, + "dateNotification": d.get("dateNotification"), + "marchesInnovant": d.get("marchesInnovant"), + "attributionAvance": d.get("attributionAvance"), + "tauxAvance": d.get("tauxAvance"), + "origineUE": d.get("origineUE"), + "origineFrance": d.get("origineFrance"), + "ccag": d.get("ccag"), + "offresRecues": d.get("offresRecues"), + "montant": d.get("montant"), + "formePrix": d.get("formePrix"), + "typesPrix": {"typePrix": [d.get("typePrix")]}, + "typeGroupementOperateurs": d.get("typeGroupementOperateurs"), + "sousTraitanceDeclaree": d.get("sousTraitanceDeclaree"), + "titulaires": get_titulaires(d.get("titulaires")), + "modifications": r.get("modifications"), + "source": "scrap_aife_dume", + } + new_rows.append(new_row) + + return new_rows + + +def get_dume_rows(start_date_str: str, end_date_str: str) -> tuple[list[dict], int]: + procedures = [ + "Marché passé sans publicité ni mise en concurrence préalable", + "Appel d'offres ouvert", + "Appel d'offres restreint", + "Procédure adaptée", + "Procédure avec négociation", + "Dialogue compétitif", + ] + + logger = get_logger(level=LOG_LEVEL) + + _rows = [] + + for procedure in procedures: + try: + new_rows = dume_api.get_contracts( + date_start=start_date_str, + date_end=end_date_str, + type_de="MP", + procedure=procedure, + partition_map={ + "nature": [ + "Marché", + "Marché de partenariat", + "Accord-cadre", + "Marché subséquent", + ] + }, + ) + sleep(0.1) + _rows.extend(new_rows) + logger.debug(procedure + " " + str(len(new_rows)) + f" rows ({len(_rows)})") + except RuntimeError: + logger.error(f"Trop de lignes, on passe ({procedure})") + + _nb_results = len(_rows) + logger.debug(str(_nb_results) + " rows pour la période") + + return _rows, _nb_results + + +def get_uid_stats(marches, decp_uids): + logger = get_logger(level=LOG_LEVEL) + + dume_uids = {marche["uid"] for marche in marches} + + in_decp_uids = dume_uids.intersection(decp_uids) + + logger.info( + str(len(dume_uids)) + " identifiants uniques dans le DUME pour cette période" + ) + logger.info( + str(len(in_decp_uids)) + + " identifiants uniques dans le DUME pour cette période présents dans les DECP consolidées" + ) + logger.info( + str(round(len(in_decp_uids) / len(dume_uids) * 100, 2)) + + " % des identifiants sur cette période sont présents dans les DECP consolidées tabulaires" + ) diff --git a/src/tasks/scrap/marches_securises.py b/src/tasks/scrap/marches_securises.py new file mode 100644 index 0000000..fee6316 --- /dev/null +++ b/src/tasks/scrap/marches_securises.py @@ -0,0 +1,106 @@ +import json +from pathlib import Path 
+from time import sleep + +import httpx +from bs4 import BeautifulSoup +from prefect import task + +from config import LOG_LEVEL +from tasks.publish import publish_scrap_to_datagouv +from tasks.utils import get_logger + + +def get_html(url: str, client: httpx.Client) -> str or None: + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0", + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/png,image/svg+xml,*/*;q=0.8", + "Connection": "keep-alive", + } + logger = get_logger(level=LOG_LEVEL) + + def get_response() -> httpx.Response: + return client.get(url, timeout=timeout, headers=headers).raise_for_status() + + timeout = httpx.Timeout(20.0, connect=60.0, pool=20.0, read=20.0) + try: + response = get_response() + except (httpx.ConnectError, httpx.ReadTimeout, httpx.HTTPStatusError): + logger.debug("3s break and retrying...") + sleep(3) + try: + response = get_response() + except (httpx.ConnectError, httpx.ReadTimeout, httpx.HTTPStatusError): + logger.error("Skipped") + return None + html = response.text + return html + + +def get_json_marches_securises(url: str, client: httpx.Client) -> dict or None: + json_html_page = get_html(url, client) + logger = get_logger(level=LOG_LEVEL) + + sleep(0.1) + if json_html_page: + json_html_page = ( + json_html_page.replace("", "") + "" + ) + else: + logger.warning("json_html_page is None, skipping...") + return None + json_html_page_soup = BeautifulSoup(json_html_page, "html.parser") + try: + decp_json = json.loads(json_html_page_soup.find("body").string) + except Exception as e: + logger.info(json_html_page) + logger.info(e) + return None + return decp_json + + +@task(log_prints=True) +def scrap_marches_securises_month(year: str, month: str, dist_dir: Path): + logger = get_logger(level=LOG_LEVEL) + + marches = [] + page = 1 + with httpx.Client() as client: + while True: + search_url = ( + f"https://www.marches-securises.fr/entreprise/?module=liste_donnees_essentielles&page={str(page)}&siret_pa=&siret_pa1=&date_deb={year}-{month}-01&date_fin={year}-{month}-31&date_deb_ms={year}-{month}-01&date_fin_ms={year}-{month}-31&ref_ume=&cpv_et=&type_procedure=&type_marche=&objet=&rs_oe=&dep_liste=&ctrl_key=aWwwS1pLUlFzejBOYitCWEZzZTEzZz09&text=&donnees_essentielles=1&search=" + f"table_ms&" + ) + + def parse_result_page(): + html_result_page = get_html(search_url, client) + if html_result_page is None: + return [] + soup = BeautifulSoup(html_result_page, "html.parser") + result_div = soup.find("div", attrs={"id": "liste_consultations"}) + logger.info(f"Year: {year}, Month: {month}, Page: {str(page)}") + return result_div.find_all( + "a", attrs={"title": "Télécharger au format Json"} + ) + + try: + json_links = parse_result_page() + except AttributeError: + sleep(1) + logger.info("Retrying result page download and parsing...") + json_links = parse_result_page() + + if not json_links: + break + else: + page += 1 + for json_link in json_links: + json_href = "https://www.marches-securises.fr" + json_link["href"] + decp_json = get_json_marches_securises(json_href, client) + marches.append(decp_json) + if len(marches) > 0: + dicts = {"marches": marches} + json_path = dist_dir / f"marches-securises_{year}-{month}.json" + with open(json_path, "w") as f: + f.write(json.dumps(dicts)) + publish_scrap_to_datagouv(year, month, json_path, "marches-securises.fr") From 63ebfe67c6a5f7ff344e949a5739414bc840da6d Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Mon, 29 Dec 2025 
15:25:22 +0100
Subject: [PATCH 17/24] Ajout du scrap klekoon #71

---
 src/flows/scrap.py         |  15 ++-
 src/tasks/scrap/klekoon.py | 217 +++++++++++++++++++++++++++++++++++++
 2 files changed, 229 insertions(+), 3 deletions(-)
 create mode 100644 src/tasks/scrap/klekoon.py

diff --git a/src/flows/scrap.py b/src/flows/scrap.py
index 487348e..2d8978f 100644
--- a/src/flows/scrap.py
+++ b/src/flows/scrap.py
@@ -11,22 +11,26 @@
     SCRAPING_MODE,
     SCRAPING_TARGET,
 )
-from src.tasks.utils import get_logger
+from src.tasks.utils import get_logger, print_all_config
 from tasks.scrap.aws import scrap_aws_month
 from tasks.scrap.dume import scrap_dume_month
+from tasks.scrap.klekoon import scrap_klekoon
 from tasks.scrap.marches_securises import scrap_marches_securises_month
 
 
 @flow(log_prints=True)
 def scrap(target: str, mode: str, month=None, year=None):
     logger = get_logger(level=LOG_LEVEL)
+
+    print_all_config()
+
     # Remise à zéro du dossier dist
     dist_dir: Path = DIST_DIR / target
     if dist_dir.exists():
-        logger.debug(f"Suppression de {dist_dir}...")
+        logger.info(f"Suppression de {dist_dir}...")
         rmtree(dist_dir)
-    dist_dir.mkdir(parents=True)
+    dist_dir.mkdir(parents=True, exist_ok=True)
 
     # Sélection de la fonction de scraping en fonction de target
     if target == "aws":
@@ -35,6 +39,11 @@ def scrap(target: str, mode: str, month=None, year=None):
         scrap_target_month = scrap_marches_securises_month
     elif target == "dume":
         scrap_target_month = scrap_dume_month
+    elif target == "klekoon":
+        # Klekoon présente ses données par acheteur et non de manière temporelle
+        # donc on télécharge tout à chaque fois
+        scrap_klekoon(dist_dir)
+        return
     else:
         logger.error("Quel target ?")
         raise ValueError
diff --git a/src/tasks/scrap/klekoon.py b/src/tasks/scrap/klekoon.py
new file mode 100644
index 0000000..90d6a1b
--- /dev/null
+++ b/src/tasks/scrap/klekoon.py
@@ -0,0 +1,217 @@
+import json
+import os
+import subprocess
+from pathlib import Path
+from time import sleep
+
+import httpx
+import polars as pl
+from httpx import Client
+from lxml import etree
+
+from src.config import LOG_LEVEL
+from src.tasks.publish import publish_scrap_to_datagouv
+from src.tasks.utils import get_logger
+
+DCATS_URL = "https://www.klekoon.com/declaration-profil-acheteur"
+CLIENT = Client(
+    timeout=20, headers={"User-Agent": "decp.info", "Connection": "keep-alive"}
+)
+
+
+def get_dcats(dist_dir) -> list:
+    logger = get_logger(level="DEBUG")
+
+    df_dcats = get_dcats_df(dist_dir)
+    dcat_paths = []
+    dcats_url = df_dcats["urlDCAT"].unique().to_list()
+    dcat_dir = dist_dir / "dcats"
+
+    nb_dcats = len(dcats_url)
+    logger.debug(f"{nb_dcats} dcats")
+
+    if dcat_dir.exists():
+        dcat_paths = os.listdir(dcat_dir)
+        dcat_paths = [dcat_dir / path for path in dcat_paths]
+        return dcat_paths
+    os.makedirs(dcat_dir, exist_ok=True)
+
+    for i, url in enumerate(dcats_url):
+        sleep(0.1)
+        logger.debug(f"{i}/{nb_dcats} {url} (download)")
+        siret = url.split("/")[-1]
+        path = Path(f"{dcat_dir}/{siret}.xml")
+        with open(path, "w") as f:
+            response = CLIENT.get(url).raise_for_status()
+            text = response.text
+            text: str = text.replace(' encoding="utf-16"', "", 1)
+            f.write(text)
+        dcat_paths.append(path)
+
+    return dcat_paths
+
+
+def scrap_klekoon(dist_dir: Path):
+    logger = get_logger(level=LOG_LEVEL)
+
+    logger.info("Téléchargement des fichiers DCAT...")
+    dcat_paths = get_dcats(dist_dir)
+
+    logger.info("Parsing des DCATs pour récupérer les URLs de marchés...")
+    urls = extract_urls_from_dcat(dcat_paths)
+
+    logger.info("Téléchargement des marchés JSON...")
+
marches_per_month = get_marches_json(urls, dist_dir) + + logger.info("Concaténation des JSON par mois...") + json_months_paths = concat_json_per_month(marches_per_month, dist_dir) + + logger.info("Publication...") + for path in json_months_paths: + year_month = path.split("/")[-1].split(".")[0].split("_")[1] + year, month = year_month.split("-") + + publish_scrap_to_datagouv( + year=year, month=month, file_path=path, target="klekoon" + ) + + +def extract_urls_from_dcat(dcat_paths: list): + logger = get_logger(level=LOG_LEVEL) + len_dcat_paths = len(dcat_paths) + + marches_urls = [] + + namespaces = { + "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#", + "dcat": "http://www.w3.org/ns/dcat#", + "dct": "http://purl.org/dc/terms/", + } + + for i, dcat_path in enumerate(dcat_paths): + logger.debug(f"{i}/{len_dcat_paths} {dcat_path} (parsing)") + + urls = [] + + try: + tree: etree._ElementTree = etree.parse(dcat_path) + datasets = tree.findall(".//dcat:dataset", namespaces=namespaces) + + for dataset in datasets: + ressources = dataset.findall( + ".//{http://www.w3.org/2000/01/rdf-schema#}ressource", + namespaces=namespaces, + ) + month_publication: etree._Element = dataset.find( + "dct:issued", namespaces=namespaces + ) + url = "" + + for ressource in ressources: + url: str = ressource.text.strip() + if url and "/json/" in url: + break + + try: + assert url != "" + except AssertionError: + continue + + if month_publication is not None: + month_publication: str = month_publication.text.strip()[:7] + else: + try: + month_publication: str = get_month_publication(url) + except httpx.HTTPStatusError: + logger.error(f"URL {url} 404") + continue + + urls.append({"url": url, "month": month_publication}) + + logger.debug(f"{i}/{len_dcat_paths} {len(urls)} marchés") + marches_urls.extend(urls) + + except (AssertionError, etree.XMLSyntaxError) as e: + logger.error(f"{dcat_path}: {e}") + + return marches_urls + + +def get_marches_json(urls, dist_dir): + logger = get_logger(level=LOG_LEVEL) + + marches_per_month = {} + nb_urls = len(urls) + urls = sorted(urls, key=lambda x: x["month"]) + + for i, url in enumerate(urls): + sleep(0.1) + logger.debug(f"{i}/{nb_urls} {url}") + month = url["month"] + marche_url = url["url"] + + os.makedirs(f"{dist_dir}/{month}", exist_ok=True) + + if month not in marches_per_month: + marches_per_month[month] = [] + + id_marche = marche_url.rsplit("/")[-1] + path_marche = f"{dist_dir}/{month}/{id_marche}.json" + with open(path_marche, "w") as f: + try: + response = CLIENT.get(marche_url).raise_for_status().json() + json.dump(response, f) + except httpx.HTTPStatusError as e: + logger.error(f"{marche_url}: {e}") + continue + + marches_per_month[month].append(path_marche) + + return marches_per_month + + +def concat_json_per_month(marches_per_month, dist_dir): + json_months_paths = [] + for month in marches_per_month: + marches_json_month = [] + + for marche in marches_per_month[month]: + with open(marche, "rb") as f: + marche_json = json.load(f) + del marche_json["$schema"] + marche_json["source"] = "scrap_klekoon" + marches_json_month.append(marche_json) + + marches_json_month = {"marches": marches_json_month} + path = f"{dist_dir}/klekoon_{month}.json" + with open(path, "w") as f: + json.dump(marches_json_month, f, indent=2) + json_months_paths.append(path) + + return json_months_paths + + +def get_dcats_df(dist_dir): + # Exécuter curl sans décoder automatiquement + path = f"{dist_dir}/dcat.csv" + if not (os.path.exists(path)): + result = subprocess.run( + ["curl", "-s", 
"-L", DCATS_URL], capture_output=True, check=True + ) + # Décoder manuellement en ISO-8859-1 (ou Windows-1252) + content = result.stdout.decode("iso-8859-1") + content = content.replace(";coordonnnees", ";coordonnnees\n") + + with open(path := f"{dist_dir}/dcat.csv", "w") as f: + f.write(content) + + # Lire avec Polars + df = pl.read_csv(path, separator=";", encoding="iso-8859-1") + return df + + +def get_month_publication(url) -> str: + marche_json = CLIENT.get(url).raise_for_status().json() + date_publication = marche_json["datePublicationDonnees"] + month_publication = date_publication[:7] + return month_publication From aa0117333988fe251ea2fc716d8f80f8d89d1a18 Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Mon, 29 Dec 2025 15:25:38 +0100 Subject: [PATCH 18/24] Ajout du scrap klekoon #71 --- src/tasks/publish.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/tasks/publish.py b/src/tasks/publish.py index c6bfbf2..6529548 100644 --- a/src/tasks/publish.py +++ b/src/tasks/publish.py @@ -107,6 +107,7 @@ def publish_scrap_to_datagouv(year: str, month: str, file_path, target): "aws": "68caf6b135f19236a4f37a32", "marches-securises.fr": "68ebb48dd708fdb2d7c15bff", "dume": "694ff7a98210456475f98aca", + "klekoon": "6952899077f982c9a2373ede", } logger = get_logger(level=LOG_LEVEL) From 5aa670a1d0044d0bb056363b129fdd6e7b388f52 Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Mon, 29 Dec 2025 15:26:43 +0100 Subject: [PATCH 19/24] =?UTF-8?q?Fix=20log=20level,=20ajout=20du=20log=20l?= =?UTF-8?q?evel=20=C3=A0=20ALL=5FCONFIG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/config.py | 1 + src/flows/decp_processing.py | 3 +-- src/tasks/utils.py | 8 ++++++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/config.py b/src/config.py index 5530afb..9a40ca1 100644 --- a/src/config.py +++ b/src/config.py @@ -33,6 +33,7 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path: # Niveau des logs LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") +ALL_CONFIG["LOG_LEVEL"] = LOG_LEVEL # Nombre maximal de workers utilisables par Prefect. 
Défaut : 16 MAX_PREFECT_WORKERS = int(os.getenv("MAX_PREFECT_WORKERS", 4)) diff --git a/src/flows/decp_processing.py b/src/flows/decp_processing.py index 4f8198f..692d7eb 100644 --- a/src/flows/decp_processing.py +++ b/src/flows/decp_processing.py @@ -10,7 +10,6 @@ from prefect_email import EmailServerCredentials, email_send_message from src.config import ( - ALL_CONFIG, BASE_DF_COLUMNS, DATE_NOW, DECP_PROCESSING_PUBLISH, @@ -51,7 +50,7 @@ def decp_processing(enable_cache_removal: bool = True): logger.info("🚀 Début du flow decp-processing") - print_all_config(ALL_CONFIG) + print_all_config() logger.info("Liste de toutes les ressources des datasets...") resources: list[dict] = list_resources(TRACKED_DATASETS) diff --git a/src/tasks/utils.py b/src/tasks/utils.py index b14fad1..8456ca0 100644 --- a/src/tasks/utils.py +++ b/src/tasks/utils.py @@ -12,6 +12,7 @@ from prefect.logging import get_run_logger from src.config import ( + ALL_CONFIG, CACHE_EXPIRATION_TIME_HOURS, DATE_NOW, DIST_DIR, @@ -343,8 +344,9 @@ def check_parquet_file(path) -> bool: return False -def print_all_config(all_config): +def print_all_config(): logger = get_logger(level=LOG_LEVEL) + all_config = ALL_CONFIG msg = "" for k, v in sorted(all_config.items()): @@ -354,6 +356,8 @@ def print_all_config(all_config): def get_logger(level: str) -> logging.Logger: try: - return get_run_logger(level=level) + logger = get_run_logger() + logger.setLevel(level) + return logger except MissingContextError: return logging.Logger(name="Fallback logger", level=level) From 4c57278ae5a4ef743c49cad3c503b784fb833156 Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Mon, 29 Dec 2025 15:26:56 +0100 Subject: [PATCH 20/24] Ajout du dataset scrap klekoon #71 --- reference/source_datasets.json | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/reference/source_datasets.json b/reference/source_datasets.json index 4dfd212..126762e 100644 --- a/reference/source_datasets.json +++ b/reference/source_datasets.json @@ -316,5 +316,11 @@ "name": "Les Personnes placées sous main de justice - IDF 2024", "code": "ppsmj", "owner_org_name": "Yael Siksik" + }, + { + "id": "6952899077f982c9a2373ede", + "name": "Données essentielles de la commande publique (DECP) de Klekoon", + "code": "scrap_klekoon", + "owner_org_name": "Colin Maudry" } ] From 812b99deb34e54ff749b561c3e8087ea9177e3a3 Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Mon, 29 Dec 2025 15:29:55 +0100 Subject: [PATCH 21/24] Changelog #71 --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dc40285..a2288d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,8 @@ ### 2.7.0 - Remplacement des guillemets simples par des apostrophes dans "objet" -- Ajout des données de l'API DUME (code source `scrap_aife_dume`) +- Ajout des données de l'API DUME (code source `scrap_aife_dume`) ([#144](https://github.com/ColinMaudry/decp-processing/issues/144)) +- Ajout des données du profil d'acheteur Klekoon (code source `scrap_klekoon`) ([#71](https://github.com/ColinMaudry/decp-processing/issues/71)) #### 2.6.4 2025-12-19 From 706ffcd0abbb3ff16002555ad0e2f3658cb784bc Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Mon, 29 Dec 2025 16:49:15 +0100 Subject: [PATCH 22/24] Bump version --- README.md | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 448161f..fb65338 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # DECP processing -> version 2.6.3 ([notes de 
version](https://github.com/ColinMaudry/decp-processing/blob/main/CHANGELOG.md)) +> version 2.7.0 ([notes de version](https://github.com/ColinMaudry/decp-processing/blob/main/CHANGELOG.md)) Projet de traitement et de publication de meilleures données sur les marchés publics attribués en France. Vous pouvez consulter, filtrer et télécharger ces données sur le site [decp.info](https://decp.info). Enfin la section [À propos](https://decp.info/a-propos) décrit les objectifs du projet et regroupe toutes les informations clés. diff --git a/pyproject.toml b/pyproject.toml index 60adaad..dbf83e2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "decp-processing" description = "Traitement des données des marchés publics français." -version = "2.6.3" +version = "2.7.0" requires-python = ">= 3.9" authors = [ { name = "Colin Maudry", email = "colin+decp@maudry.com" } From 94e4dad0c99cd5e161022c1fccc50ab1e7e0904d Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Mon, 29 Dec 2025 16:55:01 +0100 Subject: [PATCH 23/24] Bump polars version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index dbf83e2..d460c0a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ authors = [ dependencies = [ "python-dotenv", "pandas", # nécessaire pour l'écriture en base de données - "polars==1.35.2", + "polars==1.36.1", "pyarrow", "frictionless", "ipykernel", From 550065ac7db60a615c5cd6efa4412e7285e05f8f Mon Sep 17 00:00:00 2001 From: Colin Maudry Date: Mon, 29 Dec 2025 17:04:06 +0100 Subject: [PATCH 24/24] =?UTF-8?q?M=C3=A0j=20tests=20(objet)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/tasks/clean.py | 6 ++++-- tests/test_clean.py | 2 ++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/tasks/clean.py b/src/tasks/clean.py index 2f8b941..b494ac5 100644 --- a/src/tasks/clean.py +++ b/src/tasks/clean.py @@ -141,10 +141,12 @@ def clean_decp(lf: pl.LazyFrame, decp_format: DecpFormat) -> pl.LazyFrame: ) # NC - lf = lf.with_columns(pl.col(pl.Utf8).replace("NC", None)) + lf = lf.with_columns(pl.col(pl.Utf8).replace("NC", None).name.keep()) # Remplacement des single quotes qui servent d'apostrophes - lf = lf.with_columns(pl.col("objet").str.replace_all(r"(\w)'(\w)", "$1’$2")) + lf = lf.with_columns( + pl.col("objet").str.replace_all(r"(\w)'(\w)", "$1’$2").alias("objet") + ) # Correction des datatypes lf = fix_data_types(lf) diff --git a/tests/test_clean.py b/tests/test_clean.py index bb485f2..16ecb20 100644 --- a/tests/test_clean.py +++ b/tests/test_clean.py @@ -213,6 +213,7 @@ def test_clean_decp(): "id": ["id.1", "id/2", ""], "acheteur_id": ["ach1", "ach2", ""], "acheteur.id": ["", "ach2", ""], + "objet": "Avec des 'apo'strophe's", "montant": ["1000", "1000000000000.00", "2000"], "datePublicationDonnees": ["2023-01-01", "0002-11-30", "2023-01-02"], "dateNotification": ["2023-01-01", "2023-01-01", "2023-01-01"], @@ -284,6 +285,7 @@ def test_clean_decp(): assert df_result["considerationsEnvironnementales"][0] == "Sans objet" assert df_result["ccag"][0] == "Sans objet" assert df_result["typeGroupement"][0] == "Sans objet" + assert df_result["objet"][1] == "Avec des 'apo’strophe’s" # Check nature replacement assert df_result["nature"][0] == "Marché subséquent"
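
Note : esquisse minimale illustrant le remplacement des apostrophes introduit dans src/tasks/clean.py ci-dessus. La chaîne d'exemple et le résultat attendu proviennent de tests/test_clean.py ; le reste du code est une illustration hors dépôt, pas le code du projet.

    import polars as pl

    # Chaîne d'exemple reprise de tests/test_clean.py
    df = pl.DataFrame({"objet": ["Avec des 'apo'strophe's"]})

    # Même expression que dans clean_decp : seule une apostrophe entourée de
    # caractères de mot (\w'\w) est remplacée par une apostrophe typographique
    df = df.with_columns(
        pl.col("objet").str.replace_all(r"(\w)'(\w)", "$1’$2").alias("objet")
    )

    # Affiche : Avec des 'apo’strophe’s
    # (la première quote, précédée d'une espace, est conservée)
    print(df["objet"][0])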