Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ logs
.venv
*.egg-info
*.parquet
!tests/data/sirene/*.parquet
!code_officiel_geographique.parquet
*.gz
*.zip
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ testpaths = [
]
env = [
"DATASETS_REFERENCE_FILEPATH=tests/data/source_datasets_test.json",
"SIRENE_DATA_DIR=tests/data/sirene",
"PREFECT_API_URL=",
"DECP_PROCESSING_PUBLISH=",
"PREFECT_TASKS_REFRESH_CACHE=true"
"PREFECT_TASKS_REFRESH_CACHE=true",
]
addopts = "-p no:warnings"

Expand Down
10 changes: 9 additions & 1 deletion src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,15 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path:


SIRENE_DATA_PARENT_DIR = make_path_from_env("SIRENE_DATA_PARENT_DIR", DATA_DIR)
SIRENE_DATA_DIR = make_sirene_data_dir(SIRENE_DATA_PARENT_DIR)

# SIRENE_DATA_DIR ne doit être spécifié que pour les tests. Laisser vide dans .env et laisser make_sirene_data_dir
# le déterminer
SIRENE_DATA_DIR = os.getenv(
"SIRENE_DATA_DIR", make_sirene_data_dir(SIRENE_DATA_PARENT_DIR)
)
if isinstance(SIRENE_DATA_DIR, str):
SIRENE_DATA_DIR = Path(os.path.join(BASE_DIR, SIRENE_DATA_DIR))

# SIRENE_DATA_DIR on ne le crée que si nécessaire, dans flows.py
print(f"{'SIRENE_DATA_PARENT_DIR':<40}", SIRENE_DATA_PARENT_DIR)
print(f"{'SIRENE_DATA_DIR':<40}", SIRENE_DATA_DIR)
Expand Down
1 change: 0 additions & 1 deletion src/flows/decp_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ def decp_processing(enable_cache_removal: bool = False):
# Preprocessing des données SIRENE si :
# - le dossier n'existe pas encore (= les données n'ont pas déjà été preprocessed ce mois-ci)
# - on est au moins le 5 du mois (pour être sûr que les données SIRENE ont été mises à jour sur data.gouv.fr)
print(SIRENE_DATA_DIR)
if not SIRENE_DATA_DIR.exists():
sirene_preprocess()

Expand Down
8 changes: 4 additions & 4 deletions src/flows/sirene_preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

from src.config import SIRENE_DATA_DIR
from src.flows.get_cog import get_cog
from src.tasks.get import get_etablissements
from src.tasks.transform import get_prepare_unites_legales, prepare_etablissements
from src.tasks.get import get_etablissements, get_unite_legales
from src.tasks.transform import prepare_etablissements
from src.tasks.utils import create_sirene_data_dir


Expand All @@ -26,7 +26,7 @@ def sirene_preprocess():
processed_ul_parquet_path = SIRENE_DATA_DIR / "unites_legales.parquet"
if not processed_ul_parquet_path.exists():
print("Téléchargement et préparation des unités légales...")
get_prepare_unites_legales(processed_ul_parquet_path)
get_unite_legales(processed_ul_parquet_path)
else:
print(processed_ul_parquet_path, " existe, skipping.")

Expand All @@ -35,7 +35,7 @@ def sirene_preprocess():
if not processed_etab_parquet_path.exists():
print("Téléchargement et préparation des établissements...")
lf = get_etablissements()
prepare_etablissements(lf, processed_etab_parquet_path)
prepare_etablissements(lf).sink_parquet(processed_etab_parquet_path)
else:
print(processed_etab_parquet_path, " existe, skipping.")

Expand Down
41 changes: 31 additions & 10 deletions src/tasks/enrich.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,27 @@ def add_etablissement_data(
lf_sirets = lf_sirets.join(
lf_etablissements, how="inner", left_on=siret_column, right_on="siret"
)

# On ne prend pas l'activité des acheteurs
if type_siret == "acheteur":
lf_sirets = lf_sirets.drop(cs.starts_with("activite_"))

# Si il y a un etablissement_nom (Enseigne1Etablissement ou denominationUsuelleEtablissement),
# on l'ajoute au nom de l'organisme, entre parenthèses
lf_sirets = lf_sirets.with_columns(
pl.when(pl.col("etablissement_nom").is_not_null())
.then(
pl.concat_str(
pl.col(f"{type_siret}_nom"),
pl.lit(" ("),
pl.col("etablissement_nom"),
pl.lit(")"),
)
)
.otherwise(pl.col(f"{type_siret}_nom"))
.alias(f"{type_siret}_nom")
).drop("etablissement_nom")

lf_sirets = lf_sirets.rename(
{
"latitude": f"{type_siret}_latitude",
Expand Down Expand Up @@ -66,11 +87,6 @@ def enrich_from_sirene(lf: pl.LazyFrame):
print("Extraction des SIRET des acheteurs...")
lf_sirets_acheteurs = extract_unique_acheteurs_siret(lf.clone())

print("Ajout des données établissements (acheteurs)...")
lf_sirets_acheteurs = add_etablissement_data(
lf_sirets_acheteurs, lf_etablissements, "acheteur_id", "acheteur"
)

print("Ajout des données unités légales (acheteurs)...")
lf_sirets_acheteurs = add_unite_legale_data(
lf_sirets_acheteurs,
Expand All @@ -79,6 +95,11 @@ def enrich_from_sirene(lf: pl.LazyFrame):
type_siret="acheteur",
)

print("Ajout des données établissements (acheteurs)...")
lf_sirets_acheteurs = add_etablissement_data(
lf_sirets_acheteurs, lf_etablissements, "acheteur_id", "acheteur"
)

lf = lf.join(lf_sirets_acheteurs, how="left", on="acheteur_id")

del lf_sirets_acheteurs
Expand All @@ -91,11 +112,6 @@ def enrich_from_sirene(lf: pl.LazyFrame):
print("Extraction des SIRET des titulaires...")
lf_sirets_titulaires = extract_unique_titulaires_siret(lf.clone())

print("Ajout des données établissements (titulaires)...")
lf_sirets_titulaires = add_etablissement_data(
lf_sirets_titulaires, lf_etablissements, "titulaire_id", "titulaire"
)

print("Ajout des données unités légales (titulaires)...")
lf_sirets_titulaires = add_unite_legale_data(
lf_sirets_titulaires,
Expand All @@ -104,6 +120,11 @@ def enrich_from_sirene(lf: pl.LazyFrame):
type_siret="titulaire",
)

print("Ajout des données établissements (titulaires)...")
lf_sirets_titulaires = add_etablissement_data(
lf_sirets_titulaires, lf_etablissements, "titulaire_id", "titulaire"
)

# En joignant en utilisant à la fois le SIRET et le typeIdentifiant, on s'assure qu'on ne joint pas sur
# des id de titulaires non-SIRET
lf = lf.join(
Expand Down
30 changes: 19 additions & 11 deletions src/tasks/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from prefect import task
from prefect.transactions import transaction

from config import SIRENE_UNITES_LEGALES_URL
from src.config import (
CACHE_EXPIRATION_TIME_HOURS,
DECP_FORMAT_2022,
Expand All @@ -36,6 +37,7 @@
get_clean_cache_key,
stream_replace_bytestring,
)
from tasks.transform import prepare_unites_legales


@task(retries=3, retry_delay_seconds=3)
Expand Down Expand Up @@ -313,10 +315,10 @@ def yield_modifications(row: dict, separator="_") -> Iterator[dict] or None:
raw_mods = raw_mods["modification"]
# Couvre le (non-)format dans lequel "modifications" ou "modification" mène
# directement à un dict contenant les métadonnées liées à une modification.
if isinstance(raw_mods, dict):
elif isinstance(raw_mods, dict):
raw_mods = [raw_mods]

raw_mods = [] if raw_mods is None else raw_mods
elif isinstance(raw_mods, str) or raw_mods is None:
raw_mods = []

mods = [{}] + raw_mods
for i, mod in enumerate(mods):
Expand Down Expand Up @@ -368,6 +370,8 @@ def get_etablissements() -> pl.LazyFrame:
"longitude": pl.Float64,
"activitePrincipaleEtablissement": pl.String,
"nomenclatureActivitePrincipaleEtablissement": pl.String,
"enseigne1Etablissement": pl.String,
"denominationUsuelleEtablissement": pl.String,
}

columns = list(schema.keys())
Expand All @@ -386,7 +390,7 @@ def get_etablissements() -> pl.LazyFrame:
hrefs.append(base_url + href)

# Fonction de traitement pour un fichier
def process_file(_href: str):
def get_process_file(_href: str):
print(_href.split("/")[-1])
try:
response = http_client.get(
Expand All @@ -402,18 +406,12 @@ def process_file(_href: str):
content = response.content
lff = pl.scan_csv(content, schema_overrides=schema)
lff = lff.select(columns)
lff = lff.with_columns(
[
pl.col("codeCommuneEtablissement").str.pad_start(5, "0"),
pl.col("siret").str.pad_start(14, "0"),
]
)
return lff

# Traitement en parrallèle avec 8 threads
lfs = []
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
futures = [executor.submit(process_file, href) for href in hrefs]
futures = [executor.submit(get_process_file, href) for href in hrefs]
for future in concurrent.futures.as_completed(futures):
try:
lf = future.result()
Expand Down Expand Up @@ -458,3 +456,13 @@ def get_clean(resource, resources_artifact: list) -> pl.DataFrame or None:
df = None

return df


@task
def get_unite_legales(processed_parquet_path):
print("Téléchargement des données unité légales et sélection des colonnes...")
(
pl.scan_parquet(SIRENE_UNITES_LEGALES_URL)
.pipe(prepare_unites_legales)
.sink_parquet(processed_parquet_path)
)
86 changes: 67 additions & 19 deletions src/tasks/transform.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from datetime import datetime
from pathlib import Path

import polars as pl
import polars.selectors as cs
from prefect import task

from src.config import DATA_DIR, DIST_DIR, SIRENE_UNITES_LEGALES_URL, DecpFormat
from src.config import DATA_DIR, DIST_DIR, DecpFormat
from src.tasks.output import save_to_files


Expand Down Expand Up @@ -182,33 +180,82 @@ def extract_unique_titulaires_siret(lf: pl.LazyFrame):
return lf


@task
def get_prepare_unites_legales(processed_parquet_path):
print("Téléchargement des données unité légales et sélection des colonnes...")
(
pl.scan_parquet(SIRENE_UNITES_LEGALES_URL)
.select(["siren", "denominationUniteLegale"])
.filter(pl.col("siren").is_not_null())
.filter(pl.col("denominationUniteLegale").is_not_null())
.unique()
.sink_parquet(processed_parquet_path)
def prepare_unites_legales(lf: pl.LazyFrame) -> pl.LazyFrame:
return (
lf.select(
[
"siren",
"denominationUniteLegale",
"prenomUsuelUniteLegale",
"nomUniteLegale", # toujours rempli pour personnes physique
"nomUsageUniteLegale", # parfois rempli, a la priorité sur nomUniteLegale
"statutDiffusionUniteLegale", # P = non-diffusible
]
)
.filter(
pl.col("siren").is_not_null()
) # utilisation du fichier Stock, normalement pas de siren null
.unique() # utilisation du fichier Stock, normalement pas de doublons
.with_columns(
pl.when(pl.col("nomUsageUniteLegale").is_not_null())
.then(pl.col("nomUsageUniteLegale"))
.otherwise(pl.col("nomUniteLegale"))
.alias("nomUniteLegale")
)
.with_columns(
pl.when(pl.col("nomUniteLegale").is_not_null())
.then(
pl.concat_str(
pl.col("prenomUsuelUniteLegale"),
pl.col("nomUniteLegale"),
separator=" ",
)
)
.otherwise(pl.col("denominationUniteLegale"))
.alias("denominationUniteLegale")
)
.with_columns(
pl.when(pl.col("statutDiffusionUniteLegale") == "P")
.then(pl.lit("[Données personnelles non-diffusibles]"))
.otherwise(pl.col("denominationUniteLegale"))
.alias("denominationUniteLegale")
)
.drop(
[
"prenomUsuelUniteLegale",
"statutDiffusionUniteLegale",
"nomUniteLegale",
"nomUsageUniteLegale",
]
)
)


def prepare_etablissements(lf: pl.LazyFrame, processed_parquet_path: Path) -> None:
lf = lf.rename(
def prepare_etablissements(lff: pl.LazyFrame) -> pl.LazyFrame:
lff = lff.with_columns(
[
pl.col("codeCommuneEtablissement").str.pad_start(5, "0"),
pl.col("siret").str.pad_start(14, "0"),
# Si enseigne1Etablissement est null, on utilise denominationUsuelleEtablissement
pl.coalesce(
"enseigne1Etablissement", "denominationUsuelleEtablissement"
).alias("etablissement_nom"),
]
)
lff = lff.drop("denominationUsuelleEtablissement", "enseigne1Etablissement")
lff = lff.rename(
{
"codeCommuneEtablissement": "commune_code",
"activitePrincipaleEtablissement": "activite_code",
"nomenclatureActivitePrincipaleEtablissement": "activite_nomenclature",
}
)

# Ajout des noms de départements, noms régions,
# Ajout des noms de commune, départements, régions
lf_cog = pl.scan_parquet(DATA_DIR / "code_officiel_geographique.parquet")
lf = lf.join(lf_cog, on="commune_code", how="left")
lff = lff.join(lf_cog, on="commune_code", how="left")

lf.sink_parquet(processed_parquet_path)
return lff


def sort_columns(df: pl.DataFrame, config_columns):
Expand All @@ -231,6 +278,7 @@ def calculate_naf_cpv_matching(df: pl.DataFrame):
lf_naf_cpv.select(
"uid",
"codeCPV",
"titulaire_id",
"activite_code",
"activite_nomenclature",
"donneesActuelles",
Expand Down Expand Up @@ -385,7 +433,7 @@ def add_duree_restante(lff: pl.LazyFrame):
#
# decp_acheteurs_df["acheteur_id"] = decp_acheteurs_df.apply(construct_nom, axis=1)
#
# # TODO: ne garder que les colonnes acheteur_id et acheteur_id
# # TODO: ne garder que les colonnes acheteur_id et acheteur_nom
#
# return decp_acheteurs_df
#
Expand Down
6 changes: 3 additions & 3 deletions tests/data/decp_test_2019.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
{
"typeIdentifiant": "SIRET",
"denominationSociale": "AMC FOLLIOT",
"id": "65265021900023"
"id": "12345678900022"
}
]
]
Expand All @@ -28,7 +28,7 @@
{
"typeIdentifiant": "SIRET",
"denominationSociale": "AMC FOLLIOT",
"id": "65265021900023"
"id": "12345678900022"
}
],
"id": "2019_83935401",
Expand Down Expand Up @@ -68,7 +68,7 @@
"titulaires": [
{
"typeIdentifiant": "SIRET",
"id": "34027049500021",
"id": "12345678900023",
"denominationSociale": "FFF"
},
{
Expand Down
Loading