From e8835953949f5f1fc149b74e26247e0390662001 Mon Sep 17 00:00:00 2001
From: Griffin Sharps
Date: Thu, 11 Dec 2025 03:07:24 +0000
Subject: [PATCH] Updated the units of the regression analysis to be individual household x day observations rather than households

---
 .../stage2_blockgroup_regression.py | 409 +++++++++++------
 1 file changed, 249 insertions(+), 160 deletions(-)

diff --git a/analysis/clustering/stage2_blockgroup_regression.py b/analysis/clustering/stage2_blockgroup_regression.py
index ec64e7b..5a4df60 100644
--- a/analysis/clustering/stage2_blockgroup_regression.py
+++ b/analysis/clustering/stage2_blockgroup_regression.py
@@ -5,18 +5,22 @@
 Goal
 ----
 Model how Census block-group demographics are associated with the composition
-of households across load-profile clusters.
+of household-day observations across load-profile clusters.
 
-Unit of analysis: ONE ROW PER (block_group_geoid, cluster).
+Unit of Analysis
+----------------
+Block-group x cluster counts of HOUSEHOLD-DAY observations (not households).
 
-Data flow
+Data Flow
 ---------
-1. Load household-level cluster assignments from Stage 1.
-2. Join accounts to Census block groups via ZIP+4 → block group crosswalk.
-3. Aggregate to block-group X cluster counts and total households per block group.
-4. Join block groups to Census demographics (block-group level).
-5. Fit a multinomial logistic regression treating cluster as the outcome
-   and demographics as predictors, weighted by n_households.
+1. Load household-day cluster assignments from Stage 1 (one row per household-day)
+2. Join to Census block groups via ZIP+4 → block group crosswalk
+3. Aggregate to block-group x cluster counts of household-day observations
+4. Join block groups to Census demographics
+5. Fit multinomial logistic regression:
+   - Outcome: cluster
+   - Predictors: demographics
+   - Weights: n_obs (household-day count)
 
 Outputs
 -------
@@ -54,7 +58,7 @@
     format="%(asctime)s - %(levelname)s - %(message)s",
 )
 
-# Default predictors (leaner set to help with convergence and interpretability)
+# Default predictors (for convergence and interpretability)
 DEFAULT_PREDICTORS = [
     "Median_Household_Income",
     "Median_Age",
@@ -64,30 +68,31 @@
 
 
 # =============================================================================
-# 1. LOAD CLUSTERS AND MAP TO BLOCK GROUPS
+# 1. LOAD CLUSTER ASSIGNMENTS (HOUSEHOLD-DAY LEVEL)
 # =============================================================================
 
 
-def load_cluster_assignments(path: Path) -> tuple[pl.DataFrame, dict]:
+def load_cluster_assignments_household_day(path: Path) -> tuple[pl.DataFrame, dict]:
     """
-    Load household-day cluster assignments and aggregate to household level.
+    Load household-day cluster assignments.
 
-    Each household is assigned to their DOMINANT cluster (most frequent).
-    We also compute a dominance metric: fraction of sampled days in that cluster.
+    Returns the raw Stage 1 output: one row per (household, day) with cluster label.
+
+    "Dominance" statistics are still computed for reporting purposes, but the
+    returned DataFrame keeps all household-day rows.
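+
+    For example, a household sampled on 30 days contributes 30 rows here and,
+    downstream, 30 units of regression weight; the previous household-level
+    aggregation contributed exactly one row per household.
+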
Returns ------- df : pl.DataFrame - One row per household with columns: + One row per household-day with columns: - account_identifier - zip_code - - cluster (dominant cluster) - - n_days (total days sampled) - - dominant_days (days in dominant cluster) - - dominance (dominant_days / n_days) + - date (if present) + - cluster - confidence_stats : dict - Summary statistics on assignment confidence + dominance_stats : dict + Summary statistics on how consistently households stay in one cluster + (for reporting/interpretation, not used in regression) """ logger.info("Loading cluster assignments from %s", path) raw = pl.read_parquet(path) @@ -97,37 +102,57 @@ def load_cluster_assignments(path: Path) -> tuple[pl.DataFrame, dict]: if missing: raise ValueError(f"cluster_assignments missing required columns: {missing}") + n_household_days = len(raw) + n_households = raw["account_identifier"].n_unique() + n_clusters = raw["cluster"].n_unique() + logger.info( - " Raw data: %s household-days, %s accounts, %s ZIP+4s, %s clusters", - f"{len(raw):,}", - f"{raw['account_identifier'].n_unique():,}", - f"{raw['zip_code'].n_unique():,}", - raw["cluster"].n_unique(), + " Loaded: %s household-day observations, %s households, %s clusters", + f"{n_household_days:,}", + f"{n_households:,}", + n_clusters, ) + # This doesn't affect the regression - just useful context + dominance_stats = _compute_dominance_stats(raw) + + logger.info( + " Dominance stats: mean=%.1f%%, median=%.1f%%, >50%%: %.1f%% of households", + dominance_stats["dominance_mean"] * 100, + dominance_stats["dominance_median"] * 100, + dominance_stats["pct_above_50"], + ) + + return raw, dominance_stats + + +def _compute_dominance_stats(df: pl.DataFrame) -> dict: + """ + Compute how consistently each household stays in one cluster. + + For each household: + - dominance = (days in most frequent cluster) / (total days) + + Returns summary statistics across all households. 
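+
+    Example
+    -------
+    A minimal illustrative sketch with toy data: a household with 7 of its
+    10 sampled days in one cluster has dominance 7/10 = 0.7.
+
+    >>> import polars as pl
+    >>> toy = pl.DataFrame({
+    ...     "account_identifier": ["a"] * 10,
+    ...     "cluster": [3] * 7 + [1] * 3,
+    ... })
+    >>> _compute_dominance_stats(toy)["dominance_mean"]
+    0.7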
+ """ # Count days per (household, cluster) - counts = raw.group_by(["account_identifier", "zip_code", "cluster"]).agg(pl.len().alias("days_in_cluster")) + counts = df.group_by(["account_identifier", "cluster"]).agg(pl.len().alias("days_in_cluster")) # Total days per household totals = counts.group_by("account_identifier").agg(pl.col("days_in_cluster").sum().alias("n_days")) - # Find dominant cluster per household (most days) - dominant = ( - counts.sort(["account_identifier", "days_in_cluster"], descending=[False, True]) - .group_by("account_identifier", maintain_order=True) - .first() - .rename({"days_in_cluster": "dominant_days"}) - ) + # Max days in any single cluster per household + max_days = counts.group_by("account_identifier").agg(pl.col("days_in_cluster").max().alias("max_days_in_cluster")) - # Combine and compute dominance score - df = dominant.join(totals, on="account_identifier").with_columns( - (pl.col("dominant_days") / pl.col("n_days")).alias("dominance") + # Compute dominance + dominance_df = max_days.join(totals, on="account_identifier").with_columns( + (pl.col("max_days_in_cluster") / pl.col("n_days")).alias("dominance") ) - # Compute confidence statistics - dominance_values = df["dominance"].to_numpy() - confidence_stats = { - "n_households": len(df), + dominance_values = dominance_df["dominance"].to_numpy() + + return { + "n_households": len(dominance_df), "dominance_mean": float(dominance_values.mean()), "dominance_median": float(np.median(dominance_values)), "dominance_std": float(dominance_values.std()), @@ -138,23 +163,17 @@ def load_cluster_assignments(path: Path) -> tuple[pl.DataFrame, dict]: "pct_above_80": float((dominance_values > 0.8).mean() * 100), } - logger.info(" Aggregated to %s households (one cluster per household)", f"{len(df):,}") - logger.info( - " Cluster dominance: mean=%.1f%%, median=%.1f%%, >50%%: %.1f%% of households", - confidence_stats["dominance_mean"] * 100, - confidence_stats["dominance_median"] * 100, - confidence_stats["pct_above_50"], - ) - return df, confidence_stats +# ============================================================================= +# 2. CROSSWALK AND BLOCK GROUP MAPPING +# ============================================================================= def load_crosswalk(crosswalk_path: Path, zip_codes: list[str]) -> pl.DataFrame: - """Load ZIP+4 → Census block-group crosswalk for the ZIP+4s in our data. + """ + Load ZIP+4 → Census block-group crosswalk for the ZIP+4s in our data. - Also runs a small diagnostic to detect fan-out (ZIP+4 mapping to - multiple block groups), which would imply potential over-counting - if we don't re-weight. + Also runs a diagnostic to detect fan-out (ZIP+4 mapping to multiple block groups). """ logger.info("Loading crosswalk from %s", crosswalk_path) @@ -177,79 +196,113 @@ def load_crosswalk(crosswalk_path: Path, zip_codes: list[str]) -> pl.DataFrame: f"{len(set(zip_codes)):,}", ) - # Fan-out diagnostic: do any ZIP+4s map to multiple block groups? if crosswalk.is_empty(): logger.warning(" Crosswalk is empty after filtering for sample ZIP+4s.") return crosswalk + # Fan-out diagnostic fanout = crosswalk.group_by("zip_code").agg(pl.n_unique("block_group_geoid").alias("n_block_groups")) max_fanout = int(fanout["n_block_groups"].max()) if max_fanout > 1: fanout_summary = fanout.group_by("n_block_groups").agg(pl.len().alias("n_zip4")).sort("n_block_groups") logger.warning( - " WARNING: ZIP+4 → block-group crosswalk has fan-out for the " - "current sample (some ZIP+4s map to multiple block groups). 
" - "Distribution of mappings:\n%s", + " WARNING: ZIP+4 → block-group crosswalk has fan-out (some ZIP+4s map to multiple block groups):\n%s", fanout_summary, ) else: - logger.info( - " Crosswalk is 1-to-1 for the current sample: each ZIP+4 maps to exactly one block group (no fan-out)." - ) + logger.info(" Crosswalk is 1-to-1: each ZIP+4 maps to exactly one block group.") return crosswalk -def attach_block_groups(clusters: pl.DataFrame, crosswalk: pl.DataFrame) -> pl.DataFrame: - """Attach block-group GEOIDs to cluster assignments via ZIP+4.""" - logger.info("Joining cluster assignments to block groups...") +def attach_block_groups_to_household_days( + household_days: pl.DataFrame, + crosswalk: pl.DataFrame, +) -> pl.DataFrame: + """ + Attach block-group GEOIDs to household-day observations via ZIP+4. + + Input: one row per household-day + Output: one row per household-day with block_group_geoid attached + """ + logger.info("Joining household-day observations to block groups...") - df = clusters.join(crosswalk, on="zip_code", how="left") + df = household_days.join(crosswalk, on="zip_code", how="left") n_before = len(df) missing = df.filter(pl.col("block_group_geoid").is_null()).height if missing > 0: pct = missing / n_before * 100 - logger.warning(" %s (%.1f%%) rows missing block_group - dropping", f"{missing:,}", pct) + logger.warning(" %s (%.1f%%) observations missing block_group - dropping", f"{missing:,}", pct) df = df.filter(pl.col("block_group_geoid").is_not_null()) - logger.info(" Assignments cover %s block groups", f"{df['block_group_geoid'].n_unique():,}") + logger.info( + " %s household-day observations across %s block groups", + f"{len(df):,}", + f"{df['block_group_geoid'].n_unique():,}", + ) + + return df - return df.select(["account_identifier", "block_group_geoid", "cluster"]) + +# ============================================================================= +# 3. AGGREGATE TO BLOCK-GROUP x CLUSTER COUNTS +# ============================================================================= def aggregate_blockgroup_cluster_counts(df: pl.DataFrame) -> pl.DataFrame: """ - Aggregate household-level cluster assignments to block-group X cluster counts. + Aggregate household-day observations to block-group x cluster counts. - Input: one row per household with dominant cluster assignment. - Output: one row per (block_group, cluster) with count of households. 
+ Input: one row per household-day with columns: + - account_identifier + - block_group_geoid + - cluster + + Output: one row per (block_group_geoid, cluster) with: + - n_obs : count of household-day observations + - n_households : count of distinct households (for context) + - total_obs : total household-day observations in the block group + - total_households: total distinct households in the block group + - cluster_share : n_obs / total_obs """ - logger.info("Aggregating to block-group X cluster counts...") + logger.info("Aggregating to block-group x cluster counts (household-day units)...") - # Count households per (block_group, cluster) - counts = df.group_by(["block_group_geoid", "cluster"]).agg(pl.len().alias("n_households")) + # Counts per (block_group, cluster) + counts = df.group_by(["block_group_geoid", "cluster"]).agg([ + pl.len().alias("n_obs"), + pl.col("account_identifier").n_unique().alias("n_households"), + ]) - # Total households per block group - totals = counts.group_by("block_group_geoid").agg(pl.col("n_households").sum().alias("total_households")) + # Totals per block_group + totals = df.group_by("block_group_geoid").agg([ + pl.len().alias("total_obs"), + pl.col("account_identifier").n_unique().alias("total_households"), + ]) + # Merge and compute cluster shares bg_counts = counts.join(totals, on="block_group_geoid", how="left").with_columns( - (pl.col("n_households") / pl.col("total_households")).alias("cluster_share") + (pl.col("n_obs") / pl.col("total_obs")).alias("cluster_share") ) logger.info( - " Created %s rows across %s block groups", + " Created %s (block_group, cluster) rows across %s block groups", f"{len(bg_counts):,}", f"{bg_counts['block_group_geoid'].n_unique():,}", ) + logger.info( + " Total observations: %s, Total households: %s", + f"{bg_counts['n_obs'].sum():,}", + f"{totals['total_households'].sum():,}", + ) return bg_counts # ============================================================================= -# 2. CENSUS DATA HANDLING +# 4. CENSUS DATA # ============================================================================= @@ -281,7 +334,7 @@ def attach_census_to_blockgroups(bg_counts: pl.DataFrame, census_df: pl.DataFram census_df = census_df.with_columns(pl.col("GEOID").cast(pl.Utf8).str.zfill(12).alias("block_group_geoid")) - demo = bg_counts.lazy().join(census_df.lazy(), on="block_group_geoid", how="left").collect() + demo = bg_counts.join(census_df, on="block_group_geoid", how="left") n_before = len(demo) missing = demo.filter(pl.col("GEOID").is_null()).height @@ -297,26 +350,36 @@ def attach_census_to_blockgroups(bg_counts: pl.DataFrame, census_df: pl.DataFram # ============================================================================= -# 3. PREPARE REGRESSION DATA +# 5. PREPARE REGRESSION DATA # ============================================================================= def prepare_regression_dataset( demo_df: pl.DataFrame, predictors: list[str], - min_households_per_bg: int = 10, + min_obs_per_bg: int = 50, min_nonzero_clusters_per_bg: int = 2, ) -> tuple[pl.DataFrame, list[str]]: - """Prepare block-group X cluster dataset for regression.""" + """ + Prepare block-group x cluster dataset for regression. 
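+
+    Thin or homogeneous block groups are dropped before fitting; for example,
+    with the default min_obs_per_bg=50, a block group whose households
+    contribute only 30 sampled days in total is excluded.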
+
+    Filters out:
+    - Block groups with fewer than min_obs_per_bg household-day observations
+    - Block groups with fewer than min_nonzero_clusters_per_bg clusters represented
+    """
     logger.info("Preparing regression dataset...")
 
-    # Filter by minimum households
-    df = demo_df.filter(pl.col("total_households") >= min_households_per_bg)
-    logger.info("  After min_households filter: %s block groups", f"{df['block_group_geoid'].n_unique():,}")
+    # Filter by minimum observations (household-days)
+    df = demo_df.filter(pl.col("total_obs") >= min_obs_per_bg)
+    logger.info(
+        "  After min_obs filter (>=%d): %s block groups",
+        min_obs_per_bg,
+        f"{df['block_group_geoid'].n_unique():,}",
+    )
 
     # Require block groups to have multiple clusters represented
     nonzero_counts = (
-        df.filter(pl.col("n_households") > 0).group_by("block_group_geoid").agg(pl.len().alias("n_nonzero_clusters"))
+        df.filter(pl.col("n_obs") > 0).group_by("block_group_geoid").agg(pl.len().alias("n_nonzero_clusters"))
     )
 
     df = (
@@ -325,9 +388,13 @@ def prepare_regression_dataset(
         .drop("n_nonzero_clusters")
     )
 
-    logger.info("  After cluster diversity filter: %s block groups", f"{df['block_group_geoid'].n_unique():,}")
+    logger.info(
+        "  After cluster diversity filter (>=%d clusters): %s block groups",
+        min_nonzero_clusters_per_bg,
+        f"{df['block_group_geoid'].n_unique():,}",
+    )
 
-    # Filter predictors to those that exist and have acceptable missingness
+    # Filter predictors
     available_predictors: list[str] = []
     for p in predictors:
         if p not in df.columns:
@@ -339,7 +406,7 @@ def prepare_regression_dataset(
             continue
         available_predictors.append(p)
 
-    logger.info("  Using %s predictors: %s", len(available_predictors), available_predictors)
+    logger.info("  Using %d predictors: %s", len(available_predictors), available_predictors)
 
     if not available_predictors:
         raise ValueError("No valid predictors available")
@@ -355,7 +422,7 @@
 
 
 # =============================================================================
-# 4. MULTINOMIAL REGRESSION
+# 6. MULTINOMIAL REGRESSION
 # =============================================================================
 
 
@@ -363,27 +430,29 @@ def run_multinomial_regression(
     reg_df: pl.DataFrame,
     predictors: list[str],
     outcome: str = "cluster",
-    weight_col: str = "n_households",
+    weight_col: str = "n_obs",
     standardize: bool = False,
 ) -> dict[str, object]:
-    """Run multinomial logistic regression with statsmodels.
+    """
+    Run multinomial logistic regression with statsmodels.
 
     Parameters
     ----------
     reg_df : pl.DataFrame
-        Long-form data, one row per (block_group_geoid, cluster) with
-        columns including predictors, outcome, and weights.
+        Long-form data, one row per (block_group_geoid, cluster).
     predictors : list[str]
         Names of predictor columns.
     outcome : str, default "cluster"
-        Name of the outcome column (cluster index).
-    weight_col : str, default "n_households"
-        Column providing frequency weights (households per BG x cluster).
+        Name of the outcome column.
+    weight_col : str, default "n_obs"
+        Column providing frequency weights. Default is n_obs (household-day
+        observations), which weights by the number of household-day profiles
+        in each block-group x cluster cell.
     standardize : bool, default False
-        If True, standardize predictors before regression. If False,
-        use raw units (easier to interpret coefficients and ORs).
+        If True, standardize predictors before regression.
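+
+    Notes
+    -----
+    Weights enter as frequency weights via integer row expansion: a
+    (block_group, cluster) cell with n_obs = 37 contributes 37 identical
+    rows to the likelihood. Repeated days from the same household are
+    treated as independent observations, so the reported standard errors
+    are likely somewhat optimistic.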
""" logger.info("Running multinomial logistic regression...") + logger.info(" Weighting by: %s (household-day observations)", weight_col) # Extract arrays X = reg_df.select(predictors).to_numpy().astype(np.float64) @@ -399,13 +468,11 @@ def run_multinomial_regression( if len(X) == 0: raise ValueError("No observations remaining after dropping NaN rows.") - # Count block groups with complete predictor data (for reporting) - # (This is approximate, but good enough for a summary line.) n_block_groups = reg_df.filter(~pl.any_horizontal(pl.col(predictors).is_null()))["block_group_geoid"].n_unique() - # Standardize or leave in raw units + # Standardize or use raw units if standardize: - logger.info(" Standardizing predictors (mean 0, sd 1)...") + logger.info(" Standardizing predictors...") scaler = StandardScaler() X_transformed = scaler.fit_transform(X) else: @@ -415,15 +482,16 @@ def run_multinomial_regression( # Add intercept X_with_const = sm.add_constant(X_transformed) - # Expand rows by integer weights for frequency weighting + # Expand rows by integer weights weight_ints = np.maximum(np.round(weights).astype(int), 1) X_expanded = np.repeat(X_with_const, weight_ints, axis=0) y_expanded = np.repeat(y, weight_ints) logger.info( - " Training on %s expanded rows (%s block groups)", + " Training on %s expanded rows (%s block groups, %s total household-day obs)", f"{len(X_expanded):,}", n_block_groups, + f"{int(weights.sum()):,}", ) model = sm.MNLogit(y_expanded, X_expanded) @@ -454,6 +522,8 @@ def run_multinomial_regression( odds_ratios[baseline_key] = dict.fromkeys(param_names, 1.0) logger.info(" Baseline cluster: %s", baseline) + logger.info(" Converged: %s", result.mle_retvals.get("converged", True)) + logger.info(" Pseudo R²: %.4f", result.prsquared) return { "n_rows": len(X), @@ -461,9 +531,11 @@ def run_multinomial_regression( "n_block_groups": int(n_block_groups), "n_clusters": len(classes), "n_predictors": len(predictors), + "total_household_day_obs": int(weights.sum()), "clusters": classes, "baseline_cluster": int(baseline), "predictors": predictors, + "weight_col": weight_col, "coefficients": coefficients, "std_errors": std_errors, "p_values": p_values, @@ -475,51 +547,58 @@ def run_multinomial_regression( } +# ============================================================================= +# 7. 
REPORT GENERATION +# ============================================================================= + + def generate_report( results: dict[str, object], cluster_distribution: pl.DataFrame, + dominance_stats: dict, output_path: Path, ) -> None: """Generate human-readable summary.""" - conf = results.get("cluster_assignment_confidence", {}) - - n_households = conf.get("n_households") - households_line = " Households: N/A" if n_households is None else f" Households: {n_households:,}" - lines = [ "=" * 70, "STAGE 2: BLOCK-GROUP MULTINOMIAL REGRESSION RESULTS", "=" * 70, "", + "ANALYSIS UNIT: HOUSEHOLD-DAY OBSERVATIONS", + "-" * 70, + "Each row in the regression represents a (block_group, cluster) pair,", + "weighted by the number of household-day observations in that cell.", + "", "MODEL SUMMARY", "-" * 70, f"Block groups: {results['n_block_groups']:,}", - f"Rows (block_group X cluster): {results['n_rows']:,}", - f"Expanded rows (for weighting): {results['n_expanded_rows']:,}", + f"Rows (block_group x cluster): {results['n_rows']:,}", + f"Total household-day observations: {results['total_household_day_obs']:,}", f"Clusters: {results['n_clusters']}", f"Predictors: {results['n_predictors']}", + f"Weight column: {results['weight_col']}", f"Baseline cluster: {results['baseline_cluster']}", - f"Pseudo R²: {results['pseudo_r2']:.3f}", + f"Pseudo R²: {results['pseudo_r2']:.4f}", f"Converged: {results['converged']}", "", - "CLUSTER ASSIGNMENT CONFIDENCE", + "HOUSEHOLD CLUSTER CONSISTENCY (for interpretation context)", "-" * 70, - "Each household assigned to their most frequent (dominant) cluster.", - "Dominance = fraction of household's days in their assigned cluster.", + "How consistently do households stay in one cluster across sampled days?", + "(This doesn't affect the regression - just useful context.)", "", - households_line, - f" Mean dominance: {conf.get('dominance_mean', 0) * 100:.1f}%", - f" Median dominance: {conf.get('dominance_median', 0) * 100:.1f}%", - f" Households >50% in one cluster: {conf.get('pct_above_50', 0):.1f}%", - f" Households >67% in one cluster: {conf.get('pct_above_67', 0):.1f}%", - f" Households >80% in one cluster: {conf.get('pct_above_80', 0):.1f}%", + f" Households: {dominance_stats['n_households']:,}", + f" Mean dominance: {dominance_stats['dominance_mean'] * 100:.1f}%", + f" Median dominance: {dominance_stats['dominance_median'] * 100:.1f}%", + f" Households >50% in one cluster: {dominance_stats['pct_above_50']:.1f}%", + f" Households >67% in one cluster: {dominance_stats['pct_above_67']:.1f}%", + f" Households >80% in one cluster: {dominance_stats['pct_above_80']:.1f}%", "", - "CLUSTER DISTRIBUTION", + "CLUSTER DISTRIBUTION (by household-day observations)", "-" * 70, ] for row in cluster_distribution.iter_rows(named=True): - lines.append(f" Cluster {row['cluster']}: {row['n_households']:,} households ({row['pct']:.1f}%)") + lines.append(f" Cluster {row['cluster']}: {row['n_obs']:,} obs ({row['pct']:.1f}%)") lines.extend([ "", @@ -538,7 +617,6 @@ def generate_report( pvals = results["p_values"][key] ors = results["odds_ratios"][key] - # Sort by |coefficient|, exclude intercept sorted_preds = sorted( [(p, coefs[p]) for p in results["predictors"]], key=lambda x: abs(x[1]), @@ -559,13 +637,13 @@ def generate_report( # ============================================================================= -# 5. CLI +# 8. 
CLI # ============================================================================= def main() -> int: parser = argparse.ArgumentParser( - description="Stage 2: Block-group-level regression of cluster composition.", + description="Stage 2: Block-group-level regression using household-day units.", formatter_class=argparse.RawDescriptionHelpFormatter, ) @@ -576,21 +654,22 @@ def main() -> int: type=Path, default=Path("data/reference/census_17_2023.parquet"), ) - parser.add_argument( - "--fetch-census", - action="store_true", - help="Force re-fetch Census data", - ) + parser.add_argument("--fetch-census", action="store_true", help="Force re-fetch Census data") parser.add_argument("--state-fips", default="17") parser.add_argument("--acs-year", type=int, default=2023) - parser.add_argument("--min-households-per-bg", type=int, default=10) - parser.add_argument("--min-nonzero-clusters-per-bg", type=int, default=2) parser.add_argument( - "--predictors", - nargs="+", - default=DEFAULT_PREDICTORS, - help="Predictor columns", + "--min-obs-per-bg", + type=int, + default=50, + help="Minimum household-day observations per block group (default: 50)", ) + parser.add_argument( + "--min-nonzero-clusters-per-bg", + type=int, + default=2, + help="Minimum clusters represented per block group (default: 2)", + ) + parser.add_argument("--predictors", nargs="+", default=DEFAULT_PREDICTORS, help="Predictor columns") parser.add_argument( "--output-dir", type=Path, @@ -599,7 +678,7 @@ def main() -> int: parser.add_argument( "--standardize", action="store_true", - help="Standardize predictors before regression. Default is to use raw units (easier to interpret).", + help="Standardize predictors before regression (default: use raw units).", ) args = parser.parse_args() @@ -614,33 +693,36 @@ def main() -> int: args.output_dir.mkdir(parents=True, exist_ok=True) print("=" * 70) - print("STAGE 2: BLOCK-GROUP REGRESSION") + print("STAGE 2: BLOCK-GROUP REGRESSION (HOUSEHOLD-DAY UNITS)") print("=" * 70) - # Load and process data - clusters, confidence_stats = load_cluster_assignments(args.clusters) - crosswalk = load_crosswalk(args.crosswalk, clusters["zip_code"].unique().to_list()) - clusters_bg = attach_block_groups(clusters, crosswalk) - bg_counts = aggregate_blockgroup_cluster_counts(clusters_bg) + # 1. Load household-day cluster assignments (NO dominant cluster reduction) + household_days, dominance_stats = load_cluster_assignments_household_day(args.clusters) + + # 2. Load crosswalk and attach block groups + zip_codes = household_days["zip_code"].unique().to_list() + crosswalk = load_crosswalk(args.crosswalk, zip_codes) + household_days_bg = attach_block_groups_to_household_days(household_days, crosswalk) + + # 3. Aggregate to block-group x cluster counts + bg_counts = aggregate_blockgroup_cluster_counts(household_days_bg) + # 4. Load Census and attach demographics census_df = fetch_or_load_census( cache_path=args.census_cache, state_fips=args.state_fips, acs_year=args.acs_year, force_fetch=args.fetch_census, ) - logger.info( - " Census: %s block groups, %s columns", - f"{len(census_df):,}", - len(census_df.columns), - ) + logger.info(" Census: %s block groups, %s columns", f"{len(census_df):,}", len(census_df.columns)) demo_df = attach_census_to_blockgroups(bg_counts, census_df) + # 5. 
Prepare regression dataset reg_df, predictors = prepare_regression_dataset( demo_df=demo_df, predictors=args.predictors, - min_households_per_bg=args.min_households_per_bg, + min_obs_per_bg=args.min_obs_per_bg, min_nonzero_clusters_per_bg=args.min_nonzero_clusters_per_bg, ) @@ -648,20 +730,21 @@ def main() -> int: logger.error("No data after filtering") return 1 - # Save dataset used for regression + # Save regression dataset reg_df.write_parquet(args.output_dir / "regression_data_blockgroups.parquet") + logger.info("Saved regression data to %s", args.output_dir / "regression_data_blockgroups.parquet") - # Run regression + # 6. Run regression (weighted by n_obs = household-day counts) results = run_multinomial_regression( reg_df=reg_df, predictors=predictors, outcome="cluster", - weight_col="n_households", + weight_col="n_obs", # household-day observations standardize=args.standardize, ) - # Add confidence stats to results - results["cluster_assignment_confidence"] = confidence_stats + # Add dominance stats to results for reference + results["dominance_stats"] = dominance_stats # Save results model_summary = results.pop("model_summary") @@ -672,11 +755,17 @@ def main() -> int: # Generate report cluster_dist = ( reg_df.group_by("cluster") - .agg(pl.col("n_households").sum()) + .agg(pl.col("n_obs").sum()) .sort("cluster") - .with_columns((pl.col("n_households") / pl.col("n_households").sum() * 100).alias("pct")) + .with_columns((pl.col("n_obs") / pl.col("n_obs").sum() * 100).alias("pct")) + ) + + generate_report( + results, + cluster_dist, + dominance_stats, + args.output_dir / "regression_report_blockgroups.txt", ) - generate_report(results, cluster_dist, args.output_dir / "regression_report_blockgroups.txt") print(f"\nOutputs saved to: {args.output_dir}") return 0