diff --git a/src/policyengine_api/modal_app.py b/src/policyengine_api/modal_app.py index 4c979df..c366066 100644 --- a/src/policyengine_api/modal_app.py +++ b/src/policyengine_api/modal_app.py @@ -172,108 +172,109 @@ def simulate_household_uk( configure_logfire("policyengine-modal-uk", traceparent) - with logfire.span("simulate_household_uk", job_id=job_id): - database_url = get_database_url() - engine = create_engine(database_url) - - try: - from policyengine.tax_benefit_models.uk import uk_latest - from policyengine.tax_benefit_models.uk.analysis import ( - UKHouseholdInput, - calculate_household_impact, - ) - - # Build policy if provided - policy = None - if policy_data: - from policyengine.core.policy import ( - ParameterValue as PEParameterValue, - ) - from policyengine.core.policy import ( - Policy as PEPolicy, - ) + try: + with logfire.span("simulate_household_uk", job_id=job_id): + database_url = get_database_url() + engine = create_engine(database_url) - pe_param_values = [] - param_lookup = {p.name: p for p in uk_latest.parameters} - for pv in policy_data.get("parameter_values", []): - pe_param = param_lookup.get(pv["parameter_name"]) - if pe_param: - pe_pv = PEParameterValue( - parameter=pe_param, - value=pv["value"], - start_date=datetime.fromisoformat(pv["start_date"]) - if pv.get("start_date") - else None, - end_date=datetime.fromisoformat(pv["end_date"]) - if pv.get("end_date") - else None, - ) - pe_param_values.append(pe_pv) - policy = PEPolicy( - name=policy_data.get("name", ""), - description=policy_data.get("description", ""), - parameter_values=pe_param_values, + try: + from policyengine.tax_benefit_models.uk import uk_latest + from policyengine.tax_benefit_models.uk.analysis import ( + UKHouseholdInput, + calculate_household_impact, ) - pe_input = UKHouseholdInput( - people=people, - benunit=benunit, - household=household, - year=year, - ) - - with logfire.span("calculate_household_impact"): - result = calculate_household_impact(pe_input, policy=policy) - - # Write result to database - with Session(engine) as session: - from sqlmodel import text - - session.exec( - text(""" - UPDATE household_jobs - SET status = 'COMPLETED', - result = :result, - completed_at = :completed_at - WHERE id = :job_id - """), - params={ - "job_id": job_id, - "result": json.dumps( - { - "person": result.person, - "benunit": result.benunit, - "household": result.household, - } - ), - "completed_at": datetime.now(timezone.utc), - }, - ) - session.commit() - - except Exception as e: - logfire.error("UK household job failed", job_id=job_id, error=str(e)) - with Session(engine) as session: - from sqlmodel import text - - session.exec( - text(""" - UPDATE household_jobs - SET status = 'FAILED', - error_message = :error, - completed_at = :completed_at - WHERE id = :job_id - """), - params={ - "job_id": job_id, - "error": str(e), - "completed_at": datetime.now(timezone.utc), - }, + # Build policy if provided + policy = None + if policy_data: + from policyengine.core.policy import ( + ParameterValue as PEParameterValue, + ) + from policyengine.core.policy import ( + Policy as PEPolicy, + ) + + pe_param_values = [] + param_lookup = {p.name: p for p in uk_latest.parameters} + for pv in policy_data.get("parameter_values", []): + pe_param = param_lookup.get(pv["parameter_name"]) + if pe_param: + pe_pv = PEParameterValue( + parameter=pe_param, + value=pv["value"], + start_date=datetime.fromisoformat(pv["start_date"]) + if pv.get("start_date") + else None, + end_date=datetime.fromisoformat(pv["end_date"]) + if pv.get("end_date") + else None, + ) + pe_param_values.append(pe_pv) + policy = PEPolicy( + name=policy_data.get("name", ""), + description=policy_data.get("description", ""), + parameter_values=pe_param_values, + ) + + pe_input = UKHouseholdInput( + people=people, + benunit=benunit, + household=household, + year=year, ) - session.commit() - raise - finally: - logfire.force_flush() + + with logfire.span("calculate_household_impact"): + result = calculate_household_impact(pe_input, policy=policy) + + # Write result to database + with Session(engine) as session: + from sqlmodel import text + + session.exec( + text(""" + UPDATE household_jobs + SET status = 'COMPLETED', + result = :result, + completed_at = :completed_at + WHERE id = :job_id + """), + params={ + "job_id": job_id, + "result": json.dumps( + { + "person": result.person, + "benunit": result.benunit, + "household": result.household, + } + ), + "completed_at": datetime.now(timezone.utc), + }, + ) + session.commit() + + except Exception as e: + logfire.error("UK household job failed", job_id=job_id, error=str(e)) + with Session(engine) as session: + from sqlmodel import text + + session.exec( + text(""" + UPDATE household_jobs + SET status = 'FAILED', + error_message = :error, + completed_at = :completed_at + WHERE id = :job_id + """), + params={ + "job_id": job_id, + "error": str(e), + "completed_at": datetime.now(timezone.utc), + }, + ) + session.commit() + raise + finally: + logfire.force_flush() @app.function( @@ -305,114 +306,115 @@ def simulate_household_us( configure_logfire("policyengine-modal-us", traceparent) - with logfire.span("simulate_household_us", job_id=job_id): - database_url = get_database_url() - engine = create_engine(database_url) - - try: - from policyengine.tax_benefit_models.us import us_latest - from policyengine.tax_benefit_models.us.analysis import ( - USHouseholdInput, - calculate_household_impact, - ) - - # Build policy if provided - policy = None - if policy_data: - from policyengine.core.policy import ( - ParameterValue as PEParameterValue, - ) - from policyengine.core.policy import ( - Policy as PEPolicy, - ) + try: + with logfire.span("simulate_household_us", job_id=job_id): + database_url = get_database_url() + engine = create_engine(database_url) - pe_param_values = [] - param_lookup = {p.name: p for p in us_latest.parameters} - for pv in policy_data.get("parameter_values", []): - pe_param = param_lookup.get(pv["parameter_name"]) - if pe_param: - pe_pv = PEParameterValue( - parameter=pe_param, - value=pv["value"], - start_date=datetime.fromisoformat(pv["start_date"]) - if pv.get("start_date") - else None, - end_date=datetime.fromisoformat(pv["end_date"]) - if pv.get("end_date") - else None, - ) - pe_param_values.append(pe_pv) - policy = PEPolicy( - name=policy_data.get("name", ""), - description=policy_data.get("description", ""), - parameter_values=pe_param_values, + try: + from policyengine.tax_benefit_models.us import us_latest + from policyengine.tax_benefit_models.us.analysis import ( + USHouseholdInput, + calculate_household_impact, ) - pe_input = USHouseholdInput( - people=people, - marital_unit=marital_unit, - family=family, - spm_unit=spm_unit, - tax_unit=tax_unit, - household=household, - year=year, - ) - - with logfire.span("calculate_household_impact"): - result = calculate_household_impact(pe_input, policy=policy) - - # Write result to database - with Session(engine) as session: - from sqlmodel import text - - session.exec( - text(""" - UPDATE household_jobs - SET status = 'COMPLETED', - result = :result, - completed_at = :completed_at - WHERE id = :job_id - """), - params={ - "job_id": job_id, - "result": json.dumps( - { - "person": result.person, - "marital_unit": result.marital_unit, - "family": result.family, - "spm_unit": result.spm_unit, - "tax_unit": result.tax_unit, - "household": result.household, - } - ), - "completed_at": datetime.now(timezone.utc), - }, - ) - session.commit() - - except Exception as e: - logfire.error("US household job failed", job_id=job_id, error=str(e)) - with Session(engine) as session: - from sqlmodel import text - - session.exec( - text(""" - UPDATE household_jobs - SET status = 'FAILED', - error_message = :error, - completed_at = :completed_at - WHERE id = :job_id - """), - params={ - "job_id": job_id, - "error": str(e), - "completed_at": datetime.now(timezone.utc), - }, + # Build policy if provided + policy = None + if policy_data: + from policyengine.core.policy import ( + ParameterValue as PEParameterValue, + ) + from policyengine.core.policy import ( + Policy as PEPolicy, + ) + + pe_param_values = [] + param_lookup = {p.name: p for p in us_latest.parameters} + for pv in policy_data.get("parameter_values", []): + pe_param = param_lookup.get(pv["parameter_name"]) + if pe_param: + pe_pv = PEParameterValue( + parameter=pe_param, + value=pv["value"], + start_date=datetime.fromisoformat(pv["start_date"]) + if pv.get("start_date") + else None, + end_date=datetime.fromisoformat(pv["end_date"]) + if pv.get("end_date") + else None, + ) + pe_param_values.append(pe_pv) + policy = PEPolicy( + name=policy_data.get("name", ""), + description=policy_data.get("description", ""), + parameter_values=pe_param_values, + ) + + pe_input = USHouseholdInput( + people=people, + marital_unit=marital_unit, + family=family, + spm_unit=spm_unit, + tax_unit=tax_unit, + household=household, + year=year, ) - session.commit() - raise - finally: - logfire.force_flush() + + with logfire.span("calculate_household_impact"): + result = calculate_household_impact(pe_input, policy=policy) + + # Write result to database + with Session(engine) as session: + from sqlmodel import text + + session.exec( + text(""" + UPDATE household_jobs + SET status = 'COMPLETED', + result = :result, + completed_at = :completed_at + WHERE id = :job_id + """), + params={ + "job_id": job_id, + "result": json.dumps( + { + "person": result.person, + "marital_unit": result.marital_unit, + "family": result.family, + "spm_unit": result.spm_unit, + "tax_unit": result.tax_unit, + "household": result.household, + } + ), + "completed_at": datetime.now(timezone.utc), + }, + ) + session.commit() + + except Exception as e: + logfire.error("US household job failed", job_id=job_id, error=str(e)) + with Session(engine) as session: + from sqlmodel import text + + session.exec( + text(""" + UPDATE household_jobs + SET status = 'FAILED', + error_message = :error, + completed_at = :completed_at + WHERE id = :job_id + """), + params={ + "job_id": job_id, + "error": str(e), + "completed_at": datetime.now(timezone.utc), + }, + ) + session.commit() + raise + finally: + logfire.force_flush() @app.function(