diff --git a/src/policyengine_api/main.py b/src/policyengine_api/main.py index 735dd3e..35bbf16 100644 --- a/src/policyengine_api/main.py +++ b/src/policyengine_api/main.py @@ -35,6 +35,14 @@ def _scrubbing_callback(m: logfire.ScrubMatch): ) logfire.instrument_httpx() + # Disable noisy SQLAlchemy auto-instrumentation + try: + from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor + + SQLAlchemyInstrumentor().uninstrument() + except ImportError: + pass # Not installed + @asynccontextmanager async def lifespan(app: FastAPI): diff --git a/src/policyengine_api/modal_app.py b/src/policyengine_api/modal_app.py index cdc3d8d..4c979df 100644 --- a/src/policyengine_api/modal_app.py +++ b/src/policyengine_api/modal_app.py @@ -147,7 +147,11 @@ def download_dataset( @app.function( - image=uk_image, secrets=[db_secrets, logfire_secrets], memory=4096, cpu=4, timeout=600 + image=uk_image, + secrets=[db_secrets, logfire_secrets], + memory=4096, + cpu=4, + timeout=600, ) def simulate_household_uk( job_id: str, @@ -169,8 +173,6 @@ def simulate_household_uk( configure_logfire("policyengine-modal-uk", traceparent) with logfire.span("simulate_household_uk", job_id=job_id): - logfire.info("Starting UK household calculation", job_id=job_id) - database_url = get_database_url() engine = create_engine(database_url) @@ -249,8 +251,6 @@ def simulate_household_uk( ) session.commit() - logfire.info("UK household job completed", job_id=job_id) - except Exception as e: logfire.error("UK household job failed", job_id=job_id, error=str(e)) with Session(engine) as session: @@ -277,7 +277,11 @@ def simulate_household_uk( @app.function( - image=us_image, secrets=[db_secrets, logfire_secrets], memory=4096, cpu=4, timeout=600 + image=us_image, + secrets=[db_secrets, logfire_secrets], + memory=4096, + cpu=4, + timeout=600, ) def simulate_household_us( job_id: str, @@ -302,8 +306,6 @@ def simulate_household_us( configure_logfire("policyengine-modal-us", traceparent) with logfire.span("simulate_household_us", job_id=job_id): - logfire.info("Starting US household calculation", job_id=job_id) - database_url = get_database_url() engine = create_engine(database_url) @@ -388,8 +390,6 @@ def simulate_household_us( ) session.commit() - logfire.info("US household job completed", job_id=job_id) - except Exception as e: logfire.error("US household job failed", job_id=job_id, error=str(e)) with Session(engine) as session: @@ -416,7 +416,11 @@ def simulate_household_us( @app.function( - image=uk_image, secrets=[db_secrets, logfire_secrets], memory=8192, cpu=8, timeout=1800 + image=uk_image, + secrets=[db_secrets, logfire_secrets], + memory=8192, + cpu=8, + timeout=1800, ) def simulate_economy_uk(simulation_id: str, traceparent: str | None = None) -> None: """Run a single UK economy simulation and write results to database.""" @@ -429,112 +433,121 @@ def simulate_economy_uk(simulation_id: str, traceparent: str | None = None) -> N configure_logfire("policyengine-modal-uk", traceparent) - with logfire.span("simulate_economy_uk", simulation_id=simulation_id): - logfire.info("Starting UK economy simulation", simulation_id=simulation_id) - - database_url = get_database_url() - supabase_url = os.environ["SUPABASE_URL"] - supabase_key = os.environ["SUPABASE_KEY"] - storage_bucket = os.environ.get("STORAGE_BUCKET", "datasets") - - engine = create_engine(database_url) - - try: - from policyengine_api.models import ( - Dataset, - Simulation, - SimulationStatus, - ) - with Session(engine) as session: - simulation = session.get(Simulation, UUID(simulation_id)) - if not simulation: - raise ValueError(f"Simulation {simulation_id} not found") - - # Skip if already completed - if simulation.status == SimulationStatus.COMPLETED: - logfire.info("Simulation already completed", simulation_id=simulation_id) - return - - # Update status to running - simulation.status = SimulationStatus.RUNNING - session.add(simulation) - session.commit() + try: + with logfire.span("simulate_economy_uk", simulation_id=simulation_id): + database_url = get_database_url() + supabase_url = os.environ["SUPABASE_URL"] + supabase_key = os.environ["SUPABASE_KEY"] + storage_bucket = os.environ.get("STORAGE_BUCKET", "datasets") - # Get dataset - dataset = session.get(Dataset, simulation.dataset_id) - if not dataset: - raise ValueError(f"Dataset {simulation.dataset_id} not found") + engine = create_engine(database_url) - # Import policyengine - from policyengine.core import Simulation as PESimulation - from policyengine.tax_benefit_models.uk import uk_latest - from policyengine.tax_benefit_models.uk.datasets import ( - PolicyEngineUKDataset, + try: + from policyengine_api.models import ( + Dataset, + Simulation, + SimulationStatus, ) - pe_model_version = uk_latest - - # Get policy and dynamic - policy = _get_pe_policy_uk(simulation.policy_id, pe_model_version, session) - dynamic = _get_pe_dynamic_uk( - simulation.dynamic_id, pe_model_version, session - ) + with Session(engine) as session: + simulation = session.get(Simulation, UUID(simulation_id)) + if not simulation: + raise ValueError(f"Simulation {simulation_id} not found") + + # Skip if already completed + if simulation.status == SimulationStatus.COMPLETED: + logfire.info( + "Simulation already completed", simulation_id=simulation_id + ) + return - # Download dataset - logfire.info("Loading dataset", filepath=dataset.filepath) - local_path = download_dataset( - dataset.filepath, supabase_url, supabase_key, storage_bucket - ) + # Update status to running + simulation.status = SimulationStatus.RUNNING + session.add(simulation) + session.commit() - pe_dataset = PolicyEngineUKDataset( - name=dataset.name, - description=dataset.description or "", - filepath=local_path, - year=dataset.year, - ) + # Get dataset + dataset = session.get(Dataset, simulation.dataset_id) + if not dataset: + raise ValueError(f"Dataset {simulation.dataset_id} not found") - # Create and run simulation - with logfire.span("run_simulation"): - pe_sim = PESimulation( - dataset=pe_dataset, - tax_benefit_model_version=pe_model_version, - policy=policy, - dynamic=dynamic, + # Import policyengine + from policyengine.core import Simulation as PESimulation + from policyengine.tax_benefit_models.uk import uk_latest + from policyengine.tax_benefit_models.uk.datasets import ( + PolicyEngineUKDataset, ) - pe_sim.ensure() - # Mark as completed - simulation.status = SimulationStatus.COMPLETED - simulation.completed_at = datetime.now(timezone.utc) - session.add(simulation) - session.commit() + pe_model_version = uk_latest - logfire.info("UK economy simulation completed", simulation_id=simulation_id) + # Get policy and dynamic + policy = _get_pe_policy_uk( + simulation.policy_id, pe_model_version, session + ) + dynamic = _get_pe_dynamic_uk( + simulation.dynamic_id, pe_model_version, session + ) - except Exception as e: - logfire.error("UK economy simulation failed", simulation_id=simulation_id, error=str(e)) - # Use raw SQL to mark as failed - models may not be available - try: - from sqlmodel import text + # Download dataset + local_path = download_dataset( + dataset.filepath, supabase_url, supabase_key, storage_bucket + ) - with Session(engine) as session: - session.execute( - text( - "UPDATE simulations SET status = 'failed', error_message = :error " - "WHERE id = :sim_id" - ), - {"sim_id": simulation_id, "error": str(e)[:1000]}, + pe_dataset = PolicyEngineUKDataset( + name=dataset.name, + description=dataset.description or "", + filepath=local_path, + year=dataset.year, ) + + # Create and run simulation + with logfire.span("run_simulation"): + pe_sim = PESimulation( + dataset=pe_dataset, + tax_benefit_model_version=pe_model_version, + policy=policy, + dynamic=dynamic, + ) + pe_sim.ensure() + + # Mark as completed + simulation.status = SimulationStatus.COMPLETED + simulation.completed_at = datetime.now(timezone.utc) + session.add(simulation) session.commit() - except Exception as db_error: - logfire.error("Failed to update DB", error=str(db_error)) - raise - finally: - logfire.force_flush() + + except Exception as e: + logfire.error( + "UK economy simulation failed", + simulation_id=simulation_id, + error=str(e), + ) + # Use raw SQL to mark as failed - models may not be available + try: + from sqlmodel import text + + with Session(engine) as session: + session.execute( + text( + "UPDATE simulations SET status = 'failed', error_message = :error " + "WHERE id = :sim_id" + ), + {"sim_id": simulation_id, "error": str(e)[:1000]}, + ) + session.commit() + except Exception as db_error: + logfire.error("Failed to update DB", error=str(db_error)) + raise + finally: + logfire.force_flush() @app.function( - image=us_image, secrets=[db_secrets, logfire_secrets], memory=8192, cpu=8, timeout=1800 + image=us_image, + secrets=[db_secrets, logfire_secrets], + memory=8192, + cpu=8, + timeout=1800, ) def simulate_economy_us(simulation_id: str, traceparent: str | None = None) -> None: """Run a single US economy simulation and write results to database.""" @@ -547,112 +560,121 @@ def simulate_economy_us(simulation_id: str, traceparent: str | None = None) -> N configure_logfire("policyengine-modal-us", traceparent) - with logfire.span("simulate_economy_us", simulation_id=simulation_id): - logfire.info("Starting US economy simulation", simulation_id=simulation_id) - - database_url = get_database_url() - supabase_url = os.environ["SUPABASE_URL"] - supabase_key = os.environ["SUPABASE_KEY"] - storage_bucket = os.environ.get("STORAGE_BUCKET", "datasets") + try: + with logfire.span("simulate_economy_us", simulation_id=simulation_id): + database_url = get_database_url() + supabase_url = os.environ["SUPABASE_URL"] + supabase_key = os.environ["SUPABASE_KEY"] + storage_bucket = os.environ.get("STORAGE_BUCKET", "datasets") - engine = create_engine(database_url) + engine = create_engine(database_url) - try: - from policyengine_api.models import ( - Dataset, - Simulation, - SimulationStatus, - ) - with Session(engine) as session: - simulation = session.get(Simulation, UUID(simulation_id)) - if not simulation: - raise ValueError(f"Simulation {simulation_id} not found") - - # Skip if already completed - if simulation.status == SimulationStatus.COMPLETED: - logfire.info("Simulation already completed", simulation_id=simulation_id) - return - - # Update status to running - simulation.status = SimulationStatus.RUNNING - session.add(simulation) - session.commit() + try: + from policyengine_api.models import ( + Dataset, + Simulation, + SimulationStatus, + ) - # Get dataset - dataset = session.get(Dataset, simulation.dataset_id) - if not dataset: - raise ValueError(f"Dataset {simulation.dataset_id} not found") + with Session(engine) as session: + simulation = session.get(Simulation, UUID(simulation_id)) + if not simulation: + raise ValueError(f"Simulation {simulation_id} not found") + + # Skip if already completed + if simulation.status == SimulationStatus.COMPLETED: + logfire.info( + "Simulation already completed", simulation_id=simulation_id + ) + return - # Import policyengine - from policyengine.core import Simulation as PESimulation - from policyengine.tax_benefit_models.us import us_latest - from policyengine.tax_benefit_models.us.datasets import ( - PolicyEngineUSDataset, - ) + # Update status to running + simulation.status = SimulationStatus.RUNNING + session.add(simulation) + session.commit() - pe_model_version = us_latest + # Get dataset + dataset = session.get(Dataset, simulation.dataset_id) + if not dataset: + raise ValueError(f"Dataset {simulation.dataset_id} not found") - # Get policy and dynamic - policy = _get_pe_policy_us(simulation.policy_id, pe_model_version, session) - dynamic = _get_pe_dynamic_us( - simulation.dynamic_id, pe_model_version, session - ) + # Import policyengine + from policyengine.core import Simulation as PESimulation + from policyengine.tax_benefit_models.us import us_latest + from policyengine.tax_benefit_models.us.datasets import ( + PolicyEngineUSDataset, + ) - # Download dataset - logfire.info("Loading dataset", filepath=dataset.filepath) - local_path = download_dataset( - dataset.filepath, supabase_url, supabase_key, storage_bucket - ) + pe_model_version = us_latest - pe_dataset = PolicyEngineUSDataset( - name=dataset.name, - description=dataset.description or "", - filepath=local_path, - year=dataset.year, - ) + # Get policy and dynamic + policy = _get_pe_policy_us( + simulation.policy_id, pe_model_version, session + ) + dynamic = _get_pe_dynamic_us( + simulation.dynamic_id, pe_model_version, session + ) - # Create and run simulation - with logfire.span("run_simulation"): - pe_sim = PESimulation( - dataset=pe_dataset, - tax_benefit_model_version=pe_model_version, - policy=policy, - dynamic=dynamic, + # Download dataset + local_path = download_dataset( + dataset.filepath, supabase_url, supabase_key, storage_bucket ) - pe_sim.ensure() - # Mark as completed - simulation.status = SimulationStatus.COMPLETED - simulation.completed_at = datetime.now(timezone.utc) - session.add(simulation) - session.commit() + pe_dataset = PolicyEngineUSDataset( + name=dataset.name, + description=dataset.description or "", + filepath=local_path, + year=dataset.year, + ) - logfire.info("US economy simulation completed", simulation_id=simulation_id) + # Create and run simulation + with logfire.span("run_simulation"): + pe_sim = PESimulation( + dataset=pe_dataset, + tax_benefit_model_version=pe_model_version, + policy=policy, + dynamic=dynamic, + ) + pe_sim.ensure() - except Exception as e: - logfire.error("US economy simulation failed", simulation_id=simulation_id, error=str(e)) - # Use raw SQL to mark as failed - models may not be available - try: - from sqlmodel import text + # Mark as completed + simulation.status = SimulationStatus.COMPLETED + simulation.completed_at = datetime.now(timezone.utc) + session.add(simulation) + session.commit() - with Session(engine) as session: - session.execute( - text( - "UPDATE simulations SET status = 'failed', error_message = :error " - "WHERE id = :sim_id" - ), - {"sim_id": simulation_id, "error": str(e)[:1000]}, - ) + except Exception as e: + logfire.error( + "US economy simulation failed", + simulation_id=simulation_id, + error=str(e), + ) + # Use raw SQL to mark as failed - models may not be available + try: + from sqlmodel import text + + with Session(engine) as session: + session.execute( + text( + "UPDATE simulations SET status = 'failed', error_message = :error " + "WHERE id = :sim_id" + ), + {"sim_id": simulation_id, "error": str(e)[:1000]}, + ) session.commit() - except Exception as db_error: - logfire.error("Failed to update DB", error=str(db_error)) - raise - finally: - logfire.force_flush() + except Exception as db_error: + logfire.error("Failed to update DB", error=str(db_error)) + raise + finally: + logfire.force_flush() @app.function( - image=uk_image, secrets=[db_secrets, logfire_secrets], memory=8192, cpu=8, timeout=1800 + image=uk_image, + secrets=[db_secrets, logfire_secrets], + memory=8192, + cpu=8, + timeout=1800, ) def economy_comparison_uk(job_id: str, traceparent: str | None = None) -> None: """Run UK economy comparison analysis (decile impacts, budget impact, etc).""" @@ -663,234 +685,241 @@ def economy_comparison_uk(job_id: str, traceparent: str | None = None) -> None: # Configure logfire FIRST to capture all time including imports configure_logfire("policyengine-modal-uk", traceparent) - with logfire.span("economy_comparison_uk", job_id=job_id): - from datetime import datetime, timezone - from uuid import UUID + try: + with logfire.span("economy_comparison_uk", job_id=job_id): + from datetime import datetime, timezone + from uuid import UUID - from sqlmodel import Session, create_engine + from sqlmodel import Session, create_engine - logfire.info("Starting UK economy comparison", job_id=job_id) + database_url = get_database_url() + supabase_url = os.environ["SUPABASE_URL"] + supabase_key = os.environ["SUPABASE_KEY"] + storage_bucket = os.environ.get("STORAGE_BUCKET", "datasets") - database_url = get_database_url() - supabase_url = os.environ["SUPABASE_URL"] - supabase_key = os.environ["SUPABASE_KEY"] - storage_bucket = os.environ.get("STORAGE_BUCKET", "datasets") + engine = create_engine(database_url) - engine = create_engine(database_url) + try: + # Import models inline + from policyengine_api.models import ( + Dataset, + DecileImpact, + ProgramStatistics, + Report, + ReportStatus, + Simulation, + SimulationStatus, + TaxBenefitModelVersion, + ) - try: - # Import models inline - from policyengine_api.models import ( - Dataset, - DecileImpact, - ProgramStatistics, - Report, - ReportStatus, - Simulation, - SimulationStatus, - TaxBenefitModelVersion, - ) - with Session(engine) as session: - # Load report and related data - report = session.get(Report, UUID(job_id)) - if not report: - raise ValueError(f"Report {job_id} not found") + with Session(engine) as session: + # Load report and related data + report = session.get(Report, UUID(job_id)) + if not report: + raise ValueError(f"Report {job_id} not found") - baseline_sim = session.get(Simulation, report.baseline_simulation_id) - reform_sim = session.get(Simulation, report.reform_simulation_id) + baseline_sim = session.get( + Simulation, report.baseline_simulation_id + ) + reform_sim = session.get(Simulation, report.reform_simulation_id) - if not baseline_sim or not reform_sim: - raise ValueError("Simulations not found") + if not baseline_sim or not reform_sim: + raise ValueError("Simulations not found") - # Update status to running - report.status = ReportStatus.RUNNING - session.add(report) - session.commit() - - # Get dataset - dataset = session.get(Dataset, baseline_sim.dataset_id) - if not dataset: - raise ValueError(f"Dataset {baseline_sim.dataset_id} not found") + # Update status to running + report.status = ReportStatus.RUNNING + session.add(report) + session.commit() - # Get model version (unused but keeping for reference) - _ = session.get( - TaxBenefitModelVersion, baseline_sim.tax_benefit_model_version_id - ) + # Get dataset + dataset = session.get(Dataset, baseline_sim.dataset_id) + if not dataset: + raise ValueError(f"Dataset {baseline_sim.dataset_id} not found") - # Import policyengine - from policyengine.core import Simulation as PESimulation - from policyengine.outputs import DecileImpact as PEDecileImpact - from policyengine.tax_benefit_models.uk import uk_latest - from policyengine.tax_benefit_models.uk.datasets import ( - PolicyEngineUKDataset, - ) - from policyengine.tax_benefit_models.uk.outputs import ( - ProgrammeStatistics as PEProgrammeStats, - ) + # Get model version (unused but keeping for reference) + _ = session.get( + TaxBenefitModelVersion, + baseline_sim.tax_benefit_model_version_id, + ) - pe_model_version = uk_latest + # Import policyengine + from policyengine.core import Simulation as PESimulation + from policyengine.outputs import DecileImpact as PEDecileImpact + from policyengine.tax_benefit_models.uk import uk_latest + from policyengine.tax_benefit_models.uk.datasets import ( + PolicyEngineUKDataset, + ) + from policyengine.tax_benefit_models.uk.outputs import ( + ProgrammeStatistics as PEProgrammeStats, + ) - # Get policies - baseline_policy = _get_pe_policy_uk( - baseline_sim.policy_id, pe_model_version, session - ) - reform_policy = _get_pe_policy_uk( - reform_sim.policy_id, pe_model_version, session - ) - baseline_dynamic = _get_pe_dynamic_uk( - baseline_sim.dynamic_id, pe_model_version, session - ) - reform_dynamic = _get_pe_dynamic_uk( - reform_sim.dynamic_id, pe_model_version, session - ) + pe_model_version = uk_latest - # Download dataset - with logfire.span("download_dataset", filepath=dataset.filepath): - local_path = download_dataset( - dataset.filepath, supabase_url, supabase_key, storage_bucket + # Get policies + baseline_policy = _get_pe_policy_uk( + baseline_sim.policy_id, pe_model_version, session ) - - with logfire.span("load_dataset"): - pe_dataset = PolicyEngineUKDataset( - name=dataset.name, - description=dataset.description or "", - filepath=local_path, - year=dataset.year, + reform_policy = _get_pe_policy_uk( + reform_sim.policy_id, pe_model_version, session ) - - # Create and run simulations - with logfire.span("run_baseline_simulation"): - pe_baseline_sim = PESimulation( - dataset=pe_dataset, - tax_benefit_model_version=pe_model_version, - policy=baseline_policy, - dynamic=baseline_dynamic, + baseline_dynamic = _get_pe_dynamic_uk( + baseline_sim.dynamic_id, pe_model_version, session ) - pe_baseline_sim.ensure() - - with logfire.span("run_reform_simulation"): - pe_reform_sim = PESimulation( - dataset=pe_dataset, - tax_benefit_model_version=pe_model_version, - policy=reform_policy, - dynamic=reform_dynamic, + reform_dynamic = _get_pe_dynamic_uk( + reform_sim.dynamic_id, pe_model_version, session ) - pe_reform_sim.ensure() - - # Calculate decile impacts - with logfire.span("calculate_decile_impacts"): - for decile_num in range(1, 11): - di = PEDecileImpact( - baseline_simulation=pe_baseline_sim, - reform_simulation=pe_reform_sim, - decile=decile_num, + + # Download dataset + with logfire.span("download_dataset", filepath=dataset.filepath): + local_path = download_dataset( + dataset.filepath, supabase_url, supabase_key, storage_bucket ) - di.run() - - decile_impact = DecileImpact( - baseline_simulation_id=baseline_sim.id, - reform_simulation_id=reform_sim.id, - report_id=report.id, - income_variable=di.income_variable, - entity=di.entity, - decile=di.decile, - quantiles=di.quantiles, - baseline_mean=di.baseline_mean, - reform_mean=di.reform_mean, - absolute_change=di.absolute_change, - relative_change=di.relative_change, - count_better_off=di.count_better_off, - count_worse_off=di.count_worse_off, - count_no_change=di.count_no_change, + + with logfire.span("load_dataset"): + pe_dataset = PolicyEngineUKDataset( + name=dataset.name, + description=dataset.description or "", + filepath=local_path, + year=dataset.year, ) - session.add(decile_impact) - # Calculate program statistics - with logfire.span("calculate_program_statistics"): - PEProgrammeStats.model_rebuild( - _types_namespace={"Simulation": PESimulation} - ) + # Create and run simulations + with logfire.span("run_baseline_simulation"): + pe_baseline_sim = PESimulation( + dataset=pe_dataset, + tax_benefit_model_version=pe_model_version, + policy=baseline_policy, + dynamic=baseline_dynamic, + ) + pe_baseline_sim.ensure() + + with logfire.span("run_reform_simulation"): + pe_reform_sim = PESimulation( + dataset=pe_dataset, + tax_benefit_model_version=pe_model_version, + policy=reform_policy, + dynamic=reform_dynamic, + ) + pe_reform_sim.ensure() - programmes = { - "income_tax": {"entity": "person", "is_tax": True}, - "national_insurance": {"entity": "person", "is_tax": True}, - "vat": {"entity": "household", "is_tax": True}, - "council_tax": {"entity": "household", "is_tax": True}, - "universal_credit": {"entity": "person", "is_tax": False}, - "child_benefit": {"entity": "person", "is_tax": False}, - "pension_credit": {"entity": "person", "is_tax": False}, - "income_support": {"entity": "person", "is_tax": False}, - "working_tax_credit": {"entity": "person", "is_tax": False}, - "child_tax_credit": {"entity": "person", "is_tax": False}, - } - - for prog_name, prog_info in programmes.items(): - try: - ps = PEProgrammeStats( + # Calculate decile impacts + with logfire.span("calculate_decile_impacts"): + for decile_num in range(1, 11): + di = PEDecileImpact( baseline_simulation=pe_baseline_sim, reform_simulation=pe_reform_sim, - programme_name=prog_name, - entity=prog_info["entity"], - is_tax=prog_info["is_tax"], + decile=decile_num, ) - ps.run() - program_stat = ProgramStatistics( + di.run() + + decile_impact = DecileImpact( baseline_simulation_id=baseline_sim.id, reform_simulation_id=reform_sim.id, report_id=report.id, - program_name=prog_name, - entity=prog_info["entity"], - is_tax=prog_info["is_tax"], - baseline_total=ps.baseline_total, - reform_total=ps.reform_total, - change=ps.change, - baseline_count=ps.baseline_count, - reform_count=ps.reform_count, - winners=ps.winners, - losers=ps.losers, + income_variable=di.income_variable, + entity=di.entity, + decile=di.decile, + quantiles=di.quantiles, + baseline_mean=di.baseline_mean, + reform_mean=di.reform_mean, + absolute_change=di.absolute_change, + relative_change=di.relative_change, + count_better_off=di.count_better_off, + count_worse_off=di.count_worse_off, + count_no_change=di.count_no_change, ) - session.add(program_stat) - except KeyError as e: - logfire.warn(f"Skipping {prog_name}: variable not found", error=str(e)) - - # Mark simulations and report as completed - baseline_sim.status = SimulationStatus.COMPLETED - baseline_sim.completed_at = datetime.now(timezone.utc) - reform_sim.status = SimulationStatus.COMPLETED - reform_sim.completed_at = datetime.now(timezone.utc) - report.status = ReportStatus.COMPLETED - - session.add(baseline_sim) - session.add(reform_sim) - session.add(report) - session.commit() - - logfire.info("UK economy comparison completed", job_id=job_id) + session.add(decile_impact) - except Exception as e: - logfire.error("UK economy comparison failed", job_id=job_id, error=str(e)) - # Use raw SQL to mark as failed - models may not be available - try: - from sqlmodel import text + # Calculate program statistics + with logfire.span("calculate_program_statistics"): + PEProgrammeStats.model_rebuild( + _types_namespace={"Simulation": PESimulation} + ) - with Session(engine) as session: - session.execute( - text( - "UPDATE reports SET status = 'failed', error_message = :error " - "WHERE id = :job_id" - ), - {"job_id": job_id, "error": str(e)[:1000]}, - ) + programmes = { + "income_tax": {"entity": "person", "is_tax": True}, + "national_insurance": {"entity": "person", "is_tax": True}, + "vat": {"entity": "household", "is_tax": True}, + "council_tax": {"entity": "household", "is_tax": True}, + "universal_credit": {"entity": "person", "is_tax": False}, + "child_benefit": {"entity": "person", "is_tax": False}, + "pension_credit": {"entity": "person", "is_tax": False}, + "income_support": {"entity": "person", "is_tax": False}, + "working_tax_credit": {"entity": "person", "is_tax": False}, + "child_tax_credit": {"entity": "person", "is_tax": False}, + } + + for prog_name, prog_info in programmes.items(): + try: + ps = PEProgrammeStats( + baseline_simulation=pe_baseline_sim, + reform_simulation=pe_reform_sim, + programme_name=prog_name, + entity=prog_info["entity"], + is_tax=prog_info["is_tax"], + ) + ps.run() + program_stat = ProgramStatistics( + baseline_simulation_id=baseline_sim.id, + reform_simulation_id=reform_sim.id, + report_id=report.id, + program_name=prog_name, + entity=prog_info["entity"], + is_tax=prog_info["is_tax"], + baseline_total=ps.baseline_total, + reform_total=ps.reform_total, + change=ps.change, + baseline_count=ps.baseline_count, + reform_count=ps.reform_count, + winners=ps.winners, + losers=ps.losers, + ) + session.add(program_stat) + except KeyError: + pass # Variable not in model, skip silently + + # Mark simulations and report as completed + baseline_sim.status = SimulationStatus.COMPLETED + baseline_sim.completed_at = datetime.now(timezone.utc) + reform_sim.status = SimulationStatus.COMPLETED + reform_sim.completed_at = datetime.now(timezone.utc) + report.status = ReportStatus.COMPLETED + + session.add(baseline_sim) + session.add(reform_sim) + session.add(report) session.commit() - except Exception as db_error: - logfire.error("Failed to update DB", error=str(db_error)) - raise - finally: - logfire.force_flush() + + except Exception as e: + logfire.error( + "UK economy comparison failed", job_id=job_id, error=str(e) + ) + # Use raw SQL to mark as failed - models may not be available + try: + from sqlmodel import text + + with Session(engine) as session: + session.execute( + text( + "UPDATE reports SET status = 'failed', error_message = :error " + "WHERE id = :job_id" + ), + {"job_id": job_id, "error": str(e)[:1000]}, + ) + session.commit() + except Exception as db_error: + logfire.error("Failed to update DB", error=str(db_error)) + raise + finally: + logfire.force_flush() @app.function( - image=us_image, secrets=[db_secrets, logfire_secrets], memory=8192, cpu=8, timeout=1800 + image=us_image, + secrets=[db_secrets, logfire_secrets], + memory=8192, + cpu=8, + timeout=1800, ) def economy_comparison_us(job_id: str, traceparent: str | None = None) -> None: """Run US economy comparison analysis (decile impacts, budget impact, etc).""" @@ -903,213 +932,218 @@ def economy_comparison_us(job_id: str, traceparent: str | None = None) -> None: configure_logfire("policyengine-modal-us", traceparent) - with logfire.span("economy_comparison_us", job_id=job_id): - logfire.info("Starting US economy comparison", job_id=job_id) + try: + with logfire.span("economy_comparison_us", job_id=job_id): + database_url = get_database_url() + supabase_url = os.environ["SUPABASE_URL"] + supabase_key = os.environ["SUPABASE_KEY"] + storage_bucket = os.environ.get("STORAGE_BUCKET", "datasets") - database_url = get_database_url() - supabase_url = os.environ["SUPABASE_URL"] - supabase_key = os.environ["SUPABASE_KEY"] - storage_bucket = os.environ.get("STORAGE_BUCKET", "datasets") - - engine = create_engine(database_url) - - try: - # Import models inline - from policyengine_api.models import ( - Dataset, - DecileImpact, - ProgramStatistics, - Report, - ReportStatus, - Simulation, - SimulationStatus, - ) + engine = create_engine(database_url) - with Session(engine) as session: - # Load report and related data - report = session.get(Report, UUID(job_id)) - if not report: - raise ValueError(f"Report {job_id} not found") - - baseline_sim = session.get(Simulation, report.baseline_simulation_id) - reform_sim = session.get(Simulation, report.reform_simulation_id) + try: + # Import models inline + from policyengine_api.models import ( + Dataset, + DecileImpact, + ProgramStatistics, + Report, + ReportStatus, + Simulation, + SimulationStatus, + ) - if not baseline_sim or not reform_sim: - raise ValueError("Simulations not found") + with Session(engine) as session: + # Load report and related data + report = session.get(Report, UUID(job_id)) + if not report: + raise ValueError(f"Report {job_id} not found") - # Update status to running - report.status = ReportStatus.RUNNING - session.add(report) - session.commit() + baseline_sim = session.get( + Simulation, report.baseline_simulation_id + ) + reform_sim = session.get(Simulation, report.reform_simulation_id) - # Get dataset - dataset = session.get(Dataset, baseline_sim.dataset_id) - if not dataset: - raise ValueError(f"Dataset {baseline_sim.dataset_id} not found") - - # Import policyengine - from policyengine.core import Simulation as PESimulation - from policyengine.outputs import DecileImpact as PEDecileImpact - from policyengine.tax_benefit_models.us import us_latest - from policyengine.tax_benefit_models.us.datasets import ( - PolicyEngineUSDataset, - ) - from policyengine.tax_benefit_models.us.outputs import ( - ProgramStatistics as PEProgramStats, - ) + if not baseline_sim or not reform_sim: + raise ValueError("Simulations not found") - pe_model_version = us_latest + # Update status to running + report.status = ReportStatus.RUNNING + session.add(report) + session.commit() - # Get policies - baseline_policy = _get_pe_policy_us( - baseline_sim.policy_id, pe_model_version, session - ) - reform_policy = _get_pe_policy_us( - reform_sim.policy_id, pe_model_version, session - ) - baseline_dynamic = _get_pe_dynamic_us( - baseline_sim.dynamic_id, pe_model_version, session - ) - reform_dynamic = _get_pe_dynamic_us( - reform_sim.dynamic_id, pe_model_version, session - ) + # Get dataset + dataset = session.get(Dataset, baseline_sim.dataset_id) + if not dataset: + raise ValueError(f"Dataset {baseline_sim.dataset_id} not found") + + # Import policyengine + from policyengine.core import Simulation as PESimulation + from policyengine.outputs import DecileImpact as PEDecileImpact + from policyengine.tax_benefit_models.us import us_latest + from policyengine.tax_benefit_models.us.datasets import ( + PolicyEngineUSDataset, + ) + from policyengine.tax_benefit_models.us.outputs import ( + ProgramStatistics as PEProgramStats, + ) - # Download dataset - logfire.info("Loading dataset", filepath=dataset.filepath) - local_path = download_dataset( - dataset.filepath, supabase_url, supabase_key, storage_bucket - ) + pe_model_version = us_latest - pe_dataset = PolicyEngineUSDataset( - name=dataset.name, - description=dataset.description or "", - filepath=local_path, - year=dataset.year, - ) + # Get policies + baseline_policy = _get_pe_policy_us( + baseline_sim.policy_id, pe_model_version, session + ) + reform_policy = _get_pe_policy_us( + reform_sim.policy_id, pe_model_version, session + ) + baseline_dynamic = _get_pe_dynamic_us( + baseline_sim.dynamic_id, pe_model_version, session + ) + reform_dynamic = _get_pe_dynamic_us( + reform_sim.dynamic_id, pe_model_version, session + ) - # Create and run simulations - with logfire.span("run_baseline_simulation"): - pe_baseline_sim = PESimulation( - dataset=pe_dataset, - tax_benefit_model_version=pe_model_version, - policy=baseline_policy, - dynamic=baseline_dynamic, + # Download dataset + local_path = download_dataset( + dataset.filepath, supabase_url, supabase_key, storage_bucket ) - pe_baseline_sim.ensure() - - with logfire.span("run_reform_simulation"): - pe_reform_sim = PESimulation( - dataset=pe_dataset, - tax_benefit_model_version=pe_model_version, - policy=reform_policy, - dynamic=reform_dynamic, + + pe_dataset = PolicyEngineUSDataset( + name=dataset.name, + description=dataset.description or "", + filepath=local_path, + year=dataset.year, ) - pe_reform_sim.ensure() - - # Calculate decile impacts - with logfire.span("calculate_decile_impacts"): - for decile_num in range(1, 11): - di = PEDecileImpact( - baseline_simulation=pe_baseline_sim, - reform_simulation=pe_reform_sim, - decile=decile_num, + + # Create and run simulations + with logfire.span("run_baseline_simulation"): + pe_baseline_sim = PESimulation( + dataset=pe_dataset, + tax_benefit_model_version=pe_model_version, + policy=baseline_policy, + dynamic=baseline_dynamic, ) - di.run() - - decile_impact = DecileImpact( - baseline_simulation_id=baseline_sim.id, - reform_simulation_id=reform_sim.id, - report_id=report.id, - income_variable=di.income_variable, - entity=di.entity, - decile=di.decile, - quantiles=di.quantiles, - baseline_mean=di.baseline_mean, - reform_mean=di.reform_mean, - absolute_change=di.absolute_change, - relative_change=di.relative_change, - count_better_off=di.count_better_off, - count_worse_off=di.count_worse_off, - count_no_change=di.count_no_change, + pe_baseline_sim.ensure() + + with logfire.span("run_reform_simulation"): + pe_reform_sim = PESimulation( + dataset=pe_dataset, + tax_benefit_model_version=pe_model_version, + policy=reform_policy, + dynamic=reform_dynamic, ) - session.add(decile_impact) - - # Calculate program statistics - with logfire.span("calculate_program_statistics"): - PEProgramStats.model_rebuild(_types_namespace={"Simulation": PESimulation}) - - programs = { - "income_tax": {"entity": "tax_unit", "is_tax": True}, - "employee_payroll_tax": {"entity": "person", "is_tax": True}, - "snap": {"entity": "spm_unit", "is_tax": False}, - "tanf": {"entity": "spm_unit", "is_tax": False}, - "ssi": {"entity": "spm_unit", "is_tax": False}, - "social_security": {"entity": "person", "is_tax": False}, - } - - for prog_name, prog_info in programs.items(): - try: - ps = PEProgramStats( + pe_reform_sim.ensure() + + # Calculate decile impacts + with logfire.span("calculate_decile_impacts"): + for decile_num in range(1, 11): + di = PEDecileImpact( baseline_simulation=pe_baseline_sim, reform_simulation=pe_reform_sim, - program_name=prog_name, - entity=prog_info["entity"], - is_tax=prog_info["is_tax"], + decile=decile_num, ) - ps.run() - program_stat = ProgramStatistics( + di.run() + + decile_impact = DecileImpact( baseline_simulation_id=baseline_sim.id, reform_simulation_id=reform_sim.id, report_id=report.id, - program_name=prog_name, - entity=prog_info["entity"], - is_tax=prog_info["is_tax"], - baseline_total=ps.baseline_total, - reform_total=ps.reform_total, - change=ps.change, - baseline_count=ps.baseline_count, - reform_count=ps.reform_count, - winners=ps.winners, - losers=ps.losers, + income_variable=di.income_variable, + entity=di.entity, + decile=di.decile, + quantiles=di.quantiles, + baseline_mean=di.baseline_mean, + reform_mean=di.reform_mean, + absolute_change=di.absolute_change, + relative_change=di.relative_change, + count_better_off=di.count_better_off, + count_worse_off=di.count_worse_off, + count_no_change=di.count_no_change, ) - session.add(program_stat) - except KeyError as e: - logfire.warn(f"Skipping {prog_name}: variable not found", error=str(e)) - - # Mark simulations and report as completed - baseline_sim.status = SimulationStatus.COMPLETED - baseline_sim.completed_at = datetime.now(timezone.utc) - reform_sim.status = SimulationStatus.COMPLETED - reform_sim.completed_at = datetime.now(timezone.utc) - report.status = ReportStatus.COMPLETED - - session.add(baseline_sim) - session.add(reform_sim) - session.add(report) - session.commit() + session.add(decile_impact) - logfire.info("US economy comparison completed", job_id=job_id) - - except Exception as e: - logfire.error("US economy comparison failed", job_id=job_id, error=str(e)) - # Use raw SQL to mark as failed - models may not be available - try: - from sqlmodel import text + # Calculate program statistics + with logfire.span("calculate_program_statistics"): + PEProgramStats.model_rebuild( + _types_namespace={"Simulation": PESimulation} + ) - with Session(engine) as session: - session.execute( - text( - "UPDATE reports SET status = 'failed', error_message = :error " - "WHERE id = :job_id" - ), - {"job_id": job_id, "error": str(e)[:1000]}, - ) + programs = { + "income_tax": {"entity": "tax_unit", "is_tax": True}, + "employee_payroll_tax": { + "entity": "person", + "is_tax": True, + }, + "snap": {"entity": "spm_unit", "is_tax": False}, + "tanf": {"entity": "spm_unit", "is_tax": False}, + "ssi": {"entity": "spm_unit", "is_tax": False}, + "social_security": {"entity": "person", "is_tax": False}, + } + + for prog_name, prog_info in programs.items(): + try: + ps = PEProgramStats( + baseline_simulation=pe_baseline_sim, + reform_simulation=pe_reform_sim, + program_name=prog_name, + entity=prog_info["entity"], + is_tax=prog_info["is_tax"], + ) + ps.run() + program_stat = ProgramStatistics( + baseline_simulation_id=baseline_sim.id, + reform_simulation_id=reform_sim.id, + report_id=report.id, + program_name=prog_name, + entity=prog_info["entity"], + is_tax=prog_info["is_tax"], + baseline_total=ps.baseline_total, + reform_total=ps.reform_total, + change=ps.change, + baseline_count=ps.baseline_count, + reform_count=ps.reform_count, + winners=ps.winners, + losers=ps.losers, + ) + session.add(program_stat) + except KeyError: + pass # Variable not in model, skip silently + + # Mark simulations and report as completed + baseline_sim.status = SimulationStatus.COMPLETED + baseline_sim.completed_at = datetime.now(timezone.utc) + reform_sim.status = SimulationStatus.COMPLETED + reform_sim.completed_at = datetime.now(timezone.utc) + report.status = ReportStatus.COMPLETED + + session.add(baseline_sim) + session.add(reform_sim) + session.add(report) session.commit() - except Exception as db_error: - logfire.error("Failed to update DB", error=str(db_error)) - raise - finally: - logfire.force_flush() + + except Exception as e: + logfire.error( + "US economy comparison failed", job_id=job_id, error=str(e) + ) + # Use raw SQL to mark as failed - models may not be available + try: + from sqlmodel import text + + with Session(engine) as session: + session.execute( + text( + "UPDATE reports SET status = 'failed', error_message = :error " + "WHERE id = :job_id" + ), + {"job_id": job_id, "error": str(e)[:1000]}, + ) + session.commit() + except Exception as db_error: + logfire.error("Failed to update DB", error=str(db_error)) + raise + finally: + logfire.force_flush() def _get_pe_policy_uk(policy_id, model_version, session): diff --git a/src/policyengine_api/services/database.py b/src/policyengine_api/services/database.py index 1ae42ba..f0e6ea0 100644 --- a/src/policyengine_api/services/database.py +++ b/src/policyengine_api/services/database.py @@ -4,12 +4,6 @@ engine = create_engine(settings.database_url, echo=settings.debug) -# Only instrument with logfire if configured -if settings.logfire_token: - import logfire - - logfire.instrument_sqlalchemy(engine=engine) - def get_session(): """Get database session."""