43 changes: 43 additions & 0 deletions .github/workflows/local_area_promote.yaml
@@ -0,0 +1,43 @@
name: Promote Local Area H5 Files

on:
workflow_dispatch:
inputs:
version:
description: 'Version to promote (e.g. 1.23.0)'
required: true
type: string
branch:
description: 'Branch to use for repo setup'
required: false
default: 'main'
type: string

jobs:
promote-local-area:
runs-on: ubuntu-latest
permissions:
contents: read
env:
HUGGING_FACE_TOKEN: ${{ secrets.HUGGING_FACE_TOKEN }}
MODAL_TOKEN_ID: ${{ secrets.MODAL_TOKEN_ID }}
MODAL_TOKEN_SECRET: ${{ secrets.MODAL_TOKEN_SECRET }}

steps:
- name: Checkout repo
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.13'

- name: Install Modal CLI
run: pip install modal

- name: Promote staged files to production
run: |
VERSION="${{ github.event.inputs.version }}"
BRANCH="${{ github.event.inputs.branch }}"
echo "Promoting version ${VERSION} from branch ${BRANCH}"
modal run modal_app/local_area.py::main_promote --version="${VERSION}" --branch="${BRANCH}"
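Note: besides the Actions UI, this dispatch can be triggered from the GitHub CLI (assuming gh is installed and authenticated), e.g.:

gh workflow run local_area_promote.yaml -f version=1.23.0 -f branch=main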
12 changes: 11 additions & 1 deletion .github/workflows/local_area_publish.yaml
@@ -49,7 +49,7 @@ jobs:
- name: Install Modal CLI
run: pip install modal

- name: Run local area publishing on Modal
- name: Run local area build and stage on Modal
run: |
NUM_WORKERS="${{ github.event.inputs.num_workers || '8' }}"
SKIP_UPLOAD="${{ github.event.inputs.skip_upload || 'false' }}"
@@ -63,3 +63,13 @@ jobs:

echo "Running: $CMD"
$CMD

- name: Post-build summary
if: success()
run: |
echo "## Build + Stage Complete" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "Files have been uploaded to GCS and staged on HuggingFace." >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "### Next step: Promote to production" >> $GITHUB_STEP_SUMMARY
echo "Trigger the **Promote Local Area H5 Files** workflow with the version from the build output." >> $GITHUB_STEP_SUMMARY
2 changes: 1 addition & 1 deletion Makefile
@@ -87,7 +87,7 @@ data: download
python policyengine_us_data/datasets/cps/extended_cps.py
python policyengine_us_data/datasets/cps/enhanced_cps.py
python policyengine_us_data/datasets/cps/small_enhanced_cps.py
python policyengine_us_data/datasets/cps/local_area_calibration/create_stratified_cps.py 10500
python policyengine_us_data/datasets/cps/local_area_calibration/create_stratified_cps.py 12000 --top=99.5 --seed=3526

publish-local-area:
python policyengine_us_data/datasets/cps/local_area_calibration/publish_local_area.py
6 changes: 6 additions & 0 deletions changelog_entry.yaml
@@ -0,0 +1,6 @@
- date: 2026-02-02
type: fixed
description: Fix stale calibration targets by deriving time_period from the dataset across all ETL scripts, using income_tax_positive for CBO calibration, and adding 119th Congress district code support for consistent redistricting alignment
- date: 2026-02-07
type: added
description: Add ACA Premium Tax Credit targets from IRS SOI data (cherry-picked from PR #508)
114 changes: 97 additions & 17 deletions modal_app/local_area.py
@@ -259,14 +259,11 @@ def validate_staging(branch: str, version: str) -> Dict:
memory=8192,
timeout=14400,
)
def atomic_upload(branch: str, version: str, manifest: Dict) -> str:
def upload_to_staging(branch: str, version: str, manifest: Dict) -> str:
"""
Upload files using staging approach for atomic deployment.
Upload files to GCS (production) and HuggingFace (staging only).

1. Upload to GCS (direct, overwrites existing)
2. Upload to HuggingFace staging/ folder
3. Atomically promote staging/ to production paths
4. Clean up staging/
Promote must be run separately via promote_publish.
"""
setup_gcp_credentials()
setup_repo(branch)
@@ -286,8 +283,6 @@ def atomic_upload(branch: str, version: str, manifest: Dict) -> str:
from policyengine_us_data.utils.data_upload import (
upload_local_area_file,
upload_to_staging_hf,
promote_staging_to_production_hf,
cleanup_staging_hf,
)

manifest = json.loads('''{manifest_json}''')
@@ -306,11 +301,9 @@ def atomic_upload(branch: str, version: str, manifest: Dict) -> str:
print(f"Verified {{verification['verified']}} files")

files_with_paths = []
rel_paths = []
for rel_path in manifest["files"].keys():
local_path = version_dir / rel_path
files_with_paths.append((local_path, rel_path))
rel_paths.append(rel_path)

# Upload to GCS (direct to production paths)
print(f"Uploading {{len(files_with_paths)}} files to GCS...")
@@ -331,12 +324,73 @@ def atomic_upload(branch: str, version: str, manifest: Dict) -> str:
hf_count = upload_to_staging_hf(files_with_paths, version)
print(f"Uploaded {{hf_count}} files to HuggingFace staging/")

# Atomically promote staging to production
print("Promoting staging/ to production (atomic commit)...")
print(f"Staged version {{version}} for promotion")
""",
],
text=True,
env=os.environ.copy(),
)

if result.returncode != 0:
raise RuntimeError(f"Upload failed: {result.stderr}")

return (
f"Staged version {version} with {len(manifest['files'])} files. "
f"Run promote workflow to publish to HuggingFace production."
)


@app.function(
image=image,
secrets=[hf_secret],
volumes={VOLUME_MOUNT: staging_volume},
memory=4096,
timeout=3600,
)
def promote_publish(branch: str = "main", version: str = "") -> str:
"""
Promote staged files from HF staging/ to production paths, then cleanup.

Reads the manifest from the Modal staging volume to determine which
files to promote.
"""
setup_repo(branch)

staging_dir = Path(VOLUME_MOUNT)
staging_volume.reload()

manifest_path = staging_dir / version / "manifest.json"
if not manifest_path.exists():
raise RuntimeError(
f"No manifest found at {manifest_path}. "
f"Run build+stage workflow first."
)

with open(manifest_path) as f:
manifest = json.load(f)

rel_paths_json = json.dumps(list(manifest["files"].keys()))

result = subprocess.run(
[
"uv",
"run",
"python",
"-c",
f"""
import json
from policyengine_us_data.utils.data_upload import (
promote_staging_to_production_hf,
cleanup_staging_hf,
)

rel_paths = json.loads('''{rel_paths_json}''')
version = "{version}"

print(f"Promoting {{len(rel_paths)}} files from staging/ to production...")
promoted = promote_staging_to_production_hf(rel_paths, version)
print(f"Promoted {{promoted}} files to production")

# Clean up staging
print("Cleaning up staging/...")
cleaned = cleanup_staging_hf(rel_paths, version)
print(f"Cleaned up {{cleaned}} files from staging/")
@@ -349,9 +403,9 @@ def atomic_upload(branch: str, version: str, manifest: Dict) -> str:
)

if result.returncode != 0:
raise RuntimeError(f"Upload failed: {result.stderr}")
raise RuntimeError(f"Promote failed: {result.stderr}")

return f"Successfully published version {version} with {len(manifest['files'])} files"
return f"Successfully promoted version {version} with {len(manifest['files'])} files"


@app.function(
@@ -544,10 +598,24 @@ def coordinate_publish(
f"WARNING: Expected {expected_total} files, found {actual_total}"
)

print("\nStarting atomic upload...")
result = atomic_upload.remote(
print("\nStarting upload to staging...")
result = upload_to_staging.remote(
branch=branch, version=version, manifest=manifest
)
print(result)

print("\n" + "=" * 60)
print("BUILD + STAGE COMPLETE")
print("=" * 60)
print(
f"To promote to HuggingFace production, run the "
f"'Promote Local Area H5 Files' workflow with version={version}"
)
print(
"Or run manually: modal run modal_app/local_area.py::main_promote "
f"--version={version}"
)
print("=" * 60)

return result

@@ -565,3 +633,15 @@ def main(
skip_upload=skip_upload,
)
print(result)


@app.local_entrypoint()
def main_promote(
version: str = "",
branch: str = "main",
):
"""Promote staged files to HuggingFace production."""
if not version:
raise ValueError("--version is required")
result = promote_publish.remote(branch=branch, version=version)
print(result)
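Note: the promote step in promote_staging_to_production_hf amounts to a single HuggingFace commit copying each staged file onto its production path, followed by a cleanup commit. A minimal sketch using huggingface_hub's commit-operations API — the repo id, token handling, and staging/{version}/... layout are assumptions here, not the actual implementation in policyengine_us_data.utils.data_upload:

import os

from huggingface_hub import HfApi, CommitOperationCopy, CommitOperationDelete

def promote_then_cleanup(rel_paths, version,
                         repo_id="policyengine/policyengine-us-data"):
    # Assumed layout: files staged at staging/{version}/{rel_path}.
    api = HfApi(token=os.environ.get("HUGGING_FACE_TOKEN"))
    # One commit copies every staged file onto its production path,
    # so readers never observe a half-promoted version.
    api.create_commit(
        repo_id=repo_id,
        operations=[
            CommitOperationCopy(
                src_path_in_repo=f"staging/{version}/{p}", path_in_repo=p
            )
            for p in rel_paths
        ],
        commit_message=f"Promote version {version} to production",
    )
    # A second commit drops the now-redundant staged copies.
    api.create_commit(
        repo_id=repo_id,
        operations=[
            CommitOperationDelete(path_in_repo=f"staging/{version}/{p}")
            for p in rel_paths
        ],
        commit_message=f"Clean up staging for version {version}",
    )
    return len(rel_paths)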
2 changes: 1 addition & 1 deletion policyengine_us_data/datasets/cps/enhanced_cps.py
@@ -196,7 +196,7 @@ def generate(self):
loss_matrix_clean,
targets_array_clean,
log_path="calibration_log.csv",
epochs=200,
epochs=250,
seed=1456,
)
data["household_weight"][year] = optimised_weights
@@ -252,39 +252,24 @@ def get_pseudo_input_variables(sim) -> set:
"""
Identify pseudo-input variables that should NOT be saved to H5 files.

A pseudo-input is a variable that:
- Appears in sim.input_variables (has stored values)
- Has 'adds' or 'subtracts' attribute
- At least one component has a formula (is calculated)

These variables have stale pre-computed values that corrupt calculations
when reloaded, because the stored value overrides the formula.
NOTE: This function currently returns an empty set. The original logic
excluded variables with 'adds' or 'subtracts' attributes, but analysis
showed that in CPS data, these variables contain authoritative stored
data that does NOT match their component variables:

- pre_tax_contributions: components are all 0, aggregate has imputed values
- tax_exempt_pension_income: aggregate has 135M, components only 20M
- taxable_pension_income: aggregate has 82M, components only 29M
- interest_deduction: aggregate has 41M, components are 0

The 'adds' attribute defines how to CALCULATE these values, but in CPS
data the stored values are the authoritative source. Excluding them and
recalculating from components produces incorrect results.

For geo-stacking, entity ID reindexing preserves within-entity
relationships, so aggregation within a person or tax_unit remains valid.
"""
tbs = sim.tax_benefit_system
pseudo_inputs = set()

for var_name in sim.input_variables:
var = tbs.variables.get(var_name)
if not var:
continue

adds = getattr(var, "adds", None)
if adds and isinstance(adds, list):
for component in adds:
comp_var = tbs.variables.get(component)
if comp_var and len(getattr(comp_var, "formulas", {})) > 0:
pseudo_inputs.add(var_name)
break

subtracts = getattr(var, "subtracts", None)
if subtracts and isinstance(subtracts, list):
for component in subtracts:
comp_var = tbs.variables.get(component)
if comp_var and len(getattr(comp_var, "formulas", {})) > 0:
pseudo_inputs.add(var_name)
break

return pseudo_inputs
return set()


def apply_op(values: np.ndarray, op: str, val: str) -> np.ndarray:
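Note: the mismatches quoted in the docstring can be checked with a loop along these lines — a sketch assuming a policyengine-us Microsimulation; the dataset path is a placeholder, and the 'adds' entries are assumed to be plain variable names:

from policyengine_us import Microsimulation

# Placeholder dataset path -- substitute the CPS file under inspection.
sim = Microsimulation(dataset="hf://policyengine/policyengine-us-data/cps_2023.h5")
tbs = sim.tax_benefit_system

for var_name in ["pre_tax_contributions", "tax_exempt_pension_income"]:
    var = tbs.variables[var_name]
    # Stored aggregate vs. the sum of its 'adds' components.
    stored_total = sim.calculate(var_name).sum()
    components = getattr(var, "adds", None) or []
    component_total = sum(sim.calculate(c).sum() for c in components)
    print(f"{var_name}: stored={stored_total:,.0f} vs components={component_total:,.0f}")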
2 changes: 1 addition & 1 deletion policyengine_us_data/db/DATABASE_GUIDE.md
@@ -110,7 +110,7 @@ The `stratum_group_id` field categorizes strata:
| 5 | Medicaid | Medicaid enrollment strata |
| 6 | EITC | EITC recipients by qualifying children |
| 7 | State Income Tax | State-level income tax collections (Census STC) |
| 100-118 | IRS Conditional | Each IRS variable paired with conditional count constraints |
| 100-119 | IRS Conditional | Each IRS variable paired with conditional count constraints (includes ACA PTC at 119) |

### Conditional Strata (IRS SOI)

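For orientation, strata in a given group can be queried back through the sqlmodel schema — a sketch; the engine URL is a placeholder, and the query assumes the Stratum table exposes stratum_group_id directly:

from sqlmodel import Session, create_engine, select

from policyengine_us_data.db.create_database_tables import Stratum

engine = create_engine("sqlite:///policyengine_us_data.db")  # placeholder URL

with Session(engine) as session:
    # Group 119 holds the new ACA PTC conditional strata.
    aca_strata = session.exec(
        select(Stratum).where(Stratum.stratum_group_id == 119)
    ).all()
    print(f"{len(aca_strata)} ACA PTC strata")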
28 changes: 26 additions & 2 deletions policyengine_us_data/db/create_initial_strata.py
@@ -1,3 +1,4 @@
import argparse
import logging
from typing import Dict

@@ -6,6 +7,8 @@
from sqlmodel import Session, create_engine

from policyengine_us_data.storage import STORAGE_FOLDER

DEFAULT_DATASET = "hf://policyengine/policyengine-us-data/calibration/stratified_extended_cps.h5"
from policyengine_us_data.db.create_database_tables import (
Stratum,
StratumConstraint,
@@ -68,6 +71,28 @@ def fetch_congressional_districts(year):


def main():
parser = argparse.ArgumentParser(
description="Create initial geographic strata for calibration"
)
parser.add_argument(
"--dataset",
default=DEFAULT_DATASET,
help=(
"Source dataset (local path or HuggingFace URL). "
"The year for Census API calls is derived from the dataset's "
"default_calculation_period. Default: %(default)s"
),
)
args = parser.parse_args()

# Derive year from dataset
from policyengine_us import Microsimulation

print(f"Loading dataset: {args.dataset}")
sim = Microsimulation(dataset=args.dataset)
year = int(sim.default_calculation_period)
print(f"Derived year from dataset: {year}")

# State FIPS to name/abbreviation mapping
STATE_NAMES = {
1: "Alabama (AL)",
@@ -123,8 +148,7 @@ def main():
56: "Wyoming (WY)",
}

# Fetch congressional district data for year 2023
year = 2023
# Fetch congressional district data
cd_df = fetch_congressional_districts(year)

DATABASE_URL = (
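Usage note: with the new flag the script stays backward compatible — run it bare to pull the default stratified dataset from HuggingFace, or point --dataset at a local file (the local path below is illustrative):

python policyengine_us_data/db/create_initial_strata.py
python policyengine_us_data/db/create_initial_strata.py --dataset ./stratified_extended_cps.h5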