Skip to content
Merged
4 changes: 4 additions & 0 deletions changelog_entry.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- bump: minor
changes:
added:
- County FIPS dataset
7 changes: 6 additions & 1 deletion policyengine_us_data/geography/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
from pathlib import Path
import pandas as pd
import os

# Location of the bundled ZIP-code dataset shipped with the package.
ZIP_CODE_DATASET_PATH = (
    Path(__file__).parent.parent / "geography" / "zip_codes.csv.gz"
)

# Avoid a circular import error when policyengine-us-data is initialized
# before the ZIP-code file has been generated: only load the dataset if
# the file actually exists, otherwise expose None.
if os.path.exists(ZIP_CODE_DATASET_PATH):
    ZIP_CODE_DATASET = pd.read_csv(ZIP_CODE_DATASET_PATH, compression="gzip")
else:
    ZIP_CODE_DATASET = None
83 changes: 83 additions & 0 deletions policyengine_us_data/geography/county_fips.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import pandas as pd
import requests
from io import StringIO, BytesIO
from pathlib import Path
from policyengine_us_data.utils.huggingface import upload as upload_to_hf

LOCAL_FOLDER = Path(__file__).parent


def generate_county_fips_2020_dataset():
    """
    One-off script to generate a dataset of county FIPS codes used in the
    2020 Census.

    Downloads the Census Bureau's national county file, builds the
    5-character composite county FIPS code, uploads the dataset to
    Hugging Face as county_fips_2020.csv.gz, and saves it locally as
    county_fips.csv.gz.

    Raises:
        ValueError: if the download does not return HTTP 200.
    """
    # More information about this data at
    # https://www.census.gov/library/reference/code-lists/ansi.html#cou

    # Dataset contains the following columns:
    # STATE - 2-letter state postal code (e.g., "AL")
    # STATEFP - State FIPS code (01 for AL)
    # COUNTYFP - Three-digit county portion of FIPS (001 for Autauga County, AL, if STATEFP is 01)
    # COUNTYNAME - County name

    COUNTY_FIPS_2020_URL = "https://www2.census.gov/geo/docs/reference/codes2020/national_county2020.txt"

    # Download the base pipe-delimited data file
    response = requests.get(COUNTY_FIPS_2020_URL)
    if response.status_code != 200:
        raise ValueError(
            f"Failed to download county FIPS codes: {response.status_code}"
        )

    county_fips_raw = StringIO(response.text)

    # Read all columns as str so leading zeros in FIPS segments survive.
    county_fips = pd.read_csv(
        county_fips_raw,
        delimiter="|",
        usecols=["STATE", "STATEFP", "COUNTYFP", "COUNTYNAME"],
        dtype={
            "STATE": str,
            "STATEFP": str,
            "COUNTYFP": str,
            "COUNTYNAME": str,
        },
    )

    county_fips = county_fips.rename(
        columns={
            "STATE": "state",
            "STATEFP": "state_fips_segment",
            "COUNTYFP": "county_fips_segment",
            "COUNTYNAME": "county_name",
        }
    )

    # Create composite county FIPS code, then drop segment columns;
    # note that the FIPS code is a 5-char str of digits
    county_fips["county_fips"] = (
        county_fips["state_fips_segment"] + county_fips["county_fips_segment"]
    )
    county_fips.drop(
        columns=["state_fips_segment", "county_fips_segment"], inplace=True
    )

    # Create buffer to save CSV
    csv_buffer = BytesIO()

    # Save CSV into buffer object and reset pointer
    county_fips.to_csv(csv_buffer, index=False, compression="gzip")
    csv_buffer.seek(0)

    # Upload to Hugging Face
    upload_to_hf(
        local_file_path=csv_buffer,
        repo="policyengine/policyengine-us-data",
        repo_file_path="county_fips_2020.csv.gz",
    )

    # Save locally; index=False keeps the local file consistent with the
    # Hugging Face upload above (previously omitted, which added a
    # spurious unnamed index column to the local file only).
    county_fips.to_csv(
        LOCAL_FOLDER / "county_fips.csv.gz", index=False, compression="gzip"
    )


# Script entry point: regenerate and publish the 2020 county FIPS dataset.
if __name__ == "__main__":
    generate_county_fips_2020_dataset()
5 changes: 5 additions & 0 deletions policyengine_us_data/geography/create_zip_code_dataset.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import pandas as pd

# Note: This method of dataset creation (downloading a government file,
# processing it, and saving the processed data as an exported file) is
# deprecated in this package in favor of uploading datasets to HuggingFace,
# then having country packages download as needed.

# Per-ZCTA population dataset
# ACS 5-year estimates, download URL: https://data.census.gov/cedsci/table?q=DP05%3A%20ACS%20DEMOGRAPHIC%20AND%20HOUSING%20ESTIMATES&g=0100000US%248600000&tid=ACSDP5Y2020.DP05

Expand Down
190 changes: 190 additions & 0 deletions policyengine_us_data/tests/test_datasets/test_county_fips.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
import pytest
import pandas as pd
from unittest.mock import patch, MagicMock, mock_open
from io import StringIO, BytesIO
from pathlib import Path

# Import the function to test
from policyengine_us_data.geography.county_fips import (
generate_county_fips_2020_dataset,
LOCAL_FOLDER,
)


# Pipe-delimited fixture mimicking the layout of the Census Bureau's
# national_county2020.txt file (STATE|STATEFP|COUNTYFP|COUNTYNAME).
SAMPLE_CENSUS_DATA = """STATE|STATEFP|COUNTYFP|COUNTYNAME
AL|01|001|Autauga County
AL|01|003|Baldwin County
NY|36|001|Albany County
NY|36|003|Bronx County
"""


@pytest.fixture
def mock_response():
    """Build a MagicMock standing in for a successful requests.get response."""
    response = MagicMock()
    response.status_code = 200
    response.text = SAMPLE_CENSUS_DATA
    return response


@pytest.fixture
def mock_requests_get(mock_response):
    """Patch requests.get so it returns the canned successful response."""
    with patch("requests.get", return_value=mock_response) as patched_get:
        yield patched_get


@pytest.fixture
def mock_upload_to_hf():
    """Patch the Hugging Face upload helper used by the module under test."""
    target = "policyengine_us_data.geography.county_fips.upload_to_hf"
    with patch(target) as patched_upload:
        yield patched_upload


@pytest.fixture
def mock_local_folder():
    """Replace the module-level LOCAL_FOLDER path with a MagicMock."""
    fake_folder = MagicMock()
    target = "policyengine_us_data.geography.county_fips.LOCAL_FOLDER"
    with patch(target, fake_folder):
        yield fake_folder


@pytest.fixture
def mock_to_csv():
    """Patch DataFrame.to_csv so no files are actually written."""
    with patch("pandas.DataFrame.to_csv") as patched_to_csv:
        yield patched_to_csv


@pytest.fixture
def expected_dataframe():
    """DataFrame the generator should produce from SAMPLE_CENSUS_DATA."""
    states = ["AL", "AL", "NY", "NY"]
    names = [
        "Autauga County",
        "Baldwin County",
        "Albany County",
        "Bronx County",
    ]
    fips_codes = ["01001", "01003", "36001", "36003"]
    return pd.DataFrame(
        {"state": states, "county_name": names, "county_fips": fips_codes}
    )


def test_successful_download_and_processing(
    mock_response,
    mock_upload_to_hf,
    mock_to_csv,
    mock_requests_get,
    expected_dataframe,
):
    """Happy path: the generator uploads to HF and saves the local CSV."""
    generate_county_fips_2020_dataset()

    # Exactly one Hugging Face upload should have happened.
    mock_upload_to_hf.assert_called_once()

    # The final to_csv call should target the local gzip file.
    last_to_csv_call = mock_to_csv.call_args_list[-1]
    expected_path = str(LOCAL_FOLDER / "county_fips.csv.gz")
    assert expected_path in str(last_to_csv_call)


def test_download_failure():
    """A non-200 response must raise ValueError naming the status code."""
    failed_response = MagicMock()
    failed_response.status_code = 404

    with patch("requests.get", return_value=failed_response):
        with pytest.raises(ValueError) as excinfo:
            generate_county_fips_2020_dataset()

    # The status code should be surfaced in the error message.
    assert "404" in str(excinfo.value)


def test_dataframe_transformation(
    mock_response,
    mock_requests_get,
    mock_upload_to_hf,
    mock_to_csv,
    expected_dataframe,
):
    """Test the transformation of the raw data into the expected dataframe.

    NOTE(review): the previous version delegated to the real
    DataFrame.to_csv, which wrote an actual county_fips.csv.gz into the
    package directory every time the test ran. Nothing in this test
    inspects file output, so the capture stub below only records the
    dataframe and skips the real write.
    """

    def capture_df(self, *args, **kwargs):
        # Store a copy of the dataframe for inspection; do NOT perform
        # the real write, keeping the test free of filesystem side effects.
        # (to_csv with a path target returns None anyway.)
        capture_df.result_df = self.copy()
        return None

    with patch("pandas.DataFrame.to_csv", capture_df):
        generate_county_fips_2020_dataset()

    # Get the captured dataframe
    result_df = capture_df.result_df

    # Column names and order must match exactly.
    assert list(result_df.columns) == list(expected_dataframe.columns)

    # Cell-by-cell content must match.
    for col in result_df.columns:
        assert result_df[col].tolist() == expected_dataframe[col].tolist()

    # Ensure FIPS codes are correctly formatted (5 characters).
    assert all(len(fips) == 5 for fips in result_df["county_fips"])


def test_output_file_creation(
    mock_upload_to_hf, mock_to_csv, mock_requests_get, mock_local_folder
):
    """Test that the local output file is written with gzip compression.

    NOTE(review): removed the unused local mocks (mock_resp, mock_path)
    from the previous version — mock_requests_get already supplies the
    successful canned response, and LOCAL_FOLDER is patched by the
    mock_local_folder fixture.
    """
    generate_county_fips_2020_dataset()

    # The last to_csv call is the local save; it must use gzip compression.
    last_call_kwargs = mock_to_csv.call_args_list[-1][1]
    assert last_call_kwargs.get("compression") == "gzip"


def test_huggingface_upload(mock_upload_to_hf, mock_to_csv, mock_requests_get):
    """Test that the upload to Hugging Face gets the correct parameters.

    NOTE(review): removed the unused mock_resp locals from the previous
    version — mock_requests_get already supplies the successful response.
    """
    generate_county_fips_2020_dataset()

    # Check that upload_to_hf was called with the correct repo and file path
    call_kwargs = mock_upload_to_hf.call_args[1]
    assert call_kwargs["repo"] == "policyengine/policyengine-us-data"
    assert call_kwargs["repo_file_path"] == "county_fips_2020.csv.gz"

    # The uploaded object is an in-memory gzip buffer, not a filesystem path.
    assert isinstance(call_kwargs["local_file_path"], BytesIO)
Loading