diff --git a/.github/workflows/reusable_test.yaml b/.github/workflows/reusable_test.yaml index 99cfff16..c439be4e 100644 --- a/.github/workflows/reusable_test.yaml +++ b/.github/workflows/reusable_test.yaml @@ -75,6 +75,18 @@ jobs: TEST_LITE: ${{ !inputs.upload_data }} PYTHON_LOG_LEVEL: INFO + - name: Build datasets for local area calibration + if: inputs.full_suite + run: | + LOCAL_AREA_CALIBRATION=true python policyengine_us_data/datasets/cps/cps.py + LOCAL_AREA_CALIBRATION=true python policyengine_us_data/datasets/puf/puf.py + LOCAL_AREA_CALIBRATION=true python policyengine_us_data/datasets/cps/extended_cps.py + python policyengine_us_data/datasets/cps/local_area_calibration/create_stratified_cps.py 10500 + + - name: Run local area calibration tests + if: inputs.full_suite + run: pytest policyengine_us_data/tests/test_local_area_calibration/ -v + - name: Save calibration log if: inputs.full_suite uses: actions/upload-artifact@v4 diff --git a/Makefile b/Makefile index 78d0904d..fde07f6b 100644 --- a/Makefile +++ b/Makefile @@ -74,6 +74,12 @@ data: mv policyengine_us_data/storage/enhanced_cps_2024.h5 policyengine_us_data/storage/dense_enhanced_cps_2024.h5 cp policyengine_us_data/storage/sparse_enhanced_cps_2024.h5 policyengine_us_data/storage/enhanced_cps_2024.h5 +data-local-area: data + LOCAL_AREA_CALIBRATION=true python policyengine_us_data/datasets/cps/cps.py + LOCAL_AREA_CALIBRATION=true python policyengine_us_data/datasets/puf/puf.py + LOCAL_AREA_CALIBRATION=true python policyengine_us_data/datasets/cps/extended_cps.py + python policyengine_us_data/datasets/cps/local_area_calibration/create_stratified_cps.py 10500 + clean: rm -f policyengine_us_data/storage/*.h5 rm -f policyengine_us_data/storage/*.db diff --git a/changelog_entry.yaml b/changelog_entry.yaml index e69de29b..8b0c9961 100644 --- a/changelog_entry.yaml +++ b/changelog_entry.yaml @@ -0,0 +1,8 @@ +- bump: minor + changes: + added: + - Sparse matrix builder for local area calibration with database-driven constraints + - Local area calibration data pipeline (make data-local-area) + - ExtendedCPS_2023 and PUF_2023 dataset classes + - Stratified CPS sampling to preserve high-income households + - Matrix verification tests for local area calibration diff --git a/docs/local_area_calibration_setup.ipynb b/docs/local_area_calibration_setup.ipynb new file mode 100644 index 00000000..e8eae899 --- /dev/null +++ b/docs/local_area_calibration_setup.ipynb @@ -0,0 +1,369 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "cell-0", + "metadata": {}, + "source": [ + "# Local Area Calibration Setup\n", + "\n", + "This notebook demonstrates the sparse matrix construction for local area (congressional district) calibration. It uses a subset of CDs from NC, HI, MT, and AK for manageable runtime." 
+ ] + }, + { + "cell_type": "markdown", + "id": "cell-1", + "metadata": {}, + "source": [ + "## Section 1: Setup & Configuration" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-2", + "metadata": {}, + "outputs": [], + "source": [ + "from sqlalchemy import create_engine, text\n", + "import pandas as pd\n", + "import numpy as np\n", + "\n", + "from policyengine_us import Microsimulation\n", + "from policyengine_us_data.storage import STORAGE_FOLDER\n", + "from policyengine_us_data.datasets.cps.local_area_calibration.sparse_matrix_builder import (\n", + " SparseMatrixBuilder,\n", + ")\n", + "from policyengine_us_data.datasets.cps.local_area_calibration.matrix_tracer import (\n", + " MatrixTracer,\n", + ")\n", + "from policyengine_us_data.datasets.cps.local_area_calibration.calibration_utils import (\n", + " get_calculated_variables,\n", + " create_target_groups,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-3", + "metadata": {}, + "outputs": [], + "source": [ + "db_path = STORAGE_FOLDER / \"policy_data.db\"\n", + "db_uri = f\"sqlite:///{db_path}\"\n", + "dataset_path = str(STORAGE_FOLDER / \"stratified_extended_cps_2023.h5\")\n", + "\n", + "engine = create_engine(db_uri)" + ] + }, + { + "cell_type": "markdown", + "id": "cell-4", + "metadata": {}, + "source": [ + "## Section 2: Select Test Congressional Districts\n", + "\n", + "We use CDs from 4 states for testing:\n", + "- **NC (37)**: 14 CDs (3701-3714) - provides same-state different-CD test cases\n", + "- **HI (15)**: 2 CDs (1501-1502)\n", + "- **MT (30)**: 2 CDs (3001-3002)\n", + "- **AK (2)**: 1 CD (200)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-5", + "metadata": {}, + "outputs": [], + "source": [ + "query = \"\"\"\n", + "SELECT DISTINCT sc.value as cd_geoid\n", + "FROM strata s\n", + "JOIN stratum_constraints sc ON s.stratum_id = sc.stratum_id\n", + "WHERE s.stratum_group_id = 1\n", + " AND sc.constraint_variable = 'congressional_district_geoid'\n", + " AND (\n", + " sc.value LIKE '37__'\n", + " OR sc.value LIKE '150_'\n", + " OR sc.value LIKE '300_'\n", + " OR sc.value = '200' OR sc.value = '201'\n", + " )\n", + "ORDER BY sc.value\n", + "\"\"\"\n", + "\n", + "with engine.connect() as conn:\n", + " result = conn.execute(text(query)).fetchall()\n", + " test_cds = [row[0] for row in result]\n", + "\n", + "print(f\"Testing with {len(test_cds)} congressional districts:\")\n", + "print(f\" NC (37): {[cd for cd in test_cds if cd.startswith('37')]}\")\n", + "print(f\" HI (15): {[cd for cd in test_cds if cd.startswith('15')]}\")\n", + "print(f\" MT (30): {[cd for cd in test_cds if cd.startswith('30')]}\")\n", + "print(f\" AK (2): {[cd for cd in test_cds if cd.startswith('20')]}\")" + ] + }, + { + "cell_type": "markdown", + "id": "cell-6", + "metadata": {}, + "source": [ + "## Section 3: Build the Sparse Matrix\n", + "\n", + "The sparse matrix `X_sparse` has:\n", + "- **Rows**: Calibration targets (e.g., SNAP totals by geography)\n", + "- **Columns**: (household \u00d7 CD) pairs - each household appears once per CD\n", + "\n", + "We filter to SNAP targets only (stratum_group_id=4) for this demonstration." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-7", + "metadata": {}, + "outputs": [], + "source": [ + "sim = Microsimulation(dataset=dataset_path)\n", + "\n", + "builder = SparseMatrixBuilder(\n", + " db_uri,\n", + " time_period=2023,\n", + " cds_to_calibrate=test_cds,\n", + " dataset_path=dataset_path,\n", + ")\n", + "\n", + "targets_df, X_sparse, household_id_mapping = builder.build_matrix(\n", + " sim, target_filter={\"stratum_group_ids\": [4], \"variables\": [\"snap\"]}\n", + ")\n", + "\n", + "print(f\"X_sparse shape: {X_sparse.shape}\")\n", + "print(f\" Rows (targets): {X_sparse.shape[0]}\")\n", + "print(f\" Columns (household \u00d7 CD pairs): {X_sparse.shape[1]}\")\n", + "print(f\" Non-zero entries: {X_sparse.nnz:,}\")\n", + "print(f\" Sparsity: {1 - X_sparse.nnz / (X_sparse.shape[0] * X_sparse.shape[1]):.2%}\")" + ] + }, + { + "cell_type": "markdown", + "id": "cell-8", + "metadata": {}, + "source": [ + "## Section 4: Understanding the Matrix Structure with MatrixTracer\n", + "\n", + "The `MatrixTracer` helps navigate the sparse matrix by providing lookups between:\n", + "- Column indices \u2194 (household_id, CD) pairs\n", + "- Row indices \u2194 target definitions" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-9", + "metadata": {}, + "outputs": [], + "source": [ + "tracer = MatrixTracer(\n", + " targets_df, X_sparse, household_id_mapping, test_cds, sim\n", + ")\n", + "\n", + "tracer.print_matrix_structure()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-11", + "metadata": {}, + "outputs": [], + "source": [ + "target_groups, group_info = create_target_groups(targets_df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7e75756b-a317-4800-bac5-e0fd6bc43b8c", + "metadata": {}, + "outputs": [], + "source": [ + "target_group = tracer.get_group_rows(2)\n", + "row_loc = target_group.iloc[28]['row_index'] # Manually found the index value 28\n", + "row_info = tracer.get_row_info(row_loc)\n", + "var = row_info['variable']\n", + "var_desc = row_info['variable_desc']\n", + "target_geo_id = int(row_info['geographic_id'])\n", + "\n", + "print(\"Row info for North Carolina's SNAP benefit amount:\")\n", + "print(row_info)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c2be9721-ff11-4f78-ba0b-03407201dd53", + "metadata": {}, + "outputs": [], + "source": [ + "hh_snap_df = pd.DataFrame(sim.calculate_dataframe([\n", + " \"household_id\", \"household_weight\", \"state_fips\", \"snap\"]) \n", + ")\n", + "print(hh_snap_df)" + ] + }, + { + "cell_type": "markdown", + "id": "438828ac-df94-4d3e-a9a8-227bb6f64933", + "metadata": {}, + "source": [ + "If we were to include `congressional_district_geoid` above, they would all be zeros. It's not until we do the calibration, i.e., come back with a vector of weights `w` to multiply `X_sparse` with, that we will set `congressional_district_geoid`.\n", + "\n", + "However, every household is already a donor to every contressional district. You can get the column positions for every household (remember targets are on the rows, donor households on the columns) by running tracer's get_household_column_positions with the *original* `household_id`." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-12", + "metadata": {}, + "outputs": [], + "source": [ + "# Reverse lookup: get all column positions for a specific household\n", + "hh_id = hh_snap_df.loc[hh_snap_df.snap > 0].household_id.values[0]\n", + "print(hh_snap_df.loc[hh_snap_df.household_id == hh_id])\n", + "\n", + "print(\"\\nEvaluating the tracer.get_household_column_positions dictionary:\\n\")\n", + "positions = tracer.get_household_column_positions(hh_id)\n", + "print(positions)" + ] + }, + { + "cell_type": "markdown", + "id": "cell-13", + "metadata": {}, + "source": [ + "## Section 5: Understanding the cells of the X_Sparse matrix and Target vector" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e05aaeab-3786-4ff0-a50b-34577065d2e0", + "metadata": {}, + "outputs": [], + "source": [ + "print(\"Remember, this is a North Carolina target:\\n\")\n", + "print(targets_df.iloc[row_loc])\n", + "\n", + "print(\"\\nHousehold donated to NC's 2nd district, 2023 SNAP dollars:\")\n", + "print(X_sparse[row_loc, positions['3702']]) # Household donated to NC's 2nd district\n", + "\n", + "print(\"\\nHousehold donated to NC's 2nd district, 2023 SNAP dollars:\")\n", + "print(X_sparse[row_loc, positions['201']]) # Household donated to AK's at Large District" + ] + }, + { + "cell_type": "markdown", + "id": "cell-16", + "metadata": {}, + "source": [ + "Key property: For state-level targets, only CDs in that state should have non-zero values.\n", + "\n", + "Example: A NC state SNAP target should have zeros for HI, MT, and AK CD columns.\n", + "\n", + "So let's see that same household's value for the Alaska state target:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8cdc264c-8335-40eb-afd9-4c4d023ec303", + "metadata": {}, + "outputs": [], + "source": [ + "target_group = tracer.get_group_rows(2)\n", + "new_row_loc = target_group.iloc[10]['row_index'] # Manually found the index value 10\n", + "row_info = tracer.get_row_info(row_loc)\n", + "var = row_info['variable']\n", + "var_desc = row_info['variable_desc']\n", + "target_geo_id = int(row_info['geographic_id'])\n", + "\n", + "print(\"Row info for Alaska's SNAP benefit amount:\")\n", + "print(row_info)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ac59b6f1-859f-4246-8a05-8cb26384c882", + "metadata": {}, + "outputs": [], + "source": [ + "print(\"\\nHousehold donated to AK's 1st district, 2023 SNAP dollars:\")\n", + "print(X_sparse[new_row_loc, positions['201']]) # Household donated to AK's at Large District" + ] + }, + { + "cell_type": "markdown", + "id": "cell-18", + "metadata": {}, + "source": [ + "## Section 6: Simulating State-Swapped Calculations\n", + "\n", + "When a household is \"transplanted\" to a different state, state-dependent benefits like SNAP are recalculated under the destination state's rules." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cell-19", + "metadata": {}, + "outputs": [], + "source": [ + "def create_state_simulation(state_fips):\n", + " \"\"\"Create a simulation with all households assigned to a specific state.\"\"\"\n", + " s = Microsimulation(dataset=dataset_path)\n", + " s.set_input(\n", + " \"state_fips\", 2023, np.full(hh_snap_df.shape[0], state_fips, dtype=np.int32)\n", + " )\n", + " for var in get_calculated_variables(s):\n", + " s.delete_arrays(var)\n", + " return s\n", + "\n", + "# Compare SNAP for first 5 households under NC vs AK rules\n", + "nc_sim = create_state_simulation(37) # NC\n", + "ak_sim = create_state_simulation(2) # AK\n", + "\n", + "nc_snap = nc_sim.calculate(\"snap\", map_to=\"household\").values[:5]\n", + "ak_snap = ak_sim.calculate(\"snap\", map_to=\"household\").values[:5]\n", + "\n", + "print(\"SNAP values for first 5 households under different state rules:\")\n", + "print(f\" NC rules: {nc_snap}\")\n", + "print(f\" AK rules: {ak_snap}\")\n", + "print(f\" Difference: {ak_snap - nc_snap}\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} \ No newline at end of file diff --git a/docs/myst.yml b/docs/myst.yml index a39b3cfb..6b6ceae6 100644 --- a/docs/myst.yml +++ b/docs/myst.yml @@ -24,6 +24,7 @@ project: - file: background.md - file: data.md - file: methodology.md + - file: local_area_calibration_setup.ipynb - file: long_term_projections.ipynb - file: discussion.md - file: conclusion.md diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index f932e0d5..06619d6e 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -2117,10 +2117,14 @@ class Pooled_3_Year_CPS_2023(PooledCPS): url = "hf://policyengine/policyengine-us-data/pooled_3_year_cps_2023.h5" +local_area_calibration = os.environ.get("LOCAL_AREA_CALIBRATION") == "true" + if __name__ == "__main__": if test_lite: CPS_2024().generate() CPS_2025().generate() + elif local_area_calibration: + CPS_2023_Full().generate() else: CPS_2021().generate() CPS_2022().generate() diff --git a/policyengine_us_data/datasets/cps/extended_cps.py b/policyengine_us_data/datasets/cps/extended_cps.py index f28c726c..350aa642 100644 --- a/policyengine_us_data/datasets/cps/extended_cps.py +++ b/policyengine_us_data/datasets/cps/extended_cps.py @@ -320,6 +320,15 @@ def impute_income_variables( return result +class ExtendedCPS_2023(ExtendedCPS): + cps = CPS_2023_Full + puf = PUF_2023 + name = "extended_cps_2023" + label = "Extended CPS (2023)" + file_path = STORAGE_FOLDER / "extended_cps_2023.h5" + time_period = 2023 + + class ExtendedCPS_2024(ExtendedCPS): cps = CPS_2024 puf = PUF_2024 @@ -330,4 +339,11 @@ class ExtendedCPS_2024(ExtendedCPS): if __name__ == "__main__": - ExtendedCPS_2024().generate() + local_area_calibration = ( + os.environ.get("LOCAL_AREA_CALIBRATION", "").lower() == "true" + ) + + if local_area_calibration: + ExtendedCPS_2023().generate() + else: + ExtendedCPS_2024().generate() diff --git a/policyengine_us_data/datasets/cps/local_area_calibration/__init__.py 
b/policyengine_us_data/datasets/cps/local_area_calibration/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/policyengine_us_data/datasets/cps/local_area_calibration/calibration_utils.py b/policyengine_us_data/datasets/cps/local_area_calibration/calibration_utils.py new file mode 100644 index 00000000..8d685498 --- /dev/null +++ b/policyengine_us_data/datasets/cps/local_area_calibration/calibration_utils.py @@ -0,0 +1,185 @@ +""" +Shared utilities for calibration scripts. +""" + +from typing import List, Tuple +import numpy as np +import pandas as pd + + +def get_calculated_variables(sim) -> List[str]: + """ + Return variables that should be cleared for state-swap recalculation. + + Includes variables with formulas, adds, or subtracts. + + Excludes ID variables (person_id, household_id, etc.) because: + 1. They have formulas that generate sequential IDs (0, 1, 2, ...) + 2. We need the original H5 values, not regenerated sequences + 3. PolicyEngine's random() function uses entity IDs as seeds: + seed = abs(entity_id * 100 + count_random_calls) + If IDs change, random-dependent variables (SSI resource test, + WIC nutritional risk, WIC takeup) produce different results. + """ + exclude_ids = { + "person_id", + "household_id", + "tax_unit_id", + "spm_unit_id", + "family_id", + "marital_unit_id", + } + return [ + name + for name, var in sim.tax_benefit_system.variables.items() + if ( + var.formulas + or getattr(var, "adds", None) + or getattr(var, "subtracts", None) + ) + and name not in exclude_ids + ] + + +def apply_op(values: np.ndarray, op: str, val: str) -> np.ndarray: + """Apply constraint operation to values array.""" + try: + parsed = float(val) + if parsed.is_integer(): + parsed = int(parsed) + except ValueError: + if val == "True": + parsed = True + elif val == "False": + parsed = False + else: + parsed = val + + if op in ("==", "="): + return values == parsed + if op == ">": + return values > parsed + if op == ">=": + return values >= parsed + if op == "<": + return values < parsed + if op == "<=": + return values <= parsed + if op == "!=": + return values != parsed + return np.ones(len(values), dtype=bool) + + +def _get_geo_level(geo_id) -> int: + """Return geographic level: 0=National, 1=State, 2=District.""" + if geo_id == "US": + return 0 + try: + val = int(geo_id) + return 1 if val < 100 else 2 + except (ValueError, TypeError): + return 3 + + +def create_target_groups( + targets_df: pd.DataFrame, +) -> Tuple[np.ndarray, List[str]]: + """ + Automatically create target groups based on metadata. + + Grouping rules: + 1. Groups are ordered by geographic level: National -> State -> District + 2. Within each level, targets are grouped by variable type + 3. 
Each group contributes equally to the total loss + + Parameters + ---------- + targets_df : pd.DataFrame + DataFrame containing target metadata with columns: + - stratum_group_id: Identifier for the type of target + - geographic_id: Geographic identifier (US, state FIPS, CD GEOID) + - variable: Variable name + - value: Target value + + Returns + ------- + target_groups : np.ndarray + Array of group IDs for each target + group_info : List[str] + List of descriptive strings for each group + """ + target_groups = np.zeros(len(targets_df), dtype=int) + group_id = 0 + group_info = [] + processed_mask = np.zeros(len(targets_df), dtype=bool) + + print("\n=== Creating Target Groups ===") + + # Add geo_level column for sorting + targets_df = targets_df.copy() + targets_df["_geo_level"] = targets_df["geographic_id"].apply( + _get_geo_level + ) + + geo_level_names = {0: "National", 1: "State", 2: "District"} + + # Process by geographic level: National (0) -> State (1) -> District (2) + for level in [0, 1, 2]: + level_mask = targets_df["_geo_level"] == level + if not level_mask.any(): + continue + + level_name = geo_level_names.get(level, f"Level {level}") + print(f"\n{level_name} targets:") + + # Get unique variables at this level + level_df = targets_df[level_mask & ~processed_mask] + unique_vars = sorted(level_df["variable"].unique()) + + for var_name in unique_vars: + var_mask = ( + (targets_df["variable"] == var_name) + & level_mask + & ~processed_mask + ) + + if not var_mask.any(): + continue + + matching = targets_df[var_mask] + n_targets = var_mask.sum() + + # Assign group + target_groups[var_mask] = group_id + processed_mask |= var_mask + + # Create descriptive label + stratum_group = matching["stratum_group_id"].iloc[0] + if var_name == "household_count" and stratum_group == 4: + label = "SNAP Household Count" + elif var_name == "snap": + label = "Snap" + else: + label = var_name.replace("_", " ").title() + + # Format output based on level and count + if n_targets == 1: + value = matching["value"].iloc[0] + info_str = ( + f"{level_name} {label} (1 target, value={value:,.0f})" + ) + print_str = f" Group {group_id}: {label} = {value:,.0f}" + else: + info_str = f"{level_name} {label} ({n_targets} targets)" + print_str = ( + f" Group {group_id}: {label} ({n_targets} targets)" + ) + + group_info.append(f"Group {group_id}: {info_str}") + print(print_str) + group_id += 1 + + print(f"\nTotal groups created: {group_id}") + print("=" * 40) + + return target_groups, group_info diff --git a/policyengine_us_data/datasets/cps/local_area_calibration/create_stratified_cps.py b/policyengine_us_data/datasets/cps/local_area_calibration/create_stratified_cps.py new file mode 100644 index 00000000..8dccf34a --- /dev/null +++ b/policyengine_us_data/datasets/cps/local_area_calibration/create_stratified_cps.py @@ -0,0 +1,316 @@ +""" +Create a stratified sample of extended_cps_2023.h5 that preserves high-income households. +This is needed for congressional district geo-stacking where the full dataset is too large. 
+ +Strategy: +- Keep ALL households above a high income threshold (e.g., top 1%) +- Sample progressively less from lower income strata +- Ensure representation across all income levels +""" + +import numpy as np +import pandas as pd +import h5py +from policyengine_us import Microsimulation +from policyengine_core.data.dataset import Dataset +from policyengine_core.enums import Enum + + +def create_stratified_cps_dataset( + target_households=30_000, + high_income_percentile=99, # Keep ALL households above this percentile + base_dataset=None, + output_path=None, +): + """ + Create a stratified sample of CPS data preserving high-income households. + + Args: + target_households: Target number of households in output (approximate) + high_income_percentile: Keep ALL households above this AGI percentile + output_path: Where to save the stratified h5 file + """ + print("\n" + "=" * 70) + print("CREATING STRATIFIED CPS DATASET") + print("=" * 70) + + # Default to local storage if no base_dataset specified + if base_dataset is None: + from policyengine_us_data.storage import STORAGE_FOLDER + + base_dataset = str(STORAGE_FOLDER / "extended_cps_2023.h5") + + # Load the original simulation + print("Loading original dataset...") + sim = Microsimulation(dataset=base_dataset) + + # Calculate AGI for all households + print("Calculating household AGI...") + agi = sim.calculate("adjusted_gross_income", map_to="household").values + household_ids = sim.calculate("household_id", map_to="household").values + n_households_orig = len(household_ids) + + print(f"Original dataset: {n_households_orig:,} households") + print(f"Target dataset: {target_households:,} households") + print(f"Reduction ratio: {target_households/n_households_orig:.1%}") + + # Calculate AGI percentiles + print("\nAnalyzing income distribution...") + percentiles = [0, 25, 50, 75, 90, 95, 99, 99.5, 99.9, 100] + agi_percentiles = np.percentile(agi, percentiles) + + print("AGI Percentiles:") + for p, val in zip(percentiles, agi_percentiles): + print(f" {p:5.1f}%: ${val:,.0f}") + + # Define sampling strategy + # Keep ALL high earners, sample progressively less from lower strata + high_income_threshold = np.percentile(agi, high_income_percentile) + print( + f"\nHigh-income threshold (top {100-high_income_percentile}%): ${high_income_threshold:,.0f}" + ) + + # Create strata with sampling rates + strata = [ + (99.9, 100, 1.00), # Top 0.1% - keep ALL + (99.5, 99.9, 1.00), # 99.5-99.9% - keep ALL + (99, 99.5, 1.00), # 99-99.5% - keep ALL + (95, 99, 0.80), # 95-99% - keep 80% + (90, 95, 0.60), # 90-95% - keep 60% + (75, 90, 0.40), # 75-90% - keep 40% + (50, 75, 0.25), # 50-75% - keep 25% + (25, 50, 0.15), # 25-50% - keep 15% + (0, 25, 0.10), # Bottom 25% - keep 10% + ] + + # Adjust sampling rates to hit target + print("\nInitial sampling strategy:") + expected_count = 0 + for low_p, high_p, rate in strata: + low_val = np.percentile(agi, low_p) if low_p > 0 else -np.inf + high_val = np.percentile(agi, high_p) if high_p < 100 else np.inf + in_stratum = np.sum((agi > low_val) & (agi <= high_val)) + expected = int(in_stratum * rate) + expected_count += expected + print( + f" {low_p:5.1f}-{high_p:5.1f}%: {in_stratum:6,} households x {rate:.0%} = {expected:6,}" + ) + + print(f"Expected total: {expected_count:,} households") + + # Adjust rates if needed + if expected_count > target_households * 1.1: # Allow 10% overage + adjustment = target_households / expected_count + print( + f"\nAdjusting rates by factor of {adjustment:.2f} to meet target..." 
+ ) + + # Never reduce the top percentiles + strata_adjusted = [] + for low_p, high_p, rate in strata: + if high_p >= 99: # Never reduce top 1% + strata_adjusted.append((low_p, high_p, rate)) + else: + strata_adjusted.append( + (low_p, high_p, min(1.0, rate * adjustment)) + ) + strata = strata_adjusted + + # Select households based on strata + print("\nSelecting households...") + selected_mask = np.zeros(n_households_orig, dtype=bool) + + for low_p, high_p, rate in strata: + low_val = np.percentile(agi, low_p) if low_p > 0 else -np.inf + high_val = np.percentile(agi, high_p) if high_p < 100 else np.inf + + in_stratum = (agi > low_val) & (agi <= high_val) + stratum_indices = np.where(in_stratum)[0] + n_in_stratum = len(stratum_indices) + + if rate >= 1.0: + # Keep all + selected_mask[stratum_indices] = True + n_selected = n_in_stratum + else: + # Random sample within stratum + n_to_select = int(n_in_stratum * rate) + if n_to_select > 0: + np.random.seed(42) # For reproducibility + selected_indices = np.random.choice( + stratum_indices, n_to_select, replace=False + ) + selected_mask[selected_indices] = True + n_selected = n_to_select + else: + n_selected = 0 + + print( + f" {low_p:5.1f}-{high_p:5.1f}%: Selected {n_selected:6,} / {n_in_stratum:6,} ({n_selected/max(1,n_in_stratum):.0%})" + ) + + n_selected = np.sum(selected_mask) + print( + f"\nTotal selected: {n_selected:,} households ({n_selected/n_households_orig:.1%} of original)" + ) + + # Verify high earners are preserved + high_earners_mask = agi >= high_income_threshold + n_high_earners = np.sum(high_earners_mask) + n_high_earners_selected = np.sum(selected_mask & high_earners_mask) + print(f"\nHigh earners (>=${high_income_threshold:,.0f}):") + print(f" Original: {n_high_earners:,}") + print( + f" Selected: {n_high_earners_selected:,} ({n_high_earners_selected/n_high_earners:.0%})" + ) + + # Get the selected household IDs + selected_household_ids = set(household_ids[selected_mask]) + + # Now filter the dataset using DataFrame approach (similar to create_sparse_state_stacked.py) + print("\nCreating filtered dataset...") + time_period = int(sim.default_calculation_period) + + # Convert full simulation to DataFrame + df = sim.to_input_dataframe() + + # Filter to selected households + hh_id_col = f"household_id__{time_period}" + df_filtered = df[df[hh_id_col].isin(selected_household_ids)].copy() + + print(f"Filtered DataFrame: {len(df_filtered):,} persons") + + # Create Dataset from filtered DataFrame + print("Creating Dataset from filtered DataFrame...") + stratified_dataset = Dataset.from_dataframe(df_filtered, time_period) + + # Build a simulation to convert to h5 + print("Building simulation from Dataset...") + stratified_sim = Microsimulation() + stratified_sim.dataset = stratified_dataset + stratified_sim.build_from_dataset() + + # Generate output path if not provided + if output_path is None: + from policyengine_us_data.storage import STORAGE_FOLDER + + output_path = str(STORAGE_FOLDER / "stratified_extended_cps_2023.h5") + + # Save to h5 file + print(f"\nSaving to {output_path}...") + data = {} + + # Only save input variables (not calculated/derived variables) + input_vars = set(stratified_sim.input_variables) + print( + f"Found {len(input_vars)} input variables (excluding calculated variables)" + ) + + for variable in stratified_sim.tax_benefit_system.variables: + if variable not in input_vars: + continue + + data[variable] = {} + for period in stratified_sim.get_holder(variable).get_known_periods(): + values = 
stratified_sim.get_holder(variable).get_array(period) + + # Handle different value types + if variable == "county_fips": + values = values.astype("int32") + elif stratified_sim.tax_benefit_system.variables.get( + variable + ).value_type in (Enum, str): + # Check if it's an EnumArray with decode_to_str method + if hasattr(values, "decode_to_str"): + values = values.decode_to_str().astype("S") + else: + # Already a numpy array, just ensure it's string type + values = values.astype("S") + else: + values = np.array(values) + + if values is not None: + data[variable][period] = values + + if len(data[variable]) == 0: + del data[variable] + + # Write to h5 + with h5py.File(output_path, "w") as f: + for variable, periods in data.items(): + grp = f.create_group(variable) + for period, values in periods.items(): + grp.create_dataset(str(period), data=values) + + print(f"Stratified CPS dataset saved successfully!") + + # Verify the saved file + print("\nVerifying saved file...") + with h5py.File(output_path, "r") as f: + if "household_id" in f and str(time_period) in f["household_id"]: + hh_ids = f["household_id"][str(time_period)][:] + print(f" Final households: {len(hh_ids):,}") + if "person_id" in f and str(time_period) in f["person_id"]: + person_ids = f["person_id"][str(time_period)][:] + print(f" Final persons: {len(person_ids):,}") + if ( + "household_weight" in f + and str(time_period) in f["household_weight"] + ): + weights = f["household_weight"][str(time_period)][:] + print(f" Final household weights sum: {np.sum(weights):,.0f}") + + # Final income distribution check + print("\nVerifying income distribution in stratified dataset...") + stratified_sim_verify = Microsimulation(dataset=output_path) + agi_stratified = stratified_sim_verify.calculate( + "adjusted_gross_income", map_to="household" + ).values + + print("AGI Percentiles in stratified dataset:") + for p in [0, 25, 50, 75, 90, 95, 99, 99.5, 99.9, 100]: + val = np.percentile(agi_stratified, p) + print(f" {p:5.1f}%: ${val:,.0f}") + + max_agi_original = np.max(agi) + max_agi_stratified = np.max(agi_stratified) + print(f"\nMaximum AGI:") + print(f" Original: ${max_agi_original:,.0f}") + print(f" Stratified: ${max_agi_stratified:,.0f}") + + if max_agi_stratified < max_agi_original * 0.9: + print("WARNING: May have lost some ultra-high earners!") + else: + print("Ultra-high earners preserved!") + + return output_path + + +if __name__ == "__main__": + import sys + + # Parse command line arguments + if len(sys.argv) > 1: + try: + target = int(sys.argv[1]) + print( + f"Creating stratified dataset with target of {target:,} households..." + ) + output_file = create_stratified_cps_dataset( + target_households=target + ) + except ValueError: + print(f"Invalid target households: {sys.argv[1]}") + print("Usage: python create_stratified_cps.py [target_households]") + sys.exit(1) + else: + # Default target + print( + "Creating stratified dataset with default target of 30,000 households..." + ) + output_file = create_stratified_cps_dataset(target_households=30_000) + + print(f"\nDone! 
Created: {output_file}") + print("\nTo test loading:") + print(" from policyengine_us import Microsimulation") + print(f" sim = Microsimulation(dataset='{output_file}')") diff --git a/policyengine_us_data/datasets/cps/local_area_calibration/matrix_tracer.py b/policyengine_us_data/datasets/cps/local_area_calibration/matrix_tracer.py new file mode 100644 index 00000000..e7cbf57b --- /dev/null +++ b/policyengine_us_data/datasets/cps/local_area_calibration/matrix_tracer.py @@ -0,0 +1,383 @@ +""" +Matrix tracer utility for debugging geo-stacking sparse matrices. + +This utility allows tracing through the complex stacked matrix structure +to verify values match simulation results. + +USAGE +===== + +Basic Setup: + + from matrix_tracer import MatrixTracer + + tracer = MatrixTracer( + targets_df, X_sparse, household_id_mapping, + cds_to_calibrate, sim + ) + +Common Operations: + + # 1. Understand what a column represents + col_info = tracer.get_column_info(100) + + # 2. Find where a household appears across all CDs + positions = tracer.get_household_column_positions(565) + + # 3. View matrix structure + tracer.print_matrix_structure() + +Matrix Structure: + + Columns are organized as: [CD1_households | CD2_households | ... | CD436_households] + Each CD block has n_households columns (e.g., 10,580 households) + + Formula to find column index: + column_idx = cd_block_number * n_households + household_index +""" + +import logging +import pandas as pd +import numpy as np +from typing import Dict, List +from scipy import sparse + +from policyengine_us_data.datasets.cps.local_area_calibration.calibration_utils import ( + create_target_groups, +) + + +logger = logging.getLogger(__name__) + + +class MatrixTracer: + """Trace through geo-stacked sparse matrices for debugging.""" + + def __init__( + self, + targets_df: pd.DataFrame, + matrix: sparse.csr_matrix, + household_id_mapping: Dict[str, List[str]], + geographic_ids: List[str], + sim, + ): + """ + Initialize tracer with matrix components. 
+ + Args: + targets_df: DataFrame of all targets + matrix: The final stacked sparse matrix + household_id_mapping: Mapping from geo keys to household ID lists + geographic_ids: List of geographic IDs in order + sim: Microsimulation instance + """ + self.targets_df = targets_df + self.matrix = matrix + self.household_id_mapping = household_id_mapping + self.geographic_ids = geographic_ids + self.sim = sim + + # Get original household info + self.original_household_ids = sim.calculate("household_id").values + self.n_households = len(self.original_household_ids) + self.n_geographies = len(geographic_ids) + + # Build reverse lookup: original_hh_id -> index in original data + self.hh_id_to_index = { + hh_id: idx for idx, hh_id in enumerate(self.original_household_ids) + } + + # Build column catalog: maps column index -> (cd_geoid, household_id, household_index) + self.column_catalog = self._build_column_catalog() + + # Build row catalog: maps row index -> target info + self.row_catalog = self._build_row_catalog() + + logger.info( + f"Tracer initialized: {self.n_households} households x {self.n_geographies} geographies" + ) + logger.info(f"Matrix shape: {matrix.shape}") + + def _build_column_catalog(self) -> pd.DataFrame: + """Build a complete catalog of all matrix columns.""" + catalog = [] + col_idx = 0 + + for geo_id in self.geographic_ids: + for hh_idx, hh_id in enumerate(self.original_household_ids): + catalog.append( + { + "column_index": col_idx, + "cd_geoid": geo_id, + "household_id": hh_id, + "household_index": hh_idx, + } + ) + col_idx += 1 + + return pd.DataFrame(catalog) + + def _build_row_catalog(self) -> pd.DataFrame: + """Build a complete catalog of all matrix rows (targets).""" + catalog = [] + + for row_idx, (_, target) in enumerate(self.targets_df.iterrows()): + var_name = target["variable"] + var_desc = "" + if var_name in self.sim.tax_benefit_system.variables: + var_obj = self.sim.tax_benefit_system.variables[var_name] + var_desc = getattr(var_obj, "label", var_name) + + catalog.append( + { + "row_index": row_idx, + "variable": var_name, + "variable_desc": var_desc, + "geographic_id": target.get("geographic_id", "unknown"), + "target_value": target["value"], + "stratum_id": target.get("stratum_id"), + "stratum_group_id": target.get( + "stratum_group_id", "unknown" + ), + } + ) + + return pd.DataFrame(catalog) + + def get_column_info(self, col_idx: int) -> Dict: + """Get information about a specific column.""" + if col_idx >= len(self.column_catalog): + raise ValueError( + f"Column index {col_idx} out of range (max: {len(self.column_catalog)-1})" + ) + return self.column_catalog.iloc[col_idx].to_dict() + + def get_row_info(self, row_idx: int) -> Dict: + """Get information about a specific row (target).""" + if row_idx >= len(self.row_catalog): + raise ValueError( + f"Row index {row_idx} out of range (max: {len(self.row_catalog)-1})" + ) + return self.row_catalog.iloc[row_idx].to_dict() + + def lookup_matrix_cell(self, row_idx: int, col_idx: int) -> Dict: + """ + Look up a specific matrix cell and return complete context. 
+ + Args: + row_idx: Row index in matrix + col_idx: Column index in matrix + + Returns: + Dict with row info, column info, and matrix value + """ + row_info = self.get_row_info(row_idx) + col_info = self.get_column_info(col_idx) + matrix_value = self.matrix[row_idx, col_idx] + + return { + "row_index": row_idx, + "column_index": col_idx, + "matrix_value": float(matrix_value), + "target": row_info, + "household": col_info, + } + + def get_household_column_positions( + self, original_hh_id: int + ) -> Dict[str, int]: + """ + Get all column positions for a household across all geographies. + + Args: + original_hh_id: Original household ID from simulation + + Returns: + Dict mapping geo_id to column position in stacked matrix + """ + if original_hh_id not in self.hh_id_to_index: + raise ValueError( + f"Household {original_hh_id} not found in original data" + ) + + # Get the household's index in the original data + hh_index = self.hh_id_to_index[original_hh_id] + + # Calculate column positions for each geography + positions = {} + for geo_idx, geo_id in enumerate(self.geographic_ids): + # Each geography gets a block of n_households columns + col_position = geo_idx * self.n_households + hh_index + positions[geo_id] = col_position + + return positions + + def print_matrix_structure(self, show_groups=True): + """Print a comprehensive breakdown of the matrix structure.""" + print("\n" + "=" * 80) + print("MATRIX STRUCTURE BREAKDOWN") + print("=" * 80) + + print( + f"\nMatrix dimensions: {self.matrix.shape[0]} rows x " + f"{self.matrix.shape[1]} columns" + ) + print(f" Rows = {len(self.row_catalog)} targets") + print( + f" Columns = {self.n_households} households x " + f"{self.n_geographies} CDs" + ) + print( + f" = {self.n_households:,} x {self.n_geographies} " + f"= {self.matrix.shape[1]:,}" + ) + + print("\n" + "-" * 80) + print("COLUMN STRUCTURE (Households stacked by CD)") + print("-" * 80) + + # Build column ranges by CD + col_ranges = [] + cumulative = 0 + for geo_id in self.geographic_ids: + start_col = cumulative + end_col = cumulative + self.n_households - 1 + col_ranges.append( + { + "cd_geoid": geo_id, + "start_col": start_col, + "end_col": end_col, + "n_households": self.n_households, + } + ) + cumulative += self.n_households + + ranges_df = pd.DataFrame(col_ranges) + print(f"\nShowing first and last 5 CDs of {len(ranges_df)} total:") + print("\nFirst 5 CDs:") + print(ranges_df.head(5).to_string(index=False)) + print("\nLast 5 CDs:") + print(ranges_df.tail(5).to_string(index=False)) + + print("\n" + "-" * 80) + print("ROW STRUCTURE (Targets)") + print("-" * 80) + + print(f"\nTotal targets: {len(self.row_catalog)}") + + # Summarize by geographic level if column exists + if "geographic_level" in self.row_catalog.columns: + print("\nTargets by geographic level:") + geo_level_summary = ( + self.row_catalog.groupby("geographic_level") + .size() + .reset_index(name="n_targets") + ) + print(geo_level_summary.to_string(index=False)) + + print("\nTargets by stratum group:") + stratum_summary = ( + self.row_catalog.groupby("stratum_group_id") + .agg({"row_index": "count", "variable": lambda x: len(set(x))}) + .rename( + columns={"row_index": "n_targets", "variable": "n_unique_vars"} + ) + ) + print(stratum_summary.to_string()) + + # Create and display target groups with row indices + if show_groups: + print("\n" + "-" * 80) + print("TARGET GROUPS (for loss calculation)") + print("-" * 80) + + target_groups, group_info = create_target_groups(self.targets_df) + + # Store for later use + 
self.target_groups = target_groups + + # Print each group with row indices + for group_id, info in enumerate(group_info): + group_mask = target_groups == group_id + row_indices = np.where(group_mask)[0] + + # Format row indices for display + if len(row_indices) > 6: + row_display = ( + f"[{row_indices[0]}, {row_indices[1]}, " + f"{row_indices[2]}, ..., {row_indices[-2]}, " + f"{row_indices[-1]}]" + ) + else: + row_display = str(row_indices.tolist()) + + print(f" {info} - rows {row_display}") + + print("\n" + "=" * 80) + + def print_column_catalog(self, max_rows: int = 50): + """Print a sample of the column catalog.""" + print( + f"\nColumn Catalog (showing first {max_rows} of {len(self.column_catalog)}):" + ) + print(self.column_catalog.head(max_rows).to_string(index=False)) + + def print_row_catalog(self, max_rows: int = 50): + """Print a sample of the row catalog.""" + print( + f"\nRow Catalog (showing first {max_rows} of {len(self.row_catalog)}):" + ) + print(self.row_catalog.head(max_rows).to_string(index=False)) + + def get_group_rows(self, group_id: int) -> pd.DataFrame: + """ + Get all rows belonging to a specific target group. + + Args: + group_id: The group ID to filter by + + Returns: + DataFrame of row catalog entries for this group + """ + if not hasattr(self, "target_groups"): + self.target_groups, self.group_info = create_target_groups( + self.targets_df + ) + + group_mask = self.target_groups == group_id + return self.row_catalog[group_mask].copy() + + def trace_household_targets(self, original_hh_id: int) -> pd.DataFrame: + """ + Extract all target values for a household across all geographies. + + Args: + original_hh_id: Original household ID to trace + + Returns: + DataFrame with target details and values for this household + """ + positions = self.get_household_column_positions(original_hh_id) + + results = [] + + for target_idx, (_, target) in enumerate(self.targets_df.iterrows()): + target_result = { + "target_idx": target_idx, + "variable": target["variable"], + "target_value": target["value"], + "geographic_id": target.get("geographic_id", "unknown"), + "stratum_group_id": target.get("stratum_group_id", "unknown"), + } + + # Extract values for this target across all geographies + for geo_id, col_pos in positions.items(): + if col_pos < self.matrix.shape[1]: + matrix_value = self.matrix[target_idx, col_pos] + target_result[f"matrix_value_{geo_id}"] = matrix_value + else: + target_result[f"matrix_value_{geo_id}"] = np.nan + + results.append(target_result) + + return pd.DataFrame(results) diff --git a/policyengine_us_data/datasets/cps/local_area_calibration/sparse_matrix_builder.py b/policyengine_us_data/datasets/cps/local_area_calibration/sparse_matrix_builder.py new file mode 100644 index 00000000..3af0a8d8 --- /dev/null +++ b/policyengine_us_data/datasets/cps/local_area_calibration/sparse_matrix_builder.py @@ -0,0 +1,238 @@ +""" +Sparse matrix builder for geo-stacking calibration. + +Generic, database-driven approach where all constraints (including geographic) +are evaluated as masks. Geographic constraints work because we SET state_fips +before evaluating constraints. 
+""" + +import logging +from collections import defaultdict +from typing import Dict, List, Optional, Tuple +import numpy as np +import pandas as pd +from scipy import sparse +from sqlalchemy import create_engine, text + +logger = logging.getLogger(__name__) + +from policyengine_us_data.datasets.cps.local_area_calibration.calibration_utils import ( + get_calculated_variables, + apply_op, + _get_geo_level, +) + + +class SparseMatrixBuilder: + """Build sparse calibration matrices for geo-stacking.""" + + def __init__( + self, + db_uri: str, + time_period: int, + cds_to_calibrate: List[str], + dataset_path: Optional[str] = None, + ): + self.db_uri = db_uri + self.engine = create_engine(db_uri) + self.time_period = time_period + self.cds_to_calibrate = cds_to_calibrate + self.dataset_path = dataset_path + + def _query_targets(self, target_filter: dict) -> pd.DataFrame: + """Query targets based on filter criteria using OR logic.""" + or_conditions = [] + + if "stratum_group_ids" in target_filter: + ids = ",".join(map(str, target_filter["stratum_group_ids"])) + or_conditions.append(f"s.stratum_group_id IN ({ids})") + + if "variables" in target_filter: + vars_str = ",".join(f"'{v}'" for v in target_filter["variables"]) + or_conditions.append(f"t.variable IN ({vars_str})") + + if "target_ids" in target_filter: + ids = ",".join(map(str, target_filter["target_ids"])) + or_conditions.append(f"t.target_id IN ({ids})") + + if "stratum_ids" in target_filter: + ids = ",".join(map(str, target_filter["stratum_ids"])) + or_conditions.append(f"t.stratum_id IN ({ids})") + + if not or_conditions: + raise ValueError( + "target_filter must specify at least one filter criterion" + ) + + where_clause = " OR ".join(f"({c})" for c in or_conditions) + + query = f""" + SELECT t.target_id, t.stratum_id, t.variable, t.value, t.period, + s.stratum_group_id + FROM targets t + JOIN strata s ON t.stratum_id = s.stratum_id + WHERE {where_clause} + ORDER BY t.target_id + """ + + with self.engine.connect() as conn: + return pd.read_sql(query, conn) + + def _get_constraints(self, stratum_id: int) -> List[dict]: + """Get all constraints for a stratum (including geographic).""" + query = """ + SELECT constraint_variable as variable, operation, value + FROM stratum_constraints + WHERE stratum_id = :stratum_id + """ + with self.engine.connect() as conn: + df = pd.read_sql(query, conn, params={"stratum_id": stratum_id}) + return df.to_dict("records") + + def _get_geographic_id(self, stratum_id: int) -> str: + """Extract geographic_id from constraints for targets_df.""" + constraints = self._get_constraints(stratum_id) + for c in constraints: + if c["variable"] == "state_fips": + return c["value"] + if c["variable"] == "congressional_district_geoid": + return c["value"] + return "US" + + def _create_state_sim(self, state: int, n_households: int): + """Create a fresh simulation with state_fips set to given state.""" + from policyengine_us import Microsimulation + + state_sim = Microsimulation(dataset=self.dataset_path) + state_sim.set_input( + "state_fips", + self.time_period, + np.full(n_households, state, dtype=np.int32), + ) + for var in get_calculated_variables(state_sim): + state_sim.delete_arrays(var) + return state_sim + + def build_matrix( + self, sim, target_filter: dict + ) -> Tuple[pd.DataFrame, sparse.csr_matrix, Dict[str, List[str]]]: + """ + Build sparse calibration matrix. 
+ + Args: + sim: Microsimulation instance (used for household_ids, or as template) + target_filter: Dict specifying which targets to include + - {"stratum_group_ids": [4]} for SNAP targets + - {"target_ids": [123, 456]} for specific targets + + Returns: + Tuple of (targets_df, X_sparse, household_id_mapping) + """ + household_ids = sim.calculate( + "household_id", map_to="household" + ).values + n_households = len(household_ids) + n_cds = len(self.cds_to_calibrate) + n_cols = n_households * n_cds + + targets_df = self._query_targets(target_filter) + n_targets = len(targets_df) + + if n_targets == 0: + raise ValueError("No targets found matching filter") + + targets_df["geographic_id"] = targets_df["stratum_id"].apply( + self._get_geographic_id + ) + + # Sort by (geo_level, variable, geographic_id) for contiguous group rows + targets_df["_geo_level"] = targets_df["geographic_id"].apply( + _get_geo_level + ) + targets_df = targets_df.sort_values( + ["_geo_level", "variable", "geographic_id"] + ) + targets_df = targets_df.drop(columns=["_geo_level"]).reset_index( + drop=True + ) + + X = sparse.lil_matrix((n_targets, n_cols), dtype=np.float32) + + # Group CDs by state. CD GEOIDs follow format SSCCC where SS is state + # FIPS (2 digits) and CCC is CD number (2-3 digits), so state = CD // 100 + cds_by_state = defaultdict(list) + for cd_idx, cd in enumerate(self.cds_to_calibrate): + state = int(cd) // 100 + cds_by_state[state].append((cd_idx, cd)) + + for state, cd_list in cds_by_state.items(): + if self.dataset_path: + state_sim = self._create_state_sim(state, n_households) + else: + state_sim = sim + state_sim.set_input( + "state_fips", + self.time_period, + np.full(n_households, state, dtype=np.int32), + ) + for var in get_calculated_variables(state_sim): + state_sim.delete_arrays(var) + + for cd_idx, cd in cd_list: + col_start = cd_idx * n_households + + for row_idx, (_, target) in enumerate(targets_df.iterrows()): + constraints = self._get_constraints(target["stratum_id"]) + + mask = np.ones(n_households, dtype=bool) + for c in constraints: + if c["variable"] == "congressional_district_geoid": + if ( + c["operation"] in ("==", "=") + and c["value"] != cd + ): + mask[:] = False + elif c["variable"] == "state_fips": + if ( + c["operation"] in ("==", "=") + and int(c["value"]) != state + ): + mask[:] = False + else: + try: + values = state_sim.calculate( + c["variable"], map_to="household" + ).values + mask &= apply_op( + values, c["operation"], c["value"] + ) + except Exception as e: + # Variable may not exist or may not be + # calculable at household level - skip + logger.debug( + f"Could not evaluate constraint " + f"{c['variable']}: {e}" + ) + + if not mask.any(): + continue + + target_values = state_sim.calculate( + target["variable"], map_to="household" + ).values + masked_values = (target_values * mask).astype(np.float32) + + nonzero = np.where(masked_values != 0)[0] + if len(nonzero) > 0: + X[row_idx, col_start + nonzero] = masked_values[ + nonzero + ] + + household_id_mapping = {} + for cd in self.cds_to_calibrate: + key = f"cd{cd}" + household_id_mapping[key] = [ + f"{hh_id}_{key}" for hh_id in household_ids + ] + + return targets_df, X.tocsr(), household_id_mapping diff --git a/policyengine_us_data/datasets/cps/small_enhanced_cps.py b/policyengine_us_data/datasets/cps/small_enhanced_cps.py index 83a5eba0..a9679a2a 100644 --- a/policyengine_us_data/datasets/cps/small_enhanced_cps.py +++ b/policyengine_us_data/datasets/cps/small_enhanced_cps.py @@ -22,11 +22,15 @@ def 
create_small_ecps(): data[variable] = {} for time_period in simulation.get_holder(variable).get_known_periods(): values = simulation.get_holder(variable).get_array(time_period) - values = np.array(values) if simulation.tax_benefit_system.variables.get( variable ).value_type in (Enum, str): - values = values.astype("S") + if hasattr(values, "decode_to_str"): + values = values.decode_to_str().astype("S") + else: + values = values.astype("S") + else: + values = np.array(values) if values is not None: data[variable][time_period] = values diff --git a/policyengine_us_data/datasets/puf/puf.py b/policyengine_us_data/datasets/puf/puf.py index cac9ad61..9d605aca 100644 --- a/policyengine_us_data/datasets/puf/puf.py +++ b/policyengine_us_data/datasets/puf/puf.py @@ -732,6 +732,13 @@ class PUF_2021(PUF): url = "release://policyengine/irs-soi-puf/1.8.0/puf_2021.h5" +class PUF_2023(PUF): + label = "PUF 2023" + name = "puf_2023" + time_period = 2023 + file_path = STORAGE_FOLDER / "puf_2023.h5" + + class PUF_2024(PUF): label = "PUF 2024 (2015-based)" name = "puf_2024" @@ -748,6 +755,13 @@ class PUF_2024(PUF): } if __name__ == "__main__": - PUF_2015().generate() - PUF_2021().generate() - PUF_2024().generate() + import os + + local_area_calibration = os.environ.get("LOCAL_AREA_CALIBRATION") == "true" + + if local_area_calibration: + PUF_2023().generate() + else: + PUF_2015().generate() + PUF_2021().generate() + PUF_2024().generate() diff --git a/policyengine_us_data/storage/download_private_prerequisites.py b/policyengine_us_data/storage/download_private_prerequisites.py index 26696d6c..3e080274 100644 --- a/policyengine_us_data/storage/download_private_prerequisites.py +++ b/policyengine_us_data/storage/download_private_prerequisites.py @@ -27,3 +27,9 @@ local_folder=FOLDER, version=None, ) +download( + repo="policyengine/policyengine-us-data", + repo_filename="policy_data.db", + local_folder=FOLDER, + version=None, +) diff --git a/policyengine_us_data/tests/test_local_area_calibration/__init__.py b/policyengine_us_data/tests/test_local_area_calibration/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/policyengine_us_data/tests/test_local_area_calibration/conftest.py b/policyengine_us_data/tests/test_local_area_calibration/conftest.py new file mode 100644 index 00000000..542036b4 --- /dev/null +++ b/policyengine_us_data/tests/test_local_area_calibration/conftest.py @@ -0,0 +1,119 @@ +"""Shared fixtures for local area calibration tests.""" + +import pytest +import numpy as np +from sqlalchemy import create_engine, text + +from policyengine_us import Microsimulation +from policyengine_us_data.storage import STORAGE_FOLDER +from policyengine_us_data.datasets.cps.local_area_calibration.sparse_matrix_builder import ( + SparseMatrixBuilder, +) +from policyengine_us_data.datasets.cps.local_area_calibration.matrix_tracer import ( + MatrixTracer, +) +from policyengine_us_data.datasets.cps.local_area_calibration.calibration_utils import ( + get_calculated_variables, +) + + +@pytest.fixture(scope="module") +def db_uri(): + db_path = STORAGE_FOLDER / "policy_data.db" + return f"sqlite:///{db_path}" + + +@pytest.fixture(scope="module") +def dataset_path(): + return str(STORAGE_FOLDER / "stratified_extended_cps_2023.h5") + + +@pytest.fixture(scope="module") +def test_cds(db_uri): + """CDs from NC, HI, MT, AK (manageable size, multiple same-state CDs).""" + engine = create_engine(db_uri) + query = """ + SELECT DISTINCT sc.value as cd_geoid + FROM strata s + JOIN stratum_constraints sc ON 
s.stratum_id = sc.stratum_id + WHERE s.stratum_group_id = 1 + AND sc.constraint_variable = 'congressional_district_geoid' + AND ( + sc.value LIKE '37__' + OR sc.value LIKE '150_' + OR sc.value LIKE '300_' + OR sc.value = '200' OR sc.value = '201' + ) + ORDER BY sc.value + """ + with engine.connect() as conn: + result = conn.execute(text(query)).fetchall() + return [row[0] for row in result] + + +@pytest.fixture(scope="module") +def sim(dataset_path): + return Microsimulation(dataset=dataset_path) + + +@pytest.fixture(scope="module") +def matrix_data(db_uri, dataset_path, test_cds, sim): + """Build sparse matrix, return (targets_df, X_sparse, household_id_mapping).""" + builder = SparseMatrixBuilder( + db_uri, + time_period=2023, + cds_to_calibrate=test_cds, + dataset_path=dataset_path, + ) + targets_df, X_sparse, household_id_mapping = builder.build_matrix( + sim, target_filter={"stratum_group_ids": [4], "variables": ["snap"]} + ) + return targets_df, X_sparse, household_id_mapping + + +@pytest.fixture(scope="module") +def targets_df(matrix_data): + return matrix_data[0] + + +@pytest.fixture(scope="module") +def X_sparse(matrix_data): + return matrix_data[1] + + +@pytest.fixture(scope="module") +def household_id_mapping(matrix_data): + return matrix_data[2] + + +@pytest.fixture(scope="module") +def tracer(targets_df, X_sparse, household_id_mapping, test_cds, sim): + return MatrixTracer( + targets_df, X_sparse, household_id_mapping, test_cds, sim + ) + + +@pytest.fixture(scope="module") +def n_households(tracer): + return tracer.n_households + + +@pytest.fixture(scope="module") +def household_ids(tracer): + return tracer.original_household_ids + + +@pytest.fixture(scope="module") +def household_states(sim): + return sim.calculate("state_fips", map_to="household").values + + +def create_state_simulation(dataset_path, n_households, state): + """Create simulation with all households assigned to a specific state.""" + s = Microsimulation(dataset=dataset_path) + s.set_input( + "state_fips", 2023, np.full(n_households, state, dtype=np.int32) + ) + for var in get_calculated_variables(s): + s.delete_arrays(var) + return s diff --git a/policyengine_us_data/tests/test_local_area_calibration/test_column_indexing.py b/policyengine_us_data/tests/test_local_area_calibration/test_column_indexing.py new file mode 100644 index 00000000..2e23763b --- /dev/null +++ b/policyengine_us_data/tests/test_local_area_calibration/test_column_indexing.py @@ -0,0 +1,47 @@ +"""Test column indexing in sparse matrix.""" + +import pytest + + +def test_column_indexing_roundtrip(X_sparse, tracer, test_cds): + """ + Verify column index = cd_idx * n_households + household_index. + + This is pure math - if this fails, everything else is unreliable. 
+ """ + n_hh = tracer.n_households + hh_ids = tracer.original_household_ids + errors = [] + + test_cases = [] + for cd_idx in [0, len(test_cds) // 2, len(test_cds) - 1]: + for hh_idx in [0, 100, n_hh - 1]: + test_cases.append((cd_idx, hh_idx)) + + for cd_idx, hh_idx in test_cases: + cd = test_cds[cd_idx] + hh_id = hh_ids[hh_idx] + expected_col = cd_idx * n_hh + hh_idx + col_info = tracer.get_column_info(expected_col) + positions = tracer.get_household_column_positions(hh_id) + pos_col = positions[cd] + + if col_info["cd_geoid"] != cd: + errors.append(f"CD mismatch at col {expected_col}") + if col_info["household_index"] != hh_idx: + errors.append(f"HH index mismatch at col {expected_col}") + if col_info["household_id"] != hh_id: + errors.append(f"HH ID mismatch at col {expected_col}") + if pos_col != expected_col: + errors.append(f"Position mismatch for hh {hh_id}, cd {cd}") + + assert not errors, f"Column indexing errors: {errors}" + + +def test_matrix_dimensions(X_sparse, tracer, test_cds): + """Verify matrix width matches expected CD x household count.""" + n_hh = tracer.n_households + expected_cols = len(test_cds) * n_hh + assert ( + X_sparse.shape[1] == expected_cols + ), f"Matrix width mismatch: expected {expected_cols}, got {X_sparse.shape[1]}" diff --git a/policyengine_us_data/tests/test_local_area_calibration/test_cross_state.py b/policyengine_us_data/tests/test_local_area_calibration/test_cross_state.py new file mode 100644 index 00000000..ea9eca6f --- /dev/null +++ b/policyengine_us_data/tests/test_local_area_calibration/test_cross_state.py @@ -0,0 +1,101 @@ +"""Test cross-state values match state-swapped simulations.""" + +import pytest +import numpy as np + +from policyengine_us import Microsimulation +from policyengine_us_data.datasets.cps.local_area_calibration.calibration_utils import ( + get_calculated_variables, +) + + +def test_cross_state_matches_swapped_sim( + X_sparse, + targets_df, + tracer, + test_cds, + dataset_path, + n_households, + household_ids, + household_states, +): + """ + Cross-state non-zero cells must match state-swapped simulation. + + When household moves to different state, X_sparse should contain the + value calculated from a fresh simulation with state_fips set to + destination state. 
+ """ + n_samples = 200 + seed = 42 + rng = np.random.default_rng(seed) + n_hh = n_households + hh_ids = household_ids + hh_states = household_states + + state_sims = {} + + def get_state_sim(state): + if state not in state_sims: + s = Microsimulation(dataset=dataset_path) + s.set_input( + "state_fips", 2023, np.full(n_hh, state, dtype=np.int32) + ) + for var in get_calculated_variables(s): + s.delete_arrays(var) + state_sims[state] = s + return state_sims[state] + + nonzero_rows, nonzero_cols = X_sparse.nonzero() + + cross_state_indices = [] + for i in range(len(nonzero_rows)): + col_idx = nonzero_cols[i] + cd_idx = col_idx // n_hh + hh_idx = col_idx % n_hh + cd = test_cds[cd_idx] + dest_state = int(cd) // 100 + orig_state = int(hh_states[hh_idx]) + if dest_state != orig_state: + cross_state_indices.append(i) + + if not cross_state_indices: + pytest.skip("No cross-state non-zero cells found") + + sample_idx = rng.choice( + cross_state_indices, + min(n_samples, len(cross_state_indices)), + replace=False, + ) + errors = [] + + for idx in sample_idx: + row_idx = nonzero_rows[idx] + col_idx = nonzero_cols[idx] + cd_idx = col_idx // n_hh + hh_idx = col_idx % n_hh + cd = test_cds[cd_idx] + dest_state = int(cd) // 100 + variable = targets_df.iloc[row_idx]["variable"] + actual = float(X_sparse[row_idx, col_idx]) + state_sim = get_state_sim(dest_state) + expected = float( + state_sim.calculate(variable, map_to="household").values[hh_idx] + ) + + if not np.isclose(actual, expected, atol=0.5): + errors.append( + { + "hh_id": hh_ids[hh_idx], + "orig_state": int(hh_states[hh_idx]), + "dest_state": dest_state, + "variable": variable, + "actual": actual, + "expected": expected, + } + ) + + assert not errors, ( + f"Cross-state verification failed: {len(errors)}/{len(sample_idx)} " + f"mismatches. First 5: {errors[:5]}" + ) diff --git a/policyengine_us_data/tests/test_local_area_calibration/test_geo_masking.py b/policyengine_us_data/tests/test_local_area_calibration/test_geo_masking.py new file mode 100644 index 00000000..b086efbf --- /dev/null +++ b/policyengine_us_data/tests/test_local_area_calibration/test_geo_masking.py @@ -0,0 +1,199 @@ +"""Test geographic masking behavior in sparse matrix.""" + +import pytest +import numpy as np + + +def test_state_level_zero_masking( + X_sparse, targets_df, tracer, test_cds, n_households +): + """ + State-level targets have zeros for wrong-state CD columns. + + For a target with geographic_id=37 (NC), columns for CDs in other states + (HI, MT, AK) should all be zero. 
+ """ + seed = 42 + rng = np.random.default_rng(seed) + n_hh = n_households + + state_targets = [] + for row_idx in range(len(targets_df)): + geo_id = targets_df.iloc[row_idx].get("geographic_id", "US") + if geo_id != "US": + try: + val = int(geo_id) + if val < 100: + state_targets.append((row_idx, val)) + except (ValueError, TypeError): + pass + + if not state_targets: + pytest.skip("No state-level targets found") + + errors = [] + checked = 0 + sample_targets = rng.choice( + len(state_targets), min(20, len(state_targets)), replace=False + ) + + for idx in sample_targets: + row_idx, target_state = state_targets[idx] + other_state_cds = [ + (i, cd) + for i, cd in enumerate(test_cds) + if int(cd) // 100 != target_state + ] + if not other_state_cds: + continue + + sample_cds = rng.choice( + len(other_state_cds), min(5, len(other_state_cds)), replace=False + ) + for cd_sample_idx in sample_cds: + cd_idx, cd = other_state_cds[cd_sample_idx] + sample_hh = rng.choice(n_hh, min(5, n_hh), replace=False) + for hh_idx in sample_hh: + col_idx = cd_idx * n_hh + hh_idx + actual = X_sparse[row_idx, col_idx] + checked += 1 + if actual != 0: + errors.append( + {"row": row_idx, "cd": cd, "value": float(actual)} + ) + + assert ( + not errors + ), f"State-level masking failed: {len(errors)}/{checked} should be zero" + + +def test_cd_level_zero_masking( + X_sparse, targets_df, tracer, test_cds, n_households +): + """ + CD-level targets have zeros for other CDs, even same-state. + + For a target with geographic_id=3707, columns for CDs 3701-3706, 3708-3714 + should all be zero, even though they're all in NC (state 37). + """ + seed = 42 + rng = np.random.default_rng(seed) + n_hh = n_households + + cd_targets_with_same_state = [] + for row_idx in range(len(targets_df)): + geo_id = targets_df.iloc[row_idx].get("geographic_id", "US") + if geo_id != "US": + try: + val = int(geo_id) + if val >= 100: + target_state = val // 100 + same_state_other_cds = [ + cd + for cd in test_cds + if int(cd) // 100 == target_state and cd != geo_id + ] + if same_state_other_cds: + cd_targets_with_same_state.append( + (row_idx, geo_id, same_state_other_cds) + ) + except (ValueError, TypeError): + pass + + if not cd_targets_with_same_state: + pytest.skip( + "No CD-level targets with same-state other CDs in test_cds" + ) + + errors = [] + same_state_checks = 0 + + for row_idx, target_cd, other_cds in cd_targets_with_same_state[:10]: + for cd in other_cds: + cd_idx = test_cds.index(cd) + for hh_idx in rng.choice(n_hh, 3, replace=False): + col_idx = cd_idx * n_hh + hh_idx + actual = X_sparse[row_idx, col_idx] + same_state_checks += 1 + if actual != 0: + errors.append( + { + "target_cd": target_cd, + "other_cd": cd, + "value": float(actual), + } + ) + + assert not errors, ( + f"CD-level masking failed: {len(errors)} same-state-different-CD " + f"non-zero values. First 5: {errors[:5]}" + ) + + +def test_national_no_geo_masking( + X_sparse, targets_df, tracer, sim, test_cds, dataset_path, n_households +): + """ + National targets have no geographic masking. + + National targets (geographic_id='US') can have non-zero values for ANY CD. + Values differ by destination state because benefits are recalculated + under each state's rules. 
+ """ + seed = 42 + rng = np.random.default_rng(seed) + n_hh = n_households + hh_ids = tracer.original_household_ids + + national_rows = [ + i + for i in range(len(targets_df)) + if targets_df.iloc[i].get("geographic_id", "US") == "US" + ] + + if not national_rows: + pytest.skip("No national targets found") + + states_in_test = sorted(set(int(cd) // 100 for cd in test_cds)) + cds_by_state = { + state: [cd for cd in test_cds if int(cd) // 100 == state] + for state in states_in_test + } + + for row_idx in national_rows: + variable = targets_df.iloc[row_idx]["variable"] + + row_data = X_sparse.getrow(row_idx) + nonzero_cols = row_data.nonzero()[1] + + assert ( + len(nonzero_cols) > 0 + ), f"National target row {row_idx} ({variable}) has no non-zero values" + + sample_cols = rng.choice( + nonzero_cols, min(5, len(nonzero_cols)), replace=False + ) + + households_checked = 0 + households_with_multi_state_values = 0 + + for col_idx in sample_cols: + hh_idx = col_idx % n_hh + + values_by_state = {} + for state, cds in cds_by_state.items(): + cd = cds[0] + cd_idx = test_cds.index(cd) + state_col = cd_idx * n_hh + hh_idx + val = float(X_sparse[row_idx, state_col]) + if val != 0: + values_by_state[state] = val + + households_checked += 1 + if len(values_by_state) > 1: + households_with_multi_state_values += 1 + + assert households_with_multi_state_values > 0, ( + f"National target {variable}: no households have values in " + f"multiple states" + ) diff --git a/policyengine_us_data/tests/test_local_area_calibration/test_same_state.py b/policyengine_us_data/tests/test_local_area_calibration/test_same_state.py new file mode 100644 index 00000000..a13f459d --- /dev/null +++ b/policyengine_us_data/tests/test_local_area_calibration/test_same_state.py @@ -0,0 +1,99 @@ +"""Test same-state values match fresh simulations.""" + +import pytest +import numpy as np + +from policyengine_us import Microsimulation +from policyengine_us_data.datasets.cps.local_area_calibration.calibration_utils import ( + get_calculated_variables, +) + + +def test_same_state_matches_original( + X_sparse, + targets_df, + tracer, + sim, + test_cds, + dataset_path, + n_households, + household_ids, + household_states, +): + """ + Same-state non-zero cells must match fresh same-state simulation. + + When household stays in same state, X_sparse should contain the value + calculated from a fresh simulation with state_fips set to that state. 
+ """ + n_samples = 200 + seed = 42 + rng = np.random.default_rng(seed) + n_hh = n_households + hh_ids = household_ids + hh_states = household_states + + state_sims = {} + + def get_state_sim(state): + if state not in state_sims: + s = Microsimulation(dataset=dataset_path) + s.set_input( + "state_fips", 2023, np.full(n_hh, state, dtype=np.int32) + ) + for var in get_calculated_variables(s): + s.delete_arrays(var) + state_sims[state] = s + return state_sims[state] + + nonzero_rows, nonzero_cols = X_sparse.nonzero() + + same_state_indices = [] + for i in range(len(nonzero_rows)): + col_idx = nonzero_cols[i] + cd_idx = col_idx // n_hh + hh_idx = col_idx % n_hh + cd = test_cds[cd_idx] + dest_state = int(cd) // 100 + orig_state = int(hh_states[hh_idx]) + if dest_state == orig_state: + same_state_indices.append(i) + + if not same_state_indices: + pytest.skip("No same-state non-zero cells found") + + sample_idx = rng.choice( + same_state_indices, + min(n_samples, len(same_state_indices)), + replace=False, + ) + errors = [] + + for idx in sample_idx: + row_idx = nonzero_rows[idx] + col_idx = nonzero_cols[idx] + cd_idx = col_idx // n_hh + hh_idx = col_idx % n_hh + cd = test_cds[cd_idx] + dest_state = int(cd) // 100 + variable = targets_df.iloc[row_idx]["variable"] + actual = float(X_sparse[row_idx, col_idx]) + state_sim = get_state_sim(dest_state) + expected = float( + state_sim.calculate(variable, map_to="household").values[hh_idx] + ) + + if not np.isclose(actual, expected, atol=0.5): + errors.append( + { + "hh_id": hh_ids[hh_idx], + "variable": variable, + "actual": actual, + "expected": expected, + } + ) + + assert not errors, ( + f"Same-state verification failed: {len(errors)}/{len(sample_idx)} " + f"mismatches. First 5: {errors[:5]}" + ) diff --git a/pyproject.toml b/pyproject.toml index 3d00d389..095c5099 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,7 +52,7 @@ dev = [ "tabulate", "furo", "jupyter-book", - "mystmd", + "mystmd>=1.7.0", "yaml-changelog>=0.1.7", "build", "tomli",