diff --git a/.gitignore b/.gitignore index c36d3c7..8319912 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ logs .venv *.egg-info *.parquet +!tests/data/sirene/*.parquet !code_officiel_geographique.parquet *.gz *.zip diff --git a/pyproject.toml b/pyproject.toml index a578876..0e37f12 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,9 +41,10 @@ testpaths = [ ] env = [ "DATASETS_REFERENCE_FILEPATH=tests/data/source_datasets_test.json", + "SIRENE_DATA_DIR=tests/data/sirene", "PREFECT_API_URL=", "DECP_PROCESSING_PUBLISH=", - "PREFECT_TASKS_REFRESH_CACHE=true" + "PREFECT_TASKS_REFRESH_CACHE=true", ] addopts = "-p no:warnings" diff --git a/src/config.py b/src/config.py index 8d8eac2..a595a4f 100644 --- a/src/config.py +++ b/src/config.py @@ -97,7 +97,15 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path: SIRENE_DATA_PARENT_DIR = make_path_from_env("SIRENE_DATA_PARENT_DIR", DATA_DIR) -SIRENE_DATA_DIR = make_sirene_data_dir(SIRENE_DATA_PARENT_DIR) + +# SIRENE_DATA_DIR ne doit être spécifié que pour les tests. Laisser vide dans .env et laisser make_sirene_data_dir +# le déterminer +SIRENE_DATA_DIR = os.getenv( + "SIRENE_DATA_DIR", make_sirene_data_dir(SIRENE_DATA_PARENT_DIR) +) +if isinstance(SIRENE_DATA_DIR, str): + SIRENE_DATA_DIR = Path(os.path.join(BASE_DIR, SIRENE_DATA_DIR)) + # SIRENE_DATA_DIR on ne le crée que si nécessaire, dans flows.py print(f"{'SIRENE_DATA_PARENT_DIR':<40}", SIRENE_DATA_PARENT_DIR) print(f"{'SIRENE_DATA_DIR':<40}", SIRENE_DATA_DIR) diff --git a/src/flows/decp_processing.py b/src/flows/decp_processing.py index 39862c4..f83e347 100644 --- a/src/flows/decp_processing.py +++ b/src/flows/decp_processing.py @@ -69,7 +69,6 @@ def decp_processing(enable_cache_removal: bool = False): # Preprocessing des données SIRENE si : # - le dossier n'existe pas encore (= les données n'ont pas déjà été preprocessed ce mois-ci) # - on est au moins le 5 du mois (pour être sûr que les données SIRENE ont été mises à jour sur data.gouv.fr) - print(SIRENE_DATA_DIR) if not SIRENE_DATA_DIR.exists(): sirene_preprocess() diff --git a/src/flows/sirene_preprocess.py b/src/flows/sirene_preprocess.py index fd97e97..9b12094 100644 --- a/src/flows/sirene_preprocess.py +++ b/src/flows/sirene_preprocess.py @@ -3,8 +3,8 @@ from src.config import SIRENE_DATA_DIR from src.flows.get_cog import get_cog -from src.tasks.get import get_etablissements -from src.tasks.transform import get_prepare_unites_legales, prepare_etablissements +from src.tasks.get import get_etablissements, get_unite_legales +from src.tasks.transform import prepare_etablissements from src.tasks.utils import create_sirene_data_dir @@ -26,7 +26,7 @@ def sirene_preprocess(): processed_ul_parquet_path = SIRENE_DATA_DIR / "unites_legales.parquet" if not processed_ul_parquet_path.exists(): print("Téléchargement et préparation des unités légales...") - get_prepare_unites_legales(processed_ul_parquet_path) + get_unite_legales(processed_ul_parquet_path) else: print(processed_ul_parquet_path, " existe, skipping.") @@ -35,7 +35,7 @@ def sirene_preprocess(): if not processed_etab_parquet_path.exists(): print("Téléchargement et préparation des établissements...") lf = get_etablissements() - prepare_etablissements(lf, processed_etab_parquet_path) + prepare_etablissements(lf).sink_parquet(processed_etab_parquet_path) else: print(processed_etab_parquet_path, " existe, skipping.") diff --git a/src/tasks/enrich.py b/src/tasks/enrich.py index dfd63c9..65c5990 100644 --- a/src/tasks/enrich.py +++ b/src/tasks/enrich.py @@ -21,6 +21,27 @@ def add_etablissement_data( lf_sirets = lf_sirets.join( lf_etablissements, how="inner", left_on=siret_column, right_on="siret" ) + + # On ne prend pas l'activité des acheteurs + if type_siret == "acheteur": + lf_sirets = lf_sirets.drop(cs.starts_with("activite_")) + + # Si il y a un etablissement_nom (Enseigne1Etablissement ou denominationUsuelleEtablissement), + # on l'ajoute au nom de l'organisme, entre parenthèses + lf_sirets = lf_sirets.with_columns( + pl.when(pl.col("etablissement_nom").is_not_null()) + .then( + pl.concat_str( + pl.col(f"{type_siret}_nom"), + pl.lit(" ("), + pl.col("etablissement_nom"), + pl.lit(")"), + ) + ) + .otherwise(pl.col(f"{type_siret}_nom")) + .alias(f"{type_siret}_nom") + ).drop("etablissement_nom") + lf_sirets = lf_sirets.rename( { "latitude": f"{type_siret}_latitude", @@ -66,11 +87,6 @@ def enrich_from_sirene(lf: pl.LazyFrame): print("Extraction des SIRET des acheteurs...") lf_sirets_acheteurs = extract_unique_acheteurs_siret(lf.clone()) - print("Ajout des données établissements (acheteurs)...") - lf_sirets_acheteurs = add_etablissement_data( - lf_sirets_acheteurs, lf_etablissements, "acheteur_id", "acheteur" - ) - print("Ajout des données unités légales (acheteurs)...") lf_sirets_acheteurs = add_unite_legale_data( lf_sirets_acheteurs, @@ -79,6 +95,11 @@ def enrich_from_sirene(lf: pl.LazyFrame): type_siret="acheteur", ) + print("Ajout des données établissements (acheteurs)...") + lf_sirets_acheteurs = add_etablissement_data( + lf_sirets_acheteurs, lf_etablissements, "acheteur_id", "acheteur" + ) + lf = lf.join(lf_sirets_acheteurs, how="left", on="acheteur_id") del lf_sirets_acheteurs @@ -91,11 +112,6 @@ def enrich_from_sirene(lf: pl.LazyFrame): print("Extraction des SIRET des titulaires...") lf_sirets_titulaires = extract_unique_titulaires_siret(lf.clone()) - print("Ajout des données établissements (titulaires)...") - lf_sirets_titulaires = add_etablissement_data( - lf_sirets_titulaires, lf_etablissements, "titulaire_id", "titulaire" - ) - print("Ajout des données unités légales (titulaires)...") lf_sirets_titulaires = add_unite_legale_data( lf_sirets_titulaires, @@ -104,6 +120,11 @@ def enrich_from_sirene(lf: pl.LazyFrame): type_siret="titulaire", ) + print("Ajout des données établissements (titulaires)...") + lf_sirets_titulaires = add_etablissement_data( + lf_sirets_titulaires, lf_etablissements, "titulaire_id", "titulaire" + ) + # En joignant en utilisant à la fois le SIRET et le typeIdentifiant, on s'assure qu'on ne joint pas sur # des id de titulaires non-SIRET lf = lf.join( diff --git a/src/tasks/get.py b/src/tasks/get.py index 954dd7e..8aa1812 100644 --- a/src/tasks/get.py +++ b/src/tasks/get.py @@ -15,6 +15,7 @@ from prefect import task from prefect.transactions import transaction +from config import SIRENE_UNITES_LEGALES_URL from src.config import ( CACHE_EXPIRATION_TIME_HOURS, DECP_FORMAT_2022, @@ -36,6 +37,7 @@ get_clean_cache_key, stream_replace_bytestring, ) +from tasks.transform import prepare_unites_legales @task(retries=3, retry_delay_seconds=3) @@ -313,10 +315,10 @@ def yield_modifications(row: dict, separator="_") -> Iterator[dict] or None: raw_mods = raw_mods["modification"] # Couvre le (non-)format dans lequel "modifications" ou "modification" mène # directement à un dict contenant les métadonnées liées à une modification. - if isinstance(raw_mods, dict): + elif isinstance(raw_mods, dict): raw_mods = [raw_mods] - - raw_mods = [] if raw_mods is None else raw_mods + elif isinstance(raw_mods, str) or raw_mods is None: + raw_mods = [] mods = [{}] + raw_mods for i, mod in enumerate(mods): @@ -368,6 +370,8 @@ def get_etablissements() -> pl.LazyFrame: "longitude": pl.Float64, "activitePrincipaleEtablissement": pl.String, "nomenclatureActivitePrincipaleEtablissement": pl.String, + "enseigne1Etablissement": pl.String, + "denominationUsuelleEtablissement": pl.String, } columns = list(schema.keys()) @@ -386,7 +390,7 @@ def get_etablissements() -> pl.LazyFrame: hrefs.append(base_url + href) # Fonction de traitement pour un fichier - def process_file(_href: str): + def get_process_file(_href: str): print(_href.split("/")[-1]) try: response = http_client.get( @@ -402,18 +406,12 @@ def process_file(_href: str): content = response.content lff = pl.scan_csv(content, schema_overrides=schema) lff = lff.select(columns) - lff = lff.with_columns( - [ - pl.col("codeCommuneEtablissement").str.pad_start(5, "0"), - pl.col("siret").str.pad_start(14, "0"), - ] - ) return lff # Traitement en parrallèle avec 8 threads lfs = [] with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: - futures = [executor.submit(process_file, href) for href in hrefs] + futures = [executor.submit(get_process_file, href) for href in hrefs] for future in concurrent.futures.as_completed(futures): try: lf = future.result() @@ -458,3 +456,13 @@ def get_clean(resource, resources_artifact: list) -> pl.DataFrame or None: df = None return df + + +@task +def get_unite_legales(processed_parquet_path): + print("Téléchargement des données unité légales et sélection des colonnes...") + ( + pl.scan_parquet(SIRENE_UNITES_LEGALES_URL) + .pipe(prepare_unites_legales) + .sink_parquet(processed_parquet_path) + ) diff --git a/src/tasks/transform.py b/src/tasks/transform.py index 81f340f..c00c06c 100644 --- a/src/tasks/transform.py +++ b/src/tasks/transform.py @@ -1,11 +1,9 @@ from datetime import datetime -from pathlib import Path import polars as pl import polars.selectors as cs -from prefect import task -from src.config import DATA_DIR, DIST_DIR, SIRENE_UNITES_LEGALES_URL, DecpFormat +from src.config import DATA_DIR, DIST_DIR, DecpFormat from src.tasks.output import save_to_files @@ -182,21 +180,70 @@ def extract_unique_titulaires_siret(lf: pl.LazyFrame): return lf -@task -def get_prepare_unites_legales(processed_parquet_path): - print("Téléchargement des données unité légales et sélection des colonnes...") - ( - pl.scan_parquet(SIRENE_UNITES_LEGALES_URL) - .select(["siren", "denominationUniteLegale"]) - .filter(pl.col("siren").is_not_null()) - .filter(pl.col("denominationUniteLegale").is_not_null()) - .unique() - .sink_parquet(processed_parquet_path) +def prepare_unites_legales(lf: pl.LazyFrame) -> pl.LazyFrame: + return ( + lf.select( + [ + "siren", + "denominationUniteLegale", + "prenomUsuelUniteLegale", + "nomUniteLegale", # toujours rempli pour personnes physique + "nomUsageUniteLegale", # parfois rempli, a la priorité sur nomUniteLegale + "statutDiffusionUniteLegale", # P = non-diffusible + ] + ) + .filter( + pl.col("siren").is_not_null() + ) # utilisation du fichier Stock, normalement pas de siren null + .unique() # utilisation du fichier Stock, normalement pas de doublons + .with_columns( + pl.when(pl.col("nomUsageUniteLegale").is_not_null()) + .then(pl.col("nomUsageUniteLegale")) + .otherwise(pl.col("nomUniteLegale")) + .alias("nomUniteLegale") + ) + .with_columns( + pl.when(pl.col("nomUniteLegale").is_not_null()) + .then( + pl.concat_str( + pl.col("prenomUsuelUniteLegale"), + pl.col("nomUniteLegale"), + separator=" ", + ) + ) + .otherwise(pl.col("denominationUniteLegale")) + .alias("denominationUniteLegale") + ) + .with_columns( + pl.when(pl.col("statutDiffusionUniteLegale") == "P") + .then(pl.lit("[Données personnelles non-diffusibles]")) + .otherwise(pl.col("denominationUniteLegale")) + .alias("denominationUniteLegale") + ) + .drop( + [ + "prenomUsuelUniteLegale", + "statutDiffusionUniteLegale", + "nomUniteLegale", + "nomUsageUniteLegale", + ] + ) ) -def prepare_etablissements(lf: pl.LazyFrame, processed_parquet_path: Path) -> None: - lf = lf.rename( +def prepare_etablissements(lff: pl.LazyFrame) -> pl.LazyFrame: + lff = lff.with_columns( + [ + pl.col("codeCommuneEtablissement").str.pad_start(5, "0"), + pl.col("siret").str.pad_start(14, "0"), + # Si enseigne1Etablissement est null, on utilise denominationUsuelleEtablissement + pl.coalesce( + "enseigne1Etablissement", "denominationUsuelleEtablissement" + ).alias("etablissement_nom"), + ] + ) + lff = lff.drop("denominationUsuelleEtablissement", "enseigne1Etablissement") + lff = lff.rename( { "codeCommuneEtablissement": "commune_code", "activitePrincipaleEtablissement": "activite_code", @@ -204,11 +251,11 @@ def prepare_etablissements(lf: pl.LazyFrame, processed_parquet_path: Path) -> No } ) - # Ajout des noms de départements, noms régions, + # Ajout des noms de commune, départements, régions lf_cog = pl.scan_parquet(DATA_DIR / "code_officiel_geographique.parquet") - lf = lf.join(lf_cog, on="commune_code", how="left") + lff = lff.join(lf_cog, on="commune_code", how="left") - lf.sink_parquet(processed_parquet_path) + return lff def sort_columns(df: pl.DataFrame, config_columns): @@ -231,6 +278,7 @@ def calculate_naf_cpv_matching(df: pl.DataFrame): lf_naf_cpv.select( "uid", "codeCPV", + "titulaire_id", "activite_code", "activite_nomenclature", "donneesActuelles", @@ -385,7 +433,7 @@ def add_duree_restante(lff: pl.LazyFrame): # # decp_acheteurs_df["acheteur_id"] = decp_acheteurs_df.apply(construct_nom, axis=1) # -# # TODO: ne garder que les colonnes acheteur_id et acheteur_id +# # TODO: ne garder que les colonnes acheteur_id et acheteur_nom # # return decp_acheteurs_df # diff --git a/tests/data/decp_test_2019.json b/tests/data/decp_test_2019.json index bed2b10..a415e6a 100644 --- a/tests/data/decp_test_2019.json +++ b/tests/data/decp_test_2019.json @@ -14,7 +14,7 @@ { "typeIdentifiant": "SIRET", "denominationSociale": "AMC FOLLIOT", - "id": "65265021900023" + "id": "12345678900022" } ] ] @@ -28,7 +28,7 @@ { "typeIdentifiant": "SIRET", "denominationSociale": "AMC FOLLIOT", - "id": "65265021900023" + "id": "12345678900022" } ], "id": "2019_83935401", @@ -68,7 +68,7 @@ "titulaires": [ { "typeIdentifiant": "SIRET", - "id": "34027049500021", + "id": "12345678900023", "denominationSociale": "FFF" }, { diff --git a/tests/data/decp_test_2022.json b/tests/data/decp_test_2022.json index 7d31bcb..f5073c9 100644 --- a/tests/data/decp_test_2022.json +++ b/tests/data/decp_test_2022.json @@ -30,7 +30,7 @@ { "titulaire": { "typeIdentifiant": "SIRET", - "id": "34027049500021" + "id": "12345678900023" } }, { @@ -184,7 +184,7 @@ { "titulaire": { "typeIdentifiant": "SIRET", - "id": "58211867500054" + "id": "12345678900022" } }, { diff --git a/tests/data/sirene/etablissements.parquet b/tests/data/sirene/etablissements.parquet new file mode 100644 index 0000000..f9c8531 Binary files /dev/null and b/tests/data/sirene/etablissements.parquet differ diff --git a/tests/data/sirene/unites_legales.parquet b/tests/data/sirene/unites_legales.parquet new file mode 100644 index 0000000..a1ed29f Binary files /dev/null and b/tests/data/sirene/unites_legales.parquet differ diff --git a/tests/test_enrich.py b/tests/test_enrich.py index 6082a72..893f514 100644 --- a/tests/test_enrich.py +++ b/tests/test_enrich.py @@ -1,38 +1,32 @@ import polars as pl from polars.testing import assert_frame_equal +from src.config import SIRENE_DATA_DIR from src.tasks.enrich import add_etablissement_data class TestEnrich: def test_add_etablissement_data(self): - lf_sirets = pl.LazyFrame({"org_id": ["12345678900022"]}) - - lf_etablissement = pl.LazyFrame( - { - "siret": ["12345678900022"], - "latitude": [11.12], - "longitude": [12.13], - "commune_code": ["12345"], - "departement_code": ["12"], - "region_code": ["01"], - "commune_nom": ["Commune"], - "departement_nom": ["Département"], - "region_nom": ["Région"], - } + lf_sirets = pl.LazyFrame( + {"org_id": ["12345678900022", "12345678900023"], "org_nom": ["Org", "Org"]} ) + lf_etablissement = pl.scan_parquet(SIRENE_DATA_DIR / "etablissements.parquet") + lf_output = pl.LazyFrame( { - "org_id": ["12345678900022"], - "org_latitude": [11.12], - "org_longitude": [12.13], - "org_commune_code": ["12345"], - "org_departement_code": ["12"], - "org_region_code": ["01"], - "org_commune_nom": ["Commune"], - "org_departement_nom": ["Département"], - "org_region_nom": ["Région"], + "org_id": ["12345678900022", "12345678900023"], + "org_latitude": [11.12, 11.12], + "org_longitude": [12.13, 12.13], + "org_commune_code": ["12345", "12345"], + "org_departement_code": ["12", "12"], + "org_region_code": ["01", "01"], + "org_commune_nom": ["Commune", "Commune"], + "org_departement_nom": ["Département", "Département"], + "org_region_nom": ["Région", "Région"], + "org_nom": ["Org (Établissement nom)", "Org"], + "activite_code": ["11.11A", "11.11B"], + "activite_nomenclature": ["NAFRev2", "NAFRev2"], } ) @@ -41,4 +35,5 @@ def test_add_etablissement_data(self): lf_sirets, lf_etablissement, "org_id", "org" ).collect(), lf_output.collect(), + check_column_order=False, ) diff --git a/tests/test_main.py b/tests/test_main.py index 2d63165..991b0a1 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -4,5 +4,5 @@ class TestFlow: def test_decp_processing(self): - with prefect_test_harness(server_startup_timeout=10): + with prefect_test_harness(server_startup_timeout=30): decp_processing() diff --git a/tests/test_transform.py b/tests/test_transform.py index 202fe19..e0fffe6 100644 --- a/tests/test_transform.py +++ b/tests/test_transform.py @@ -2,10 +2,122 @@ from polars.testing import assert_frame_equal from tasks.transform import ( + prepare_etablissements, + prepare_unites_legales, replace_with_modification_data, ) +class TestPrepareUnitesLegales: + def test_prepare_unites_legales(self): + lf = pl.LazyFrame( + [ + # Cas 1: Personne morale + { + "siren": "111111111", + "denominationUniteLegale": "Org 1", + "prenomUsuelUniteLegale": None, + "nomUniteLegale": None, + "nomUsageUniteLegale": None, + "statutDiffusionUniteLegale": "O", + }, + # Cas 2: Personne physique avec nom d'usage + { + "siren": "222222222", + "denominationUniteLegale": None, + "prenomUsuelUniteLegale": "Ambroise", + "nomUniteLegale": "Croizat", + "nomUsageUniteLegale": "Zacroit", # a la priorité + "statutDiffusionUniteLegale": "O", + }, + # Cas 3: Personne physique sans nom d'usage + { + "siren": "333333333", + "denominationUniteLegale": None, + "prenomUsuelUniteLegale": "Ambroise", + "nomUniteLegale": "Croizat", + "nomUsageUniteLegale": None, + "statutDiffusionUniteLegale": "O", + }, + # Cas 4: Nom non-diffusible + { + "siren": "44444444", + "denominationUniteLegale": None, + "prenomUsuelUniteLegale": "Ambroise", + "nomUniteLegale": "Croizat", + "nomUsageUniteLegale": None, + "statutDiffusionUniteLegale": "P", + }, + ] + ) + + # Expected DataFrame + expected_df = pl.DataFrame( + [ + # Cas 1: denominationUniteLegale est préservé + {"siren": "111111111", "denominationUniteLegale": "Org 1"}, + # Cas 2: denominationUniteLegale = prenom + nomUsage (Zacroit) + {"siren": "222222222", "denominationUniteLegale": "Ambroise Zacroit"}, + # Cas 3: denominationUniteLegale = prenom + nom (Croizat) + {"siren": "333333333", "denominationUniteLegale": "Ambroise Croizat"}, + # Cas 4: denominationUniteLegale = non-diffusible + { + "siren": "44444444", + "denominationUniteLegale": "[Données personnelles non-diffusibles]", + }, + ] + ) + + # Application de la fonction + result_df = prepare_unites_legales(lf).collect() + + # Tri des df + result_df = result_df.sort("siren") + expected_df = expected_df.sort("siren") + + assert_frame_equal(result_df, expected_df) + + +class TestPrepareEtablissements: + def test_prepare_etablissements(self): + lf = pl.LazyFrame( + [ + { + "siret": "11111111111", + "codeCommuneEtablissement": "1053", + "enseigne1Etablissement": None, + "denominationUsuelleEtablissement": "Dénom usuelle", + "activitePrincipaleEtablissement": "11.1A", + "nomenclatureActivitePrincipaleEtablissement": "NAFv2", + } + ] + ) + + expected_df = pl.DataFrame( + [ + { + "siret": "00011111111111", + "commune_code": "01053", + "etablissement_nom": "Dénom usuelle", + "activite_code": "11.1A", + "activite_nomenclature": "NAFv2", + "commune_nom": "Bourg-en-Bresse", + "departement_code": "01", + "region_code": "84", + "region_nom": "Auvergne-Rhône-Alpes", + "departement_nom": "Ain", + } + ] + ) + + assert_frame_equal( + prepare_etablissements(lf).collect(), + expected_df, + check_column_order=False, + check_dtypes=True, + ) + + class TestHandleModificationsMarche: def test_replace_with_modification_data(self): # Input LazyFrame - 3 test cases covering key scenarios