diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c1873c..a2288d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,13 @@ +### 2.7.0 + +- Remplacement des guillemets simples par des apostrophes dans "objet" +- Ajout des données de l'API DUME (code source `scrap_aife_dume`) ([#144](https://github.com/ColinMaudry/decp-processing/issues/144)) +- Ajout des données du profil d'acheteur Klekoon (code source `scrap_klekoon`) ([#71](https://github.com/ColinMaudry/decp-processing/issues/71)) + #### 2.6.4 2025-12-19 - Tri et numérotation des modifications après la concaténation plutôt que par ressource, pour réduire le nombre de doublons ([#156](https://github.com/ColinMaudry/decp-processing/issues/156)) -- Utilisation du logger de prefect plûtot que `log_prints=True` +- Utilisation du logger de prefect plûtot que `log_prints=True` ([#94](https://github.com/ColinMaudry/decp-processing/issues/94)) #### 2.6.3 2025-12-16 diff --git a/README.md b/README.md index acf4ed0..fb65338 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # DECP processing -> version 2.6.3 ([notes de version](https://github.com/ColinMaudry/decp-processing/blob/main/CHANGELOG.md)) +> version 2.7.0 ([notes de version](https://github.com/ColinMaudry/decp-processing/blob/main/CHANGELOG.md)) Projet de traitement et de publication de meilleures données sur les marchés publics attribués en France. Vous pouvez consulter, filtrer et télécharger ces données sur le site [decp.info](https://decp.info). Enfin la section [À propos](https://decp.info/a-propos) décrit les objectifs du projet et regroupe toutes les informations clés. @@ -163,3 +163,4 @@ pytest tests/test_main.py - [FranckMaseo](https://github.com/frankmaseo) - [Thomas Louf](https://github.com/tlouf) - [vico4445](https://github.com/vico4445) +- [imanuch](https://github.com/imanuch) diff --git a/pyproject.toml b/pyproject.toml index 90135f9..d460c0a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "decp-processing" description = "Traitement des données des marchés publics français." -version = "2.6.3" +version = "2.7.0" requires-python = ">= 3.9" authors = [ { name = "Colin Maudry", email = "colin+decp@maudry.com" } @@ -9,7 +9,7 @@ authors = [ dependencies = [ "python-dotenv", "pandas", # nécessaire pour l'écriture en base de données - "polars==1.35.2", + "polars==1.36.1", "pyarrow", "frictionless", "ipykernel", @@ -23,7 +23,8 @@ dependencies = [ "selenium", "polars_ds", "scikit-learn", - "tenacity" + "tenacity", + "dume_api" ] [project.optional-dependencies] diff --git a/reference/schema_base.json b/reference/schema_base.json index 5532ede..1a6b9e7 100644 --- a/reference/schema_base.json +++ b/reference/schema_base.json @@ -81,7 +81,7 @@ "type": "string", "name": "procedure", "title": "Procédure", - "description": "Le type de procédure utilisé pour le marché public. Prodcédure négociée ouverte, Procédure non négociée ouverte, Procédure négociée restreinte, Procédure non négociée restreinte.", + "description": "Le type de procédure utilisé pour le marché public. 
Procédure négociée ouverte, Procédure non négociée ouverte, Procédure négociée restreinte, Procédure non négociée restreinte.", "short_title": null }, { diff --git a/reference/source_datasets.json b/reference/source_datasets.json index 8b77f8d..126762e 100644 --- a/reference/source_datasets.json +++ b/reference/source_datasets.json @@ -257,6 +257,12 @@ "owner_org_name": "Agence pour l'Informatique Financière de l'Etat", "code": "aife_dume" }, + { + "id": "694ff7a98210456475f98aca", + "name": "Données essentielles de la commande publique (DECP) de l'API DUME (AIFE)", + "owner_org_name": "Colin Maudry", + "code": "scrap_aife_dume" + }, { "id": "68ebb48dd708fdb2d7c15bff", "name": "Données des marchés publics de marches-securises.fr", @@ -310,5 +316,11 @@ "name": "Les Personnes placées sous main de justice - IDF 2024", "code": "ppsmj", "owner_org_name": "Yael Siksik" + }, + { + "id": "6952899077f982c9a2373ede", + "name": "Données essentielles de la commande publique (DECP) de Klekoon", + "code": "scrap_klekoon", + "owner_org_name": "Colin Maudry" } ] diff --git a/run_flow.py b/run_flow.py index 2cec406..619c086 100644 --- a/run_flow.py +++ b/run_flow.py @@ -1,5 +1,6 @@ import sys +from src.config import SCRAPING_MODE, SCRAPING_MONTH, SCRAPING_TARGET, SCRAPING_YEAR from src.flows.decp_processing import decp_processing from src.flows.get_cog import get_cog from src.flows.scrap import scrap @@ -28,4 +29,9 @@ if func_name != "scrap": FUNCTIONS[func_name]() else: - scrap(target="aws", mode="all") + scrap( + mode=SCRAPING_MODE, + target=SCRAPING_TARGET, + month=SCRAPING_MONTH, + year=SCRAPING_YEAR, + ) diff --git a/src/config.py b/src/config.py index 2866760..9a40ca1 100644 --- a/src/config.py +++ b/src/config.py @@ -33,6 +33,7 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path: # Niveau des logs LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") +ALL_CONFIG["LOG_LEVEL"] = LOG_LEVEL # Nombre maximal de workers utilisables par Prefect. 
Défaut : 16 MAX_PREFECT_WORKERS = int(os.getenv("MAX_PREFECT_WORKERS", 4)) @@ -132,6 +133,14 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path: SCRAPING_TARGET = os.getenv("SCRAPING_TARGET") ALL_CONFIG["SCRAPING_TARGET"] = SCRAPING_TARGET +# Year (année cible pour le scraping) +SCRAPING_YEAR = os.getenv("SCRAPING_YEAR") +ALL_CONFIG["SCRAPING_YEAR"] = SCRAPING_YEAR + +# Month (mois cible pour le scraping) +SCRAPING_MONTH = os.getenv("SCRAPING_MONTH") +ALL_CONFIG["SCRAPING_MONTH"] = SCRAPING_MONTH + # Lecture ou non des ressource en cache DECP_USE_CACHE = os.getenv("DECP_USE_CACHE", "false").lower() == "true" @@ -196,6 +205,13 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path: SOLO_DATASET = os.getenv("SOLO_DATASET", "") ALL_CONFIG["SOLO_DATASET"] = SOLO_DATASET +# Acheteurs absents de la base SIRENE (pour raisons de sécurité ou autre) +# Format: SIRET -> {"nom": "...", ...} +# Ces données sont utilisées en fallback si l'acheteur n'est pas trouvé dans SIRENE +ACHETEURS_NON_SIRENE = { + "13001536500013": {"nom": "Ministère des Armées"}, +} + with open( make_path_from_env( "DATASETS_REFERENCE_FILEPATH", REFERENCE_DIR / "source_datasets.json" diff --git a/src/deployments.py b/src/deployments.py index 5fd3f81..d6b0b2f 100644 --- a/src/deployments.py +++ b/src/deployments.py @@ -130,6 +130,28 @@ }, ) + flow.from_source( + source=GitRepository( + url="https://github.com/ColinMaudry/decp-processing.git", branch="main" + ), + entrypoint="src/flows/scrap.py:scrap", + ).deploy( + name="scrap-dume", + description="Scraping des données de l'API DUME.", + ignore_warnings=True, + work_pool_name="local", + cron="0 0 * * 2", + job_variables={ + "env": { + "DECP_PROCESSING_PUBLISH": "True", + "DECP_DIST_DIR": "/srv/shared/decp/prod/dist", + "PREFECT_TASKS_REFRESH_CACHE": "False", + "SCRAPING_MODE": "month", + "SCRAPING_TARGET": "dume", + } + }, + ) + flow.from_source( source=GitRepository( url="https://github.com/ColinMaudry/decp-processing.git", branch="main" diff --git a/src/flows/decp_processing.py b/src/flows/decp_processing.py index 6fad7cb..692d7eb 100644 --- a/src/flows/decp_processing.py +++ b/src/flows/decp_processing.py @@ -10,7 +10,6 @@ from prefect_email import EmailServerCredentials, email_send_message from src.config import ( - ALL_CONFIG, BASE_DF_COLUMNS, DATE_NOW, DECP_PROCESSING_PUBLISH, @@ -20,6 +19,7 @@ PREFECT_API_URL, RESOURCE_CACHE_DIR, SIRENE_DATA_DIR, + SOLO_DATASET, TRACKED_DATASETS, ) from src.flows.sirene_preprocess import sirene_preprocess @@ -50,7 +50,7 @@ def decp_processing(enable_cache_removal: bool = True): logger.info("🚀 Début du flow decp-processing") - print_all_config(ALL_CONFIG) + print_all_config() logger.info("Liste de toutes les ressources des datasets...") resources: list[dict] = list_resources(TRACKED_DATASETS) @@ -79,7 +79,11 @@ def decp_processing(enable_cache_removal: bool = True): ) # Afin d'être sûr que je ne publie pas par erreur un jeu de données de test - decp_publish = DECP_PROCESSING_PUBLISH and len(resources_to_process) > 5000 + decp_publish = ( + DECP_PROCESSING_PUBLISH + and len(resources_to_process) > 5000 + and SOLO_DATASET in ["", None] + ) if decp_publish: create_table_artifact( @@ -146,7 +150,7 @@ def decp_processing(enable_cache_removal: bool = True): logger.info("☑️ Fin du flow principal decp_processing.") -@task(retries=2) +@task(retries=2, timeout_seconds=1800) def process_batch( available_parquet_files, batch_size, diff --git a/src/flows/scrap.py b/src/flows/scrap.py index dcd0cd3..2d8978f 100644 --- a/src/flows/scrap.py 
+++ b/src/flows/scrap.py @@ -11,29 +11,39 @@ SCRAPING_MODE, SCRAPING_TARGET, ) -from src.tasks.scrap import scrap_aws_month, scrap_marches_securises_month -from src.tasks.utils import get_logger +from src.tasks.utils import get_logger, print_all_config +from tasks.scrap.aws import scrap_aws_month +from tasks.scrap.dume import scrap_dume_month +from tasks.scrap.klekoon import scrap_klekoon +from tasks.scrap.marches_securises import scrap_marches_securises_month @flow(log_prints=True) -def scrap(target: str = None, mode: str = None, month=None, year=None): +def scrap(target: str, mode: str, month=None, year=None): logger = get_logger(level=LOG_LEVEL) + + print_all_config() + # Remise à zéro du dossier dist dist_dir: Path = DIST_DIR / target if dist_dir.exists(): - logger.debug(f"Suppression de {dist_dir}...") + logger.info(f"Suppression de {dist_dir}...") rmtree(dist_dir) - else: - dist_dir.mkdir(parents=True) - # Sélection du target - target = target or SCRAPING_TARGET + dist_dir.mkdir(parents=True, exist_ok=True) # Sélection de la fonction de scraping en fonction de target if target == "aws": scrap_target_month = scrap_aws_month elif target == "marches-securises.fr": scrap_target_month = scrap_marches_securises_month + elif target == "dume": + scrap_target_month = scrap_dume_month + elif target == "klekoon": + # Klekoon présent ses données par acheteur et non de manière temporelle + # donc on télécharge tout à chaque fois + scrap_klekoon(dist_dir) + return else: logger.error("Quel target ?") raise ValueError @@ -43,8 +53,15 @@ def scrap(target: str = None, mode: str = None, month=None, year=None): month = month or current_month year = year or current_year - # Sélection du mode - mode = mode or SCRAPING_MODE + # Récapitulatif de la config + # mode et target doivent être passés en paramètre + # les éventuelles env sont injectées via /run_flow.py + # en prod les paramètres sont spécifiées dans le deployment Prefect + logger.info(f""" + Target: {target} (env {SCRAPING_TARGET}) + Mode: {mode} (env {SCRAPING_MODE}) + Year: {year} + Month: {month}""") # Sélection de la plage temporelle if mode == "month": @@ -57,7 +74,7 @@ def scrap(target: str = None, mode: str = None, month=None, year=None): elif mode == "all": current_year = int(current_year) - for year in reversed(range(2018, current_year + 2)): + for year in reversed(range(2018, current_year + 1)): scrap(target=target, mode="year", year=str(year)) else: diff --git a/src/tasks/clean.py b/src/tasks/clean.py index cc5df54..b494ac5 100644 --- a/src/tasks/clean.py +++ b/src/tasks/clean.py @@ -126,6 +126,9 @@ def clean_decp(lf: pl.LazyFrame, decp_format: DecpFormat) -> pl.LazyFrame: .name.keep() ) + # Nettoyage des espaces dans titulaire_id (ex: " 33487372600239") + lf = lf.with_columns(pl.col("titulaire_id").str.strip_chars()) + # Type identifiant = SIRET si vide (marches-securises.fr) lf = lf.with_columns( pl.when( @@ -138,7 +141,12 @@ def clean_decp(lf: pl.LazyFrame, decp_format: DecpFormat) -> pl.LazyFrame: ) # NC - lf = lf.with_columns(pl.col(pl.Utf8).replace("NC", None)) + lf = lf.with_columns(pl.col(pl.Utf8).replace("NC", None).name.keep()) + + # Remplacement des single quotes qui servent d'apostrophes + lf = lf.with_columns( + pl.col("objet").str.replace_all(r"(\w)'(\w)", "$1’$2").alias("objet") + ) # Correction des datatypes lf = fix_data_types(lf) diff --git a/src/tasks/enrich.py b/src/tasks/enrich.py index 10deb06..ead303a 100644 --- a/src/tasks/enrich.py +++ b/src/tasks/enrich.py @@ -1,7 +1,7 @@ import polars as pl import 
polars.selectors as cs -from src.config import LOG_LEVEL, SIRENE_DATA_DIR +from src.config import LOG_LEVEL, SIRENE_DATA_DIR, ACHETEURS_NON_SIRENE from src.tasks.transform import ( extract_unique_acheteurs_siret, extract_unique_titulaires_siret, @@ -139,6 +139,9 @@ def enrich_from_sirene(lf: pl.LazyFrame): lf = lf.join(lf_sirets_acheteurs, how="left", on="acheteur_id") + # Fallback pour les acheteurs absents de SIRENE (ex: Ministère des Armées) + lf = apply_acheteurs_non_sirene_fallback(lf) + # En joignant en utilisant à la fois le SIRET et le typeIdentifiant, on s'assure qu'on ne joint pas sur # des id de titulaires non-SIRET lf = lf.join( @@ -158,6 +161,32 @@ def enrich_from_sirene(lf: pl.LazyFrame): return lf +def apply_acheteurs_non_sirene_fallback(lf: pl.LazyFrame) -> pl.LazyFrame: + """Applique les données d'acheteurs non présents dans SIRENE en fallback. + + Les acheteurs absents de SIRENE ET du fallback conservent acheteur_nom = NULL. + """ + if not ACHETEURS_NON_SIRENE: + return lf + + # Créer un DataFrame de fallback à partir du dictionnaire + fallback_data = [ + {"acheteur_id": siret, "acheteur_nom_fallback": data["nom"]} + for siret, data in ACHETEURS_NON_SIRENE.items() + ] + lf_fallback = pl.LazyFrame(fallback_data) + + # Joindre avec les données de fallback + lf = lf.join(lf_fallback, on="acheteur_id", how="left") + + # Remplacer acheteur_nom NULL par la valeur de fallback (si disponible) + lf = lf.with_columns( + pl.coalesce("acheteur_nom", "acheteur_nom_fallback").alias("acheteur_nom") + ).drop("acheteur_nom_fallback") + + return lf + + def calculate_distance(lf: pl.LazyFrame) -> pl.LazyFrame: # Utilisation de polars_ds.haversine # https://polars-ds-extension.readthedocs.io/en/latest/num.html#polars_ds.exprs.num.haversine diff --git a/src/tasks/publish.py b/src/tasks/publish.py index 08171f4..6529548 100644 --- a/src/tasks/publish.py +++ b/src/tasks/publish.py @@ -106,6 +106,8 @@ def publish_scrap_to_datagouv(year: str, month: str, file_path, target): dataset_ids = { "aws": "68caf6b135f19236a4f37a32", "marches-securises.fr": "68ebb48dd708fdb2d7c15bff", + "dume": "694ff7a98210456475f98aca", + "klekoon": "6952899077f982c9a2373ede", } logger = get_logger(level=LOG_LEVEL) diff --git a/src/tasks/scrap/__init__.py b/src/tasks/scrap/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/tasks/scrap.py b/src/tasks/scrap/aws.py similarity index 69% rename from src/tasks/scrap.py rename to src/tasks/scrap/aws.py index 4e3524a..187ac03 100644 --- a/src/tasks/scrap.py +++ b/src/tasks/scrap/aws.py @@ -7,7 +7,6 @@ from time import sleep import httpx -from bs4 import BeautifulSoup from prefect import task from selenium import webdriver from selenium.common import TimeoutException @@ -15,112 +14,12 @@ from selenium.webdriver.firefox.options import Options from selenium.webdriver.support.wait import WebDriverWait -from src.config import LOG_LEVEL -from src.tasks.publish import publish_scrap_to_datagouv -from src.tasks.utils import get_logger +from config import LOG_LEVEL +from tasks.publish import publish_scrap_to_datagouv +from tasks.utils import get_logger -def get_html(url: str, client: httpx.Client) -> str or None: - headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0", - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/png,image/svg+xml,*/*;q=0.8", - "Connection": "keep-alive", - } - logger = get_logger(level=LOG_LEVEL) - - def get_response() -> httpx.Response: - 
return client.get(url, timeout=timeout, headers=headers).raise_for_status() - - timeout = httpx.Timeout(20.0, connect=60.0, pool=20.0, read=20.0) - try: - response = get_response() - except (httpx.ConnectError, httpx.ReadTimeout, httpx.HTTPStatusError): - logger.debug("3s break and retrying...") - sleep(3) - try: - response = get_response() - except (httpx.ConnectError, httpx.ReadTimeout, httpx.HTTPStatusError): - logger.error("Skipped") - return None - html = response.text - return html - - -# @task( -# cache_policy=INPUTS, -# persist_result=True, -# cache_expiration=datetime.timedelta(days=15), -# ) -def get_json_marches_securises(url: str, client: httpx.Client) -> dict or None: - json_html_page = get_html(url, client) - logger = get_logger(level=LOG_LEVEL) - - sleep(0.1) - if json_html_page: - json_html_page = ( - json_html_page.replace("", "") + "" - ) - else: - logger.warning("json_html_page is None, skipping...") - return None - json_html_page_soup = BeautifulSoup(json_html_page, "html.parser") - try: - decp_json = json.loads(json_html_page_soup.find("body").string) - except Exception as e: - logger.info(json_html_page) - logger.info(e) - return None - return decp_json - - -@task(log_prints=True) -def scrap_marches_securises_month(year: str, month: str, dist_dir: Path): - logger = get_logger(level=LOG_LEVEL) - - marches = [] - page = 1 - with httpx.Client() as client: - while True: - search_url = ( - f"https://www.marches-securises.fr/entreprise/?module=liste_donnees_essentielles&page={str(page)}&siret_pa=&siret_pa1=&date_deb={year}-{month}-01&date_fin={year}-{month}-31&date_deb_ms={year}-{month}-01&date_fin_ms={year}-{month}-31&ref_ume=&cpv_et=&type_procedure=&type_marche=&objet=&rs_oe=&dep_liste=&ctrl_key=aWwwS1pLUlFzejBOYitCWEZzZTEzZz09&text=&donnees_essentielles=1&search=" - f"table_ms&" - ) - - def parse_result_page(): - html_result_page = get_html(search_url, client) - if html_result_page is None: - return [] - soup = BeautifulSoup(html_result_page, "html.parser") - result_div = soup.find("div", attrs={"id": "liste_consultations"}) - logger.info("Year: ", year, "Month: ", month, "Page: ", str(page)) - return result_div.find_all( - "a", attrs={"title": "Télécharger au format Json"} - ) - - try: - json_links = parse_result_page() - except AttributeError: - sleep(3) - logger.info("Retrying result page download and parsing...") - json_links = parse_result_page() - - if not json_links: - break - else: - page += 1 - for json_link in json_links: - json_href = "https://www.marches-securises.fr" + json_link["href"] - decp_json = get_json_marches_securises(json_href, client) - marches.append(decp_json) - if len(marches) > 0: - dicts = {"marches": marches} - json_path = dist_dir / f"marches-securises_{year}-{month}.json" - with open(json_path, "w") as f: - f.write(json.dumps(dicts)) - publish_scrap_to_datagouv(year, month, json_path, "marches-securises.fr") - - -@task(log_prints=True) +@task() def scrap_aws_month(year: str = None, month: str = None, dist_dir: Path = None): logger = get_logger(level=LOG_LEVEL) @@ -206,7 +105,7 @@ def search_form(end_date_: date) -> tuple[date, str, int]: continue elif result_code == "timeout": # On réessaie après 10 secondes - sleep(10) + sleep(3) retry_count += 1 continue elif result_code is None: diff --git a/src/tasks/scrap/dume.py b/src/tasks/scrap/dume.py new file mode 100644 index 0000000..57c3758 --- /dev/null +++ b/src/tasks/scrap/dume.py @@ -0,0 +1,187 @@ +import calendar +import json +from datetime import date, timedelta +from pathlib import Path 
+from time import sleep + +import dume_api +import polars as pl + +from config import DIST_DIR, LOG_LEVEL +from tasks.publish import publish_scrap_to_datagouv +from tasks.utils import get_logger + + +def scrap_dume_month(year: str = None, month: str = None, dist_dir: Path = None): + logger = get_logger(level=LOG_LEVEL) + + end_date = start_date = date(int(year), int(month), 1) + base_duration = timedelta(days=0) + nb_days_in_month = calendar.monthrange(start_date.year, start_date.month)[1] + last_month_day = start_date + timedelta(days=nb_days_in_month - 1) + marches_month = [] + decp_uids = set( + pl.read_parquet(DIST_DIR / "decp.parquet", columns=["uid"])["uid"].to_list() + ) + + retry_count = 0 + + while end_date < last_month_day: + # On évite de boucler sans fin + if retry_count > 3: + logger.error("Trop d'essais, start_date + 1") + retry_count = 0 + start_date = end_date + timedelta(days=1) + continue + + start_date_str = start_date.isoformat() + + if retry_count == 0: + end_date = start_date + base_duration + + if end_date > last_month_day: + end_date = last_month_day + + end_date_str = end_date.isoformat() + + logger.info(f"➡️ {start_date_str} -> {end_date_str}") + rows, nb_results = get_dume_rows(start_date_str, end_date_str) + + marches = [] + if nb_results > 0: + marches = dume_to_decp(rows) + + marches_month.extend(marches) + + # On passe aux jours suivants + start_date = end_date + timedelta(days=1) + retry_count = 0 + continue + + if len(marches_month) > 0: + # Format 2022, donc double niveau + dicts = {"marches": {"marche": marches_month}} + json_path = dist_dir / f"dume_{year}-{month}.json" + with open(json_path, "w") as f: + f.write(json.dumps(dicts, indent=2)) + logger.info(str(len(marches_month)) + f" marchés pour le mois ({month}/{year})") + + get_uid_stats(marches_month, decp_uids) + publish_scrap_to_datagouv(year, month, json_path, "dume") + + +def dume_to_decp(rows): + new_rows = [] + + def get_titulaires(titulaires): + if isinstance(titulaires, list) and len(titulaires) > 0: + return [{"titulaire": titulaire} for titulaire in d.get("titulaires")] + return [] + + for r in rows: + d = r.get("donneesMP") + new_row = { + "uid": d.get("idAcheteur") + r.get("id"), + "id": r.get("id"), + "objet": r.get("objet"), + "nature": r.get("nature"), + "procedure": r.get("procedure"), + "dureeMois": r.get("dureeMois"), + "datePublicationDonnees": r.get("datePublicationDonnees"), + "acheteur": { + "id": d.get("idAcheteur"), + }, + "techniques": [ + {"technique": [d.get("technique")]}, + ], + "modalitesExecution": [ + {"modaliteExecution": [d.get("modaliteExecution")]}, + ], + "idAccordCadre": d.get("idAccordCadre"), + "codeCPV": d.get("codeCPV"), + "lieuExecution": { + "code": d.get("lieuExecutionCode"), + "typeCode": d.get("lieuExecutionTypeCode"), + }, + "dateNotification": d.get("dateNotification"), + "marchesInnovant": d.get("marchesInnovant"), + "attributionAvance": d.get("attributionAvance"), + "tauxAvance": d.get("tauxAvance"), + "origineUE": d.get("origineUE"), + "origineFrance": d.get("origineFrance"), + "ccag": d.get("ccag"), + "offresRecues": d.get("offresRecues"), + "montant": d.get("montant"), + "formePrix": d.get("formePrix"), + "typesPrix": {"typePrix": [d.get("typePrix")]}, + "typeGroupementOperateurs": d.get("typeGroupementOperateurs"), + "sousTraitanceDeclaree": d.get("sousTraitanceDeclaree"), + "titulaires": get_titulaires(d.get("titulaires")), + "modifications": r.get("modifications"), + "source": "scrap_aife_dume", + } + new_rows.append(new_row) + + return 
new_rows + + +def get_dume_rows(start_date_str: str, end_date_str: str) -> tuple[list[dict], int]: + procedures = [ + "Marché passé sans publicité ni mise en concurrence préalable", + "Appel d'offres ouvert", + "Appel d'offres restreint", + "Procédure adaptée", + "Procédure avec négociation", + "Dialogue compétitif", + ] + + logger = get_logger(level=LOG_LEVEL) + + _rows = [] + + for procedure in procedures: + try: + new_rows = dume_api.get_contracts( + date_start=start_date_str, + date_end=end_date_str, + type_de="MP", + procedure=procedure, + partition_map={ + "nature": [ + "Marché", + "Marché de partenariat", + "Accord-cadre", + "Marché subséquent", + ] + }, + ) + sleep(0.1) + _rows.extend(new_rows) + logger.debug(procedure + " " + str(len(new_rows)) + f" rows ({len(_rows)})") + except RuntimeError: + logger.error(f"Trop de lignes, on passe ({procedure})") + + _nb_results = len(_rows) + logger.debug(str(_nb_results) + " rows pour la période") + + return _rows, _nb_results + + +def get_uid_stats(marches, decp_uids): + logger = get_logger(level=LOG_LEVEL) + + dume_uids = {marche["uid"] for marche in marches} + + in_decp_uids = dume_uids.intersection(decp_uids) + + logger.info( + str(len(dume_uids)) + " identifiants uniques dans le DUME pour cette période" + ) + logger.info( + str(len(in_decp_uids)) + + " identifiants uniques dans le DUME pour cette période présents dans les DECP consolidées" + ) + logger.info( + str(round(len(in_decp_uids) / len(dume_uids) * 100, 2)) + + " % des identifiants sur cette période sont présents dans les DECP consolidées tabulaires" + ) diff --git a/src/tasks/scrap/klekoon.py b/src/tasks/scrap/klekoon.py new file mode 100644 index 0000000..90d6a1b --- /dev/null +++ b/src/tasks/scrap/klekoon.py @@ -0,0 +1,217 @@ +import json +import os +import subprocess +from pathlib import Path +from time import sleep + +import httpx +import polars as pl +from httpx import Client +from lxml import etree + +from src.config import LOG_LEVEL +from src.tasks.publish import publish_scrap_to_datagouv +from src.tasks.utils import get_logger + +DCATS_URL = "https://www.klekoon.com/declaration-profil-acheteur" +CLIENT = Client( + timeout=20, headers={"User-Agent": "decp.info", "Connection": "keep-alive"} +) + + +def get_dcats(dist_dir) -> list: + logger = get_logger(level="DEBUG") + + df_dcats = get_dcats_df(dist_dir) + dcat_paths = [] + dcats_url = df_dcats["urlDCAT"].unique().to_list() + dcat_dir = dist_dir / "dcats" + + nb_dcats = len(dcats_url) + logger.debug(f"{nb_dcats} dcats") + + if dcat_dir.exists(): + dcat_paths = os.listdir(dcat_dir) + dcat_paths = [dcat_dir / path for path in dcat_paths] + return dcat_paths + os.makedirs(dcat_dir, exist_ok=True) + + for i, url in enumerate(dcats_url): + sleep(0.1) + logger.debug(f"{i}/{nb_dcats} {url} (download)") + siret = url.split("/")[-1] + path = Path(f"{dcat_dir}/{siret}.xml") + with open(path, "w") as f: + response = CLIENT.get(url).raise_for_status() + text = response.text + text: str = text.replace(' encoding="utf-16"', "", 1) + f.write(text) + dcat_paths.append(path) + + return dcat_paths + + +def scrap_klekoon(dist_dir: Path): + logger = get_logger(level=LOG_LEVEL) + + logger.info("Téléchargement des fichiers DCAT...") + dcat_paths = get_dcats(dist_dir) + + logger.info("Parsing des DCATs pour récupérer les URLs de marchés...") + urls = extract_urls_from_dcat(dcat_paths) + + logger.info("Téléchargement des marchés JSON...") + marches_per_month = get_marches_json(urls, dist_dir) + + logger.info("Concaténation des JSON par mois...") 
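+    # concat_json_per_month fusionne les JSON unitaires en un fichier
+    # klekoon_AAAA-MM.json par mois de publication (clé "marches", champ
+    # source "scrap_klekoon") et renvoie les chemins pour la publication ci-dessous.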
+ json_months_paths = concat_json_per_month(marches_per_month, dist_dir) + + logger.info("Publication...") + for path in json_months_paths: + year_month = path.split("/")[-1].split(".")[0].split("_")[1] + year, month = year_month.split("-") + + publish_scrap_to_datagouv( + year=year, month=month, file_path=path, target="klekoon" + ) + + +def extract_urls_from_dcat(dcat_paths: list): + logger = get_logger(level=LOG_LEVEL) + len_dcat_paths = len(dcat_paths) + + marches_urls = [] + + namespaces = { + "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#", + "dcat": "http://www.w3.org/ns/dcat#", + "dct": "http://purl.org/dc/terms/", + } + + for i, dcat_path in enumerate(dcat_paths): + logger.debug(f"{i}/{len_dcat_paths} {dcat_path} (parsing)") + + urls = [] + + try: + tree: etree._ElementTree = etree.parse(dcat_path) + datasets = tree.findall(".//dcat:dataset", namespaces=namespaces) + + for dataset in datasets: + ressources = dataset.findall( + ".//{http://www.w3.org/2000/01/rdf-schema#}ressource", + namespaces=namespaces, + ) + month_publication: etree._Element = dataset.find( + "dct:issued", namespaces=namespaces + ) + url = "" + + for ressource in ressources: + url: str = ressource.text.strip() + if url and "/json/" in url: + break + + try: + assert url != "" + except AssertionError: + continue + + if month_publication is not None: + month_publication: str = month_publication.text.strip()[:7] + else: + try: + month_publication: str = get_month_publication(url) + except httpx.HTTPStatusError: + logger.error(f"URL {url} 404") + continue + + urls.append({"url": url, "month": month_publication}) + + logger.debug(f"{i}/{len_dcat_paths} {len(urls)} marchés") + marches_urls.extend(urls) + + except (AssertionError, etree.XMLSyntaxError) as e: + logger.error(f"{dcat_path}: {e}") + + return marches_urls + + +def get_marches_json(urls, dist_dir): + logger = get_logger(level=LOG_LEVEL) + + marches_per_month = {} + nb_urls = len(urls) + urls = sorted(urls, key=lambda x: x["month"]) + + for i, url in enumerate(urls): + sleep(0.1) + logger.debug(f"{i}/{nb_urls} {url}") + month = url["month"] + marche_url = url["url"] + + os.makedirs(f"{dist_dir}/{month}", exist_ok=True) + + if month not in marches_per_month: + marches_per_month[month] = [] + + id_marche = marche_url.rsplit("/")[-1] + path_marche = f"{dist_dir}/{month}/{id_marche}.json" + with open(path_marche, "w") as f: + try: + response = CLIENT.get(marche_url).raise_for_status().json() + json.dump(response, f) + except httpx.HTTPStatusError as e: + logger.error(f"{marche_url}: {e}") + continue + + marches_per_month[month].append(path_marche) + + return marches_per_month + + +def concat_json_per_month(marches_per_month, dist_dir): + json_months_paths = [] + for month in marches_per_month: + marches_json_month = [] + + for marche in marches_per_month[month]: + with open(marche, "rb") as f: + marche_json = json.load(f) + del marche_json["$schema"] + marche_json["source"] = "scrap_klekoon" + marches_json_month.append(marche_json) + + marches_json_month = {"marches": marches_json_month} + path = f"{dist_dir}/klekoon_{month}.json" + with open(path, "w") as f: + json.dump(marches_json_month, f, indent=2) + json_months_paths.append(path) + + return json_months_paths + + +def get_dcats_df(dist_dir): + # Exécuter curl sans décoder automatiquement + path = f"{dist_dir}/dcat.csv" + if not (os.path.exists(path)): + result = subprocess.run( + ["curl", "-s", "-L", DCATS_URL], capture_output=True, check=True + ) + # Décoder manuellement en ISO-8859-1 (ou Windows-1252) 
+ content = result.stdout.decode("iso-8859-1") + content = content.replace(";coordonnnees", ";coordonnnees\n") + + with open(path := f"{dist_dir}/dcat.csv", "w") as f: + f.write(content) + + # Lire avec Polars + df = pl.read_csv(path, separator=";", encoding="iso-8859-1") + return df + + +def get_month_publication(url) -> str: + marche_json = CLIENT.get(url).raise_for_status().json() + date_publication = marche_json["datePublicationDonnees"] + month_publication = date_publication[:7] + return month_publication diff --git a/src/tasks/scrap/marches_securises.py b/src/tasks/scrap/marches_securises.py new file mode 100644 index 0000000..fee6316 --- /dev/null +++ b/src/tasks/scrap/marches_securises.py @@ -0,0 +1,106 @@ +import json +from pathlib import Path +from time import sleep + +import httpx +from bs4 import BeautifulSoup +from prefect import task + +from config import LOG_LEVEL +from tasks.publish import publish_scrap_to_datagouv +from tasks.utils import get_logger + + +def get_html(url: str, client: httpx.Client) -> str or None: + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0", + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/png,image/svg+xml,*/*;q=0.8", + "Connection": "keep-alive", + } + logger = get_logger(level=LOG_LEVEL) + + def get_response() -> httpx.Response: + return client.get(url, timeout=timeout, headers=headers).raise_for_status() + + timeout = httpx.Timeout(20.0, connect=60.0, pool=20.0, read=20.0) + try: + response = get_response() + except (httpx.ConnectError, httpx.ReadTimeout, httpx.HTTPStatusError): + logger.debug("3s break and retrying...") + sleep(3) + try: + response = get_response() + except (httpx.ConnectError, httpx.ReadTimeout, httpx.HTTPStatusError): + logger.error("Skipped") + return None + html = response.text + return html + + +def get_json_marches_securises(url: str, client: httpx.Client) -> dict or None: + json_html_page = get_html(url, client) + logger = get_logger(level=LOG_LEVEL) + + sleep(0.1) + if json_html_page: + json_html_page = ( + json_html_page.replace("", "") + "" + ) + else: + logger.warning("json_html_page is None, skipping...") + return None + json_html_page_soup = BeautifulSoup(json_html_page, "html.parser") + try: + decp_json = json.loads(json_html_page_soup.find("body").string) + except Exception as e: + logger.info(json_html_page) + logger.info(e) + return None + return decp_json + + +@task(log_prints=True) +def scrap_marches_securises_month(year: str, month: str, dist_dir: Path): + logger = get_logger(level=LOG_LEVEL) + + marches = [] + page = 1 + with httpx.Client() as client: + while True: + search_url = ( + f"https://www.marches-securises.fr/entreprise/?module=liste_donnees_essentielles&page={str(page)}&siret_pa=&siret_pa1=&date_deb={year}-{month}-01&date_fin={year}-{month}-31&date_deb_ms={year}-{month}-01&date_fin_ms={year}-{month}-31&ref_ume=&cpv_et=&type_procedure=&type_marche=&objet=&rs_oe=&dep_liste=&ctrl_key=aWwwS1pLUlFzejBOYitCWEZzZTEzZz09&text=&donnees_essentielles=1&search=" + f"table_ms&" + ) + + def parse_result_page(): + html_result_page = get_html(search_url, client) + if html_result_page is None: + return [] + soup = BeautifulSoup(html_result_page, "html.parser") + result_div = soup.find("div", attrs={"id": "liste_consultations"}) + logger.info(f"Year: {year}, Month: {month}, Page: {str(page)}") + return result_div.find_all( + "a", attrs={"title": "Télécharger au format Json"} + ) + + try: + json_links = 
parse_result_page() + except AttributeError: + sleep(1) + logger.info("Retrying result page download and parsing...") + json_links = parse_result_page() + + if not json_links: + break + else: + page += 1 + for json_link in json_links: + json_href = "https://www.marches-securises.fr" + json_link["href"] + decp_json = get_json_marches_securises(json_href, client) + marches.append(decp_json) + if len(marches) > 0: + dicts = {"marches": marches} + json_path = dist_dir / f"marches-securises_{year}-{month}.json" + with open(json_path, "w") as f: + f.write(json.dumps(dicts)) + publish_scrap_to_datagouv(year, month, json_path, "marches-securises.fr") diff --git a/src/tasks/utils.py b/src/tasks/utils.py index 6059602..8456ca0 100644 --- a/src/tasks/utils.py +++ b/src/tasks/utils.py @@ -12,6 +12,7 @@ from prefect.logging import get_run_logger from src.config import ( + ALL_CONFIG, CACHE_EXPIRATION_TIME_HOURS, DATE_NOW, DIST_DIR, @@ -80,7 +81,7 @@ def remove_unused_cache( logger.info(f"Suppression du fichier de cache: {file}") deleted_files.append(file) file.unlink() - logger.info(f"-> {len(deleted_files)} fichiers supprimés") + logger.info(f"-> {len(deleted_files)} fichiers de cache supprimés") # @@ -343,8 +344,9 @@ def check_parquet_file(path) -> bool: return False -def print_all_config(all_config): +def print_all_config(): logger = get_logger(level=LOG_LEVEL) + all_config = ALL_CONFIG msg = "" for k, v in sorted(all_config.items()): @@ -354,6 +356,8 @@ def print_all_config(all_config): def get_logger(level: str) -> logging.Logger: try: - return get_run_logger(level=level) + logger = get_run_logger() + logger.setLevel(level) + return logger except MissingContextError: return logging.Logger(name="Fallback logger", level=level) diff --git a/tests/test_clean.py b/tests/test_clean.py index bb485f2..16ecb20 100644 --- a/tests/test_clean.py +++ b/tests/test_clean.py @@ -213,6 +213,7 @@ def test_clean_decp(): "id": ["id.1", "id/2", ""], "acheteur_id": ["ach1", "ach2", ""], "acheteur.id": ["", "ach2", ""], + "objet": "Avec des 'apo'strophe's", "montant": ["1000", "1000000000000.00", "2000"], "datePublicationDonnees": ["2023-01-01", "0002-11-30", "2023-01-02"], "dateNotification": ["2023-01-01", "2023-01-01", "2023-01-01"], @@ -284,6 +285,7 @@ def test_clean_decp(): assert df_result["considerationsEnvironnementales"][0] == "Sans objet" assert df_result["ccag"][0] == "Sans objet" assert df_result["typeGroupement"][0] == "Sans objet" + assert df_result["objet"][1] == "Avec des 'apo’strophe’s" # Check nature replacement assert df_result["nature"][0] == "Marché subséquent"
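
For reference, a minimal standalone sketch (not part of the patch) of the three new cleaning rules introduced in `src/tasks/clean.py`: stripping whitespace from `titulaire_id`, replacing the literal `"NC"` with null, and turning straight quotes used as apostrophes in `objet` into typographic apostrophes. The expressions are copied from the diff; the sample LazyFrame and its columns are illustrative only.

import polars as pl

# Toy input: stray spaces in titulaire_id, a literal "NC", and straight quotes
# used as apostrophes in "objet". Columns and values are illustrative only.
lf = pl.LazyFrame(
    {
        "objet": ["Fourniture d'électricité", "Travaux de l'école"],
        "titulaire_id": [" 33487372600239", "13001536500013 "],
        "montant": ["1000", "NC"],
    }
)

# Rule 1: strip leading/trailing whitespace from titulaire_id (e.g. " 33487372600239")
lf = lf.with_columns(pl.col("titulaire_id").str.strip_chars())

# Rule 2: replace the literal "NC" with null in every string column
lf = lf.with_columns(pl.col(pl.Utf8).replace("NC", None).name.keep())

# Rule 3: a straight quote between two word characters becomes a typographic apostrophe
lf = lf.with_columns(
    pl.col("objet").str.replace_all(r"(\w)'(\w)", "$1’$2").alias("objet")
)

print(lf.collect())

Collecting yields "Fourniture d’électricité" and "Travaux de l’école", trimmed SIRETs, and a null montant on the second row. This mirrors the new assertion in tests/test_clean.py, where "Avec des 'apo'strophe's" becomes "Avec des 'apo’strophe’s": a quote that is not surrounded by word characters is left untouched.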