19 changes: 15 additions & 4 deletions Makefile
@@ -1,4 +1,6 @@
.PHONY: all format test install download upload docker documentation data publish-local-area clean build paper clean-paper presentations database database-refresh promote-database
.PHONY: all format test install download upload docker documentation data publish-local-area clean build paper clean-paper presentations database database-refresh promote-database promote-dataset

HF_CLONE_DIR = $(HOME)/devl/huggingface/policyengine-us-data

all: data test

@@ -72,12 +74,17 @@ database-refresh:

promote-database:
cp policyengine_us_data/storage/calibration/policy_data.db \
$(HOME)/devl/huggingface/policyengine-us-data/calibration/policy_data.db
rm -rf $(HOME)/devl/huggingface/policyengine-us-data/calibration/raw_inputs
$(HF_CLONE_DIR)/calibration/policy_data.db
rm -rf $(HF_CLONE_DIR)/calibration/raw_inputs
cp -r policyengine_us_data/storage/calibration/raw_inputs \
$(HOME)/devl/huggingface/policyengine-us-data/calibration/raw_inputs
$(HF_CLONE_DIR)/calibration/raw_inputs
@echo "Copied DB and raw_inputs to HF clone. Now cd to HF repo, commit, and push."

promote-dataset:
cp policyengine_us_data/storage/stratified_extended_cps_2024.h5 \
$(HF_CLONE_DIR)/calibration/stratified_extended_cps.h5
@echo "Copied dataset to HF clone. Now cd to HF repo, commit, and push."

data: download
python policyengine_us_data/utils/uprating.py
python policyengine_us_data/datasets/acs/acs.py
@@ -87,6 +94,10 @@ data: download
python policyengine_us_data/datasets/cps/extended_cps.py
python policyengine_us_data/datasets/cps/enhanced_cps.py
python policyengine_us_data/datasets/cps/small_enhanced_cps.py
# 12000: number of households our GPUs can handle (found via trial and error).
# --top=99.5: include only top 0.5% (vs default 1%) to preserve
# representation of lower-income households.
# --seed=3526: reproducible stratified sampling.
python policyengine_us_data/datasets/cps/local_area_calibration/create_stratified_cps.py 12000 --top=99.5 --seed=3526
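As a rough illustration of what those flags imply (not the actual create_stratified_cps.py implementation, which this diff does not show), percentile-based stratified sampling with a fixed seed looks something like the following; all names here are hypothetical.

# Hypothetical illustration only: split households at the --top percentile and
# sample each stratum separately so high-income households stay represented
# without crowding out lower-income ones.
import numpy as np

def stratified_sample(income, n_households=12000, top=99.5, seed=3526):
    rng = np.random.default_rng(seed)
    cutoff = np.percentile(income, top)
    top_idx = np.flatnonzero(income >= cutoff)   # the top 0.5% stratum
    rest_idx = np.flatnonzero(income < cutoff)   # everyone else
    # Keep the whole top stratum, then fill the remaining budget from below.
    n_rest = min(max(n_households - len(top_idx), 0), len(rest_idx))
    sampled_rest = rng.choice(rest_idx, size=n_rest, replace=False)
    return np.concatenate([top_idx, sampled_rest])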

publish-local-area:
10 changes: 10 additions & 0 deletions changelog_entry.yaml
@@ -0,0 +1,10 @@
- bump: patch
changes:
changed:
- Switch DEFAULT_DATASET to local storage path for database ETL scripts
- Extract shared etl_argparser() to reduce boilerplate across 7 ETL scripts
- Delete dead get_pseudo_input_variables() function
added:
- promote-dataset Makefile target
- Year-mismatch warning in national targets ETL
- Congress-session constants and warning in SOI district puller
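For orientation, the shared etl_argparser() helper named in the changelog can be reconstructed roughly from the per-script argparse blocks removed later in this diff and the new call sites (`_, year = etl_argparser(...)` and `args, dataset_year = etl_argparser(..., extra_args_fn=add_lag_arg)`). The sketch below is an assumption, not the implementation in policyengine_us_data/utils/db.py; in particular the local DEFAULT_DATASET path is a guess based on the Makefile's promote-dataset target.

import argparse

from policyengine_us_data.storage import STORAGE_FOLDER

# Assumed local default, per the changelog note about switching DEFAULT_DATASET
# to a local storage path; the real value may differ.
DEFAULT_DATASET = str(STORAGE_FOLDER / "stratified_extended_cps_2024.h5")


def etl_argparser(description, extra_args_fn=None):
    """Parse the shared --dataset flag and derive the data year.

    Returns (args, year), where year is the dataset's
    default_calculation_period. extra_args_fn, if given, receives the
    parser so individual scripts can add flags such as --lag.
    """
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument(
        "--dataset",
        default=DEFAULT_DATASET,
        help=(
            "Source dataset (local path or HuggingFace URL). "
            "The year is derived from the dataset's "
            "default_calculation_period. Default: %(default)s"
        ),
    )
    if extra_args_fn is not None:
        extra_args_fn(parser)
    args = parser.parse_args()

    # Import here to keep module import cheap, mirroring the removed code.
    from policyengine_us import Microsimulation

    print(f"Loading dataset: {args.dataset}")
    sim = Microsimulation(dataset=args.dataset)
    year = int(sim.default_calculation_period)
    print(f"Derived year from dataset: {year}")
    return args, year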
Original file line number Diff line number Diff line change
@@ -248,30 +248,6 @@ def get_calculated_variables(sim) -> List[str]:
return result


def get_pseudo_input_variables(sim) -> set:
"""
Identify pseudo-input variables that should NOT be saved to H5 files.

NOTE: This function currently returns an empty set. The original logic
excluded variables with 'adds' or 'subtracts' attributes, but analysis
showed that in CPS data, these variables contain authoritative stored
data that does NOT match their component variables:

- pre_tax_contributions: components are all 0, aggregate has imputed values
- tax_exempt_pension_income: aggregate has 135M, components only 20M
- taxable_pension_income: aggregate has 82M, components only 29M
- interest_deduction: aggregate has 41M, components are 0

The 'adds' attribute defines how to CALCULATE these values, but in CPS
data the stored values are the authoritative source. Excluding them and
recalculating from components produces incorrect results.

For geo-stacking, entity ID reindexing preserves within-entity
relationships, so aggregation within a person or tax_unit remains valid.
"""
return set()


def apply_op(values: np.ndarray, op: str, val: str) -> np.ndarray:
"""Apply constraint operation to values array."""
try:
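The component/aggregate mismatches described in the deleted docstring can be spot-checked directly. A minimal sketch, assuming policyengine_core Variable objects expose an `adds` list of component variable names and that Microsimulation.calculate returns a weighted series with .sum(); the dataset path and year are illustrative only.

from policyengine_us import Microsimulation

# Illustrative dataset path; substitute whichever CPS file you are inspecting.
sim = Microsimulation(
    dataset="hf://policyengine/policyengine-us-data/calibration/stratified_extended_cps.h5"
)
var = sim.tax_benefit_system.variables["pre_tax_contributions"]
stored_total = sim.calculate("pre_tax_contributions", 2024).sum()
component_total = sum(
    sim.calculate(component, 2024).sum() for component in (var.adds or [])
)
# In CPS data the stored aggregate carries imputed values while the components
# are zero, so these two totals diverge.
print(f"stored={stored_total:,.0f}  sum_of_components={component_total:,.0f}")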
Original file line number Diff line number Diff line change
@@ -13,9 +13,6 @@
from policyengine_us import Microsimulation
from policyengine_core.data.dataset import Dataset
from policyengine_core.enums import Enum
from policyengine_us_data.datasets.cps.local_area_calibration.calibration_utils import (
get_pseudo_input_variables,
)


def create_stratified_cps_dataset(
@@ -225,16 +222,6 @@ def create_stratified_cps_dataset(

# Only save input variables (not calculated/derived variables)
input_vars = set(sim.input_variables)

# Filter out pseudo-inputs: variables with adds/subtracts that aggregate
# formula-based components. These have stale values that corrupt calculations.
pseudo_inputs = get_pseudo_input_variables(sim)
if pseudo_inputs:
print(f"Excluding {len(pseudo_inputs)} pseudo-input variables:")
for var in sorted(pseudo_inputs):
print(f" - {var}")
input_vars = input_vars - pseudo_inputs

print(f"Found {len(input_vars)} input variables to save")

for variable in stratified_sim.tax_benefit_system.variables:
Original file line number Diff line number Diff line change
@@ -14,7 +14,6 @@
from policyengine_us_data.datasets.cps.local_area_calibration.calibration_utils import (
get_all_cds_from_database,
get_calculated_variables,
get_pseudo_input_variables,
STATE_CODES,
STATE_FIPS_TO_NAME,
STATE_FIPS_TO_CODE,
@@ -624,17 +623,7 @@ def create_sparse_cd_stacked_dataset(

# Only save input variables (not calculated/derived variables)
# Calculated variables like state_name, state_code will be recalculated on load
input_vars = set(base_sim.input_variables)

# Filter out pseudo-inputs: variables with adds/subtracts that aggregate
# formula-based components. These have stale values that corrupt calculations.
pseudo_inputs = get_pseudo_input_variables(base_sim)
if pseudo_inputs:
print(f"Excluding {len(pseudo_inputs)} pseudo-input variables:")
for var in sorted(pseudo_inputs):
print(f" - {var}")

vars_to_save = input_vars - pseudo_inputs
vars_to_save = set(base_sim.input_variables)
print(f"Found {len(vars_to_save)} input variables to save")

# congressional_district_geoid isn't in the original microdata and has no formula,
26 changes: 2 additions & 24 deletions policyengine_us_data/db/create_initial_strata.py
@@ -1,4 +1,3 @@
import argparse
import logging
from typing import Dict

@@ -7,12 +6,11 @@
from sqlmodel import Session, create_engine

from policyengine_us_data.storage import STORAGE_FOLDER

DEFAULT_DATASET = "hf://policyengine/policyengine-us-data/calibration/stratified_extended_cps.h5"
from policyengine_us_data.db.create_database_tables import (
Stratum,
StratumConstraint,
)
from policyengine_us_data.utils.db import etl_argparser
from policyengine_us_data.utils.raw_cache import (
is_cached,
save_json,
@@ -71,27 +69,7 @@ def fetch_congressional_districts(year):


def main():
parser = argparse.ArgumentParser(
description="Create initial geographic strata for calibration"
)
parser.add_argument(
"--dataset",
default=DEFAULT_DATASET,
help=(
"Source dataset (local path or HuggingFace URL). "
"The year for Census API calls is derived from the dataset's "
"default_calculation_period. Default: %(default)s"
),
)
args = parser.parse_args()

# Derive year from dataset
from policyengine_us import Microsimulation

print(f"Loading dataset: {args.dataset}")
sim = Microsimulation(dataset=args.dataset)
year = int(sim.default_calculation_period)
print(f"Derived year from dataset: {year}")
_, year = etl_argparser("Create initial geographic strata for calibration")

# State FIPS to name/abbreviation mapping
STATE_NAMES = {
33 changes: 6 additions & 27 deletions policyengine_us_data/db/etl_age.py
@@ -1,21 +1,20 @@
import argparse

import pandas as pd
import numpy as np
from sqlmodel import Session, create_engine, select

from policyengine_us_data.storage import STORAGE_FOLDER

DEFAULT_DATASET = "hf://policyengine/policyengine-us-data/calibration/stratified_extended_cps.h5"

from policyengine_us_data.db.create_database_tables import (
Stratum,
StratumConstraint,
Target,
SourceType,
)
from policyengine_us_data.utils.census import get_census_docs, pull_acs_table
from policyengine_us_data.utils.db import parse_ucgid, get_geographic_strata
from policyengine_us_data.utils.db import (
parse_ucgid,
get_geographic_strata,
etl_argparser,
)
from policyengine_us_data.utils.db_metadata import (
get_or_create_source,
get_or_create_variable_group,
@@ -287,27 +286,7 @@ def load_age_data(df_long, geo, year):


def main():
parser = argparse.ArgumentParser(
description="ETL for age calibration targets"
)
parser.add_argument(
"--dataset",
default=DEFAULT_DATASET,
help=(
"Source dataset (local path or HuggingFace URL). "
"The year for Census API calls is derived from the dataset's "
"default_calculation_period. Default: %(default)s"
),
)
args = parser.parse_args()

# Derive year from dataset
from policyengine_us import Microsimulation

print(f"Loading dataset: {args.dataset}")
sim = Microsimulation(dataset=args.dataset)
year = int(sim.default_calculation_period)
print(f"Derived year from dataset: {year}")
_, year = etl_argparser("ETL for age calibration targets")

# --- ETL: Extract, Transform, Load ----

73 changes: 27 additions & 46 deletions policyengine_us_data/db/etl_irs_soi.py
@@ -1,4 +1,3 @@
import argparse
import logging
from typing import Optional

@@ -8,19 +7,6 @@
from sqlmodel import Session, create_engine, select

from policyengine_us_data.storage import STORAGE_FOLDER

DEFAULT_DATASET = "hf://policyengine/policyengine-us-data/calibration/stratified_extended_cps.h5"

# IRS SOI data is typically available ~2 years after the tax year
IRS_SOI_LAG_YEARS = 2
from policyengine_us_data.utils.raw_cache import (
is_cached,
cache_path,
save_bytes,
)

logger = logging.getLogger(__name__)

from policyengine_us_data.db.create_database_tables import (
Stratum,
StratumConstraint,
@@ -34,6 +20,7 @@
get_stratum_parent,
parse_ucgid,
get_geographic_strata,
etl_argparser,
)
from policyengine_us_data.utils.db_metadata import (
get_or_create_source,
@@ -44,6 +31,17 @@
from policyengine_us_data.storage.calibration_targets.make_district_mapping import (
get_district_mapping,
)
from policyengine_us_data.utils.raw_cache import (
is_cached,
cache_path,
save_bytes,
)

logger = logging.getLogger(__name__)


# IRS SOI data is typically available ~2 years after the tax year
IRS_SOI_LAG_YEARS = 2

"""See the 22incddocguide.docx manual from the IRS SOI"""
# Language in the doc: '$10,000 under $25,000' means >= $10,000 and < $25,000
@@ -1236,40 +1234,23 @@ def load_soi_data(long_dfs, year):


def main():
parser = argparse.ArgumentParser(
description="ETL for IRS SOI calibration targets"
)
parser.add_argument(
"--dataset",
default=DEFAULT_DATASET,
help=(
"Source dataset (local path or HuggingFace URL). "
"The year for IRS SOI data is derived from the dataset's "
"default_calculation_period minus IRS_SOI_LAG_YEARS. "
"Default: %(default)s"
),
)
parser.add_argument(
"--lag",
type=int,
default=IRS_SOI_LAG_YEARS,
help=(
"Years to subtract from dataset year for IRS SOI data "
"(default: %(default)s, since IRS data is ~2 years behind)"
),
)
args = parser.parse_args()

# Derive year from dataset with lag applied
from policyengine_us import Microsimulation
def add_lag_arg(parser):
parser.add_argument(
"--lag",
type=int,
default=IRS_SOI_LAG_YEARS,
help=(
"Years to subtract from dataset year for IRS SOI data "
"(default: %(default)s, since IRS data is ~2 years behind)"
),
)

print(f"Loading dataset: {args.dataset}")
sim = Microsimulation(dataset=args.dataset)
dataset_year = int(sim.default_calculation_period)
year = dataset_year - args.lag
print(
f"Dataset year: {dataset_year}, IRS SOI year: {year} (lag={args.lag})"
args, dataset_year = etl_argparser(
"ETL for IRS SOI calibration targets",
extra_args_fn=add_lag_arg,
)
year = dataset_year - args.lag
print(f"IRS SOI year: {year} (lag={args.lag})")

# Extract -----------------------
raw_df = extract_soi_data()