From 4db1ea9d341518e69e51c9a07913720e894dd4c2 Mon Sep 17 00:00:00 2001 From: Griffin Sharps Date: Mon, 8 Dec 2025 17:24:05 +0000 Subject: [PATCH 1/4] High-volume improvements + regression fixes --- .../prepare_clustering_data_households.py | 2 +- .../stage2_blockgroup_regression.py | 117 ++++++++++++++---- 2 files changed, 91 insertions(+), 28 deletions(-) diff --git a/analysis/clustering/prepare_clustering_data_households.py b/analysis/clustering/prepare_clustering_data_households.py index 870a51b..9b4d6e2 100644 --- a/analysis/clustering/prepare_clustering_data_households.py +++ b/analysis/clustering/prepare_clustering_data_households.py @@ -554,7 +554,7 @@ def main() -> int: "--day-strategy", choices=["stratified", "random"], default="stratified", - help="Day sampling: stratified (70% weekday) or random.", + help="Day sampling: stratified (70%% weekday) or random.", ) parser.add_argument( "--seed", diff --git a/analysis/clustering/stage2_blockgroup_regression.py b/analysis/clustering/stage2_blockgroup_regression.py index d6c2a7a..6d749cb 100644 --- a/analysis/clustering/stage2_blockgroup_regression.py +++ b/analysis/clustering/stage2_blockgroup_regression.py @@ -50,18 +50,14 @@ format="%(asctime)s - %(levelname)s - %(message)s", ) -# Default predictors +# Default predictors (leaner set to help with convergence and interpretability) DEFAULT_PREDICTORS = [ "Median_Household_Income", "Median_Age", + "Urban_Percent", "Total_Households", "Owner_Occupied", "Renter_Occupied", - "Heat_Electric", - "Heat_Utility_Gas", - "Urban_Percent", - "Built_2020_After", - "Built_1939_Earlier", ] @@ -75,7 +71,7 @@ def load_cluster_assignments(path: Path) -> tuple[pl.DataFrame, dict]: Load household-day cluster assignments and aggregate to household level. Each household is assigned to their DOMINANT cluster (most frequent). - I track how confident this assignment is (what % of days fell into that cluster). + We also compute a dominance metric: fraction of sampled days in that cluster. Returns ------- @@ -91,7 +87,7 @@ def load_cluster_assignments(path: Path) -> tuple[pl.DataFrame, dict]: confidence_stats : dict Summary statistics on assignment confidence """ - logger.info(f"Loading cluster assignments from {path}") + logger.info("Loading cluster assignments from %s", path) raw = pl.read_parquet(path) required = ["account_identifier", "zip_code", "cluster"] @@ -152,8 +148,13 @@ def load_cluster_assignments(path: Path) -> tuple[pl.DataFrame, dict]: def load_crosswalk(crosswalk_path: Path, zip_codes: list[str]) -> pl.DataFrame: - """Load ZIP+4 → Census block-group crosswalk.""" - logger.info(f"Loading crosswalk from {crosswalk_path}") + """Load ZIP+4 → Census block-group crosswalk for the ZIP+4s in our data. + + Also runs a small diagnostic to detect fan-out (ZIP+4 mapping to + multiple block groups), which would imply potential over-counting + if we don't re-weight. + """ + logger.info("Loading crosswalk from %s", crosswalk_path) crosswalk = ( pl.scan_csv(crosswalk_path, separator="\t", infer_schema_length=10000) @@ -174,6 +175,27 @@ def load_crosswalk(crosswalk_path: Path, zip_codes: list[str]) -> pl.DataFrame: f"{len(set(zip_codes)):,}", ) + # Fan-out diagnostic: do any ZIP+4s map to multiple block groups? 
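+    # (Diagnostic only: the crosswalk is returned unchanged. A warning with the
+    # fan-out distribution is logged so potential over-counting is visible.)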
+ if crosswalk.is_empty(): + logger.warning(" Crosswalk is empty after filtering for sample ZIP+4s.") + return crosswalk + + fanout = crosswalk.group_by("zip_code").agg(pl.n_unique("block_group_geoid").alias("n_block_groups")) + max_fanout = int(fanout["n_block_groups"].max()) + + if max_fanout > 1: + fanout_summary = fanout.group_by("n_block_groups").agg(pl.len().alias("n_zip4")).sort("n_block_groups") + logger.warning( + " WARNING: ZIP+4 → block-group crosswalk has fan-out for the " + "current sample (some ZIP+4s map to multiple block groups). " + "Distribution of mappings:\n%s", + fanout_summary, + ) + else: + logger.info( + " Crosswalk is 1-to-1 for the current sample: each ZIP+4 maps to exactly one block group (no fan-out)." + ) + return crosswalk @@ -309,8 +331,8 @@ def prepare_regression_dataset( logger.info(" After cluster diversity filter: %s block groups", f"{df['block_group_geoid'].n_unique():,}") - # Filter predictors to those that exist and have data - available_predictors = [] + # Filter predictors to those that exist and have acceptable missingness + available_predictors: list[str] = [] for p in predictors: if p not in df.columns: logger.warning(" Predictor not found: %s", p) @@ -367,6 +389,7 @@ def run_multinomial_regression( if len(X) == 0: raise ValueError("No observations remaining after dropping NaN rows.") + # Count block groups with complete predictor data (for reporting) n_block_groups = reg_df.filter(~pl.any_horizontal(pl.col(predictors).is_null()))["block_group_geoid"].n_unique() # Standardize and add intercept @@ -438,6 +461,9 @@ def generate_report( """Generate human-readable summary.""" conf = results.get("cluster_assignment_confidence", {}) + n_households = conf.get("n_households") + households_line = " Households: N/A" if n_households is None else f" Households: {n_households:,}" + lines = [ "=" * 70, "STAGE 2: BLOCK-GROUP MULTINOMIAL REGRESSION RESULTS", @@ -459,7 +485,7 @@ def generate_report( "Each household assigned to their most frequent (dominant) cluster.", "Dominance = fraction of household's days in their assigned cluster.", "", - f" Households: {conf.get('n_households', 'N/A'):,}", + households_line, f" Mean dominance: {conf.get('dominance_mean', 0) * 100:.1f}%", f" Median dominance: {conf.get('dominance_median', 0) * 100:.1f}%", f" Households >50% in one cluster: {conf.get('pct_above_50', 0):.1f}%", @@ -523,22 +549,44 @@ def main() -> int: parser.add_argument("--clusters", type=Path, required=True, help="cluster_assignments.parquet") parser.add_argument("--crosswalk", type=Path, required=True, help="ZIP+4 → block-group crosswalk") - parser.add_argument("--census-cache", type=Path, default=Path("data/reference/census_17_2023.parquet")) - parser.add_argument("--fetch-census", action="store_true", help="Force re-fetch Census data") + parser.add_argument( + "--census-cache", + type=Path, + default=Path("data/reference/census_17_2023.parquet"), + ) + parser.add_argument( + "--fetch-census", + action="store_true", + help="Force re-fetch Census data", + ) parser.add_argument("--state-fips", default="17") parser.add_argument("--acs-year", type=int, default=2023) parser.add_argument("--min-households-per-bg", type=int, default=10) parser.add_argument("--min-nonzero-clusters-per-bg", type=int, default=2) - parser.add_argument("--predictors", nargs="+", default=DEFAULT_PREDICTORS, help="Predictor columns") - parser.add_argument("--output-dir", type=Path, default=Path("data/clustering/results/stage2_blockgroups")) + parser.add_argument( + "--predictors", + 
nargs="+", + default=DEFAULT_PREDICTORS, + help="Predictor columns", + ) + parser.add_argument( + "--output-dir", + type=Path, + default=Path("data/clustering/results/stage2_blockgroups"), + ) + parser.add_argument( + "--standardize", + action="store_true", + help="Standardize predictors before regression. Default is to use raw units (easier to interpret).", + ) args = parser.parse_args() if not args.clusters.exists(): - logger.error(f"Cluster assignments not found: {args.clusters}") + logger.error("Cluster assignments not found: %s", args.clusters) return 1 if not args.crosswalk.exists(): - logger.error(f"Crosswalk not found: {args.crosswalk}") + logger.error("Crosswalk not found: %s", args.crosswalk) return 1 args.output_dir.mkdir(parents=True, exist_ok=True) @@ -553,27 +601,42 @@ def main() -> int: clusters_bg = attach_block_groups(clusters, crosswalk) bg_counts = aggregate_blockgroup_cluster_counts(clusters_bg) - census_df = fetch_or_load_census(args.census_cache, args.state_fips, args.acs_year, args.fetch_census) - logger.info(" Census: %s block groups, %s columns", f"{len(census_df):,}", len(census_df.columns)) + census_df = fetch_or_load_census( + cache_path=args.census_cache, + state_fips=args.state_fips, + acs_year=args.acs_year, + force_fetch=args.fetch_census, + ) + logger.info( + " Census: %s block groups, %s columns", + f"{len(census_df):,}", + len(census_df.columns), + ) demo_df = attach_census_to_blockgroups(bg_counts, census_df) reg_df, predictors = prepare_regression_dataset( - demo_df, - args.predictors, - args.min_households_per_bg, - args.min_nonzero_clusters_per_bg, + demo_df=demo_df, + predictors=args.predictors, + min_households_per_bg=args.min_households_per_bg, + min_nonzero_clusters_per_bg=args.min_nonzero_clusters_per_bg, ) if reg_df.is_empty(): logger.error("No data after filtering") return 1 - # Save dataset + # Save dataset used for regression reg_df.write_parquet(args.output_dir / "regression_data_blockgroups.parquet") # Run regression - results = run_multinomial_regression(reg_df, predictors) + results = run_multinomial_regression( + reg_df=reg_df, + predictors=predictors, + outcome="cluster", + weight_col="n_households", + standardize=args.standardize, + ) # Add confidence stats to results results["cluster_assignment_confidence"] = confidence_stats From 7cf8470dea38bd3a4088831b6afaec137fc1fcda Mon Sep 17 00:00:00 2001 From: Griffin Sharps Date: Mon, 8 Dec 2025 18:10:31 +0000 Subject: [PATCH 2/4] Add manifest utilities for high-volume processing --- smart_meter_analysis/manifests.py | 298 ++++++++++++++++++++++++++++++ 1 file changed, 298 insertions(+) create mode 100644 smart_meter_analysis/manifests.py diff --git a/smart_meter_analysis/manifests.py b/smart_meter_analysis/manifests.py new file mode 100644 index 0000000..30c8d7b --- /dev/null +++ b/smart_meter_analysis/manifests.py @@ -0,0 +1,298 @@ +""" +Manifest utilities for memory-safe processing of large parquet files. + +These are small parquet files containing unique values extracted from +large source files using streaming (sink_parquet). This avoids OOM when +calling unique().collect() on files with hundreds of millions of rows. 
+ +Usage: + from smart_meter_analysis.manifests import ( + ensure_account_manifest, + ensure_date_manifest, + load_account_sample, + load_date_sample, + ) + + # First call builds manifest via streaming (may take a few minutes) + # Subsequent calls return instantly from cache + account_manifest = ensure_account_manifest( + Path("data/processed/comed_202308.parquet") + ) + accounts_df = pl.read_parquet(account_manifest) +""" + +from __future__ import annotations + +import logging +from pathlib import Path +from typing import Any, Literal + +import polars as pl + +logger = logging.getLogger(__name__) + + +# ============================================================================= +# Helpers +# ============================================================================= + + +def log_memory(label: str) -> None: + """Log current RSS memory usage (Linux only).""" + try: + with open("/proc/self/status") as f: + for line in f: + if line.startswith("VmRSS:"): + mem_mb = int(line.split()[1]) / 1024 + logger.debug("[MEMORY] %s: %.0f MB", label, mem_mb) + break + except FileNotFoundError as exc: + # Likely not on Linux; /proc may not exist + logger.debug("Skipping memory logging (%s): %s", type(exc).__name__, exc) + except OSError as exc: + # Permission issues or other OS-level problems + logger.debug("Could not read /proc/self/status for memory logging: %s", exc) + + +def _validate_input_has_columns(input_path: Path, required: list[str]) -> None: + """Ensure the input parquet exists and has the required columns.""" + if not input_path.exists(): + raise FileNotFoundError(f"Input parquet not found: {input_path}") + + schema = pl.scan_parquet(input_path).collect_schema() + missing = [c for c in required if c not in schema.names()] + if missing: + raise ValueError(f"Input parquet {input_path} missing required columns: {missing}") + + +# ============================================================================= +# Manifest creation +# ============================================================================= + + +def ensure_account_manifest(input_path: Path) -> Path: + """ + Create or load account manifest using streaming sink. + + The manifest contains unique (account_identifier, zip_code) pairs, + extracted from the source file without loading it fully into memory. + + Args: + input_path: + Path to interval-level parquet file. + + Returns: + Path to account manifest parquet file. 
+ + Example: + >>> manifest_path = ensure_account_manifest(Path("comed_202308.parquet")) + >>> accounts_df = pl.read_parquet(manifest_path) + >>> accounts = accounts_df["account_identifier"].to_list() + """ + _validate_input_has_columns(input_path, ["account_identifier", "zip_code"]) + + manifest_path = input_path.parent / f"{input_path.stem}_accounts.parquet" + + # Check for existing valid manifest + if manifest_path.exists(): + try: + n = pl.scan_parquet(manifest_path).select(pl.len()).collect()[0, 0] + if n > 0: + logger.info( + "Using existing account manifest: %s (%s accounts)", + manifest_path, + f"{n:,}", + ) + return manifest_path + except Exception: + logger.warning( + "Account manifest is corrupt or unreadable, rebuilding: %s", + manifest_path, + ) + + # Build manifest using streaming + logger.info("Building account manifest from %s (streaming)...", input_path) + log_memory("before account manifest") + + (pl.scan_parquet(input_path).select(["account_identifier", "zip_code"]).unique().sink_parquet(manifest_path)) + + log_memory("after account manifest") + + # Validate and report + n = pl.scan_parquet(manifest_path).select(pl.len()).collect()[0, 0] + logger.info( + "Account manifest complete (rebuilt): %s (%s accounts)", + manifest_path, + f"{n:,}", + ) + + return manifest_path + + +def ensure_date_manifest(input_path: Path) -> Path: + """ + Create or load date manifest using streaming sink. + + The manifest contains unique (date, is_weekend, weekday) tuples. + This is typically small (~31 rows for a month) but we use streaming + for consistency and to avoid scanning the full file into memory. + + Args: + input_path: + Path to interval-level parquet file. + + Returns: + Path to date manifest parquet file. + """ + _validate_input_has_columns(input_path, ["date", "is_weekend", "weekday"]) + + manifest_path = input_path.parent / f"{input_path.stem}_dates.parquet" + + # Check for existing valid manifest + if manifest_path.exists(): + try: + n = pl.scan_parquet(manifest_path).select(pl.len()).collect()[0, 0] + if n > 0: + logger.info( + "Using existing date manifest: %s (%s dates)", + manifest_path, + f"{n:,}", + ) + return manifest_path + except Exception: + logger.warning( + "Date manifest is corrupt or unreadable, rebuilding: %s", + manifest_path, + ) + + # Build manifest using streaming + logger.info("Building date manifest from %s (streaming)...", input_path) + log_memory("before date manifest") + + (pl.scan_parquet(input_path).select(["date", "is_weekend", "weekday"]).unique().sink_parquet(manifest_path)) + + log_memory("after date manifest") + + # Validate and report + n = pl.scan_parquet(manifest_path).select(pl.len()).collect()[0, 0] + logger.info( + "Date manifest complete (rebuilt): %s (%s dates)", + manifest_path, + f"{n:,}", + ) + + return manifest_path + + +# ============================================================================= +# Sampling helpers +# ============================================================================= + + +def load_account_sample( + manifest_path: Path, + sample_n: int | None = None, + seed: int = 42, +) -> list[str]: + """ + Load account identifiers from manifest, optionally sampling. + + Args: + manifest_path: + Path to account manifest parquet. + sample_n: + Number of accounts to sample (None = all). + seed: + Random seed for reproducible sampling. + + Returns: + List of account_identifier strings. 
+ """ + df = pl.read_parquet(manifest_path) + + if df.is_empty(): + logger.warning("Account manifest %s is empty; returning no accounts", manifest_path) + return [] + + if sample_n is not None and sample_n < len(df): + df = df.sample(n=sample_n, shuffle=True, seed=seed) + logger.info("Sampled %s accounts from manifest", f"{len(df):,}") + else: + logger.info("Using all %s accounts from manifest", f"{len(df):,}") + + return df["account_identifier"].to_list() + + +def load_date_sample( + manifest_path: Path, + sample_n: int, + strategy: Literal["stratified", "random"] = "stratified", + seed: int = 42, +) -> list[Any]: + """ + Load dates from manifest with stratified or random sampling. + + Args: + manifest_path: + Path to date manifest parquet. + sample_n: + Number of dates to sample. + strategy: + "stratified" (70% weekday, 30% weekend) or "random". + seed: + Random seed for reproducible sampling. + + Returns: + List of date values (type depends on input schema). + """ + df = pl.read_parquet(manifest_path) + + if df.is_empty(): + logger.warning("Date manifest %s is empty; returning no dates", manifest_path) + return [] + + if sample_n <= 0: + logger.info("Requested sample_n <= 0; returning empty date list") + return [] + + # Stratified sampling if both weekday and weekend exist + if strategy == "stratified": + weekday_df = df.filter(~pl.col("is_weekend")) + weekend_df = df.filter(pl.col("is_weekend")) + + if weekday_df.height == 0 or weekend_df.height == 0: + logger.warning("Missing weekdays or weekends in manifest; falling back to random sampling") + strategy = "random" + + if strategy == "stratified": + n_weekdays = int(sample_n * 0.7) + n_weekends = sample_n - n_weekdays + + n_weekdays = min(n_weekdays, len(weekday_df)) + n_weekends = min(n_weekends, len(weekend_df)) + + sampled_weekdays = ( + weekday_df.sample(n=n_weekdays, shuffle=True, seed=seed)["date"].to_list() if n_weekdays > 0 else [] + ) + sampled_weekends = ( + weekend_df.sample( + n=n_weekends, + shuffle=True, + seed=seed + 1, + )["date"].to_list() + if n_weekends > 0 + else [] + ) + + dates = sampled_weekdays + sampled_weekends + logger.info( + "Sampled %d weekdays + %d weekend days (stratified)", + len(sampled_weekdays), + len(sampled_weekends), + ) + else: + n_sample = min(sample_n, len(df)) + dates = df.sample(n=n_sample, shuffle=True, seed=seed)["date"].to_list() + logger.info("Sampled %d dates (random)", len(dates)) + + return dates From 64e3c11b5f24b0cb79991536661ebf7b3eb5c011 Mon Sep 17 00:00:00 2001 From: Griffin Sharps Date: Wed, 10 Dec 2025 20:04:23 +0000 Subject: [PATCH 3/4] Refactor clustering prep for streaming manifests and high-volume runs - Add manifest-based metadata sampling: - smart_meter_analysis.manifests.ensure_account_manifest() - smart_meter_analysis.manifests.ensure_date_manifest() - Memory now scales with number of accounts / days, not 300M+ rows. - Update prepare_clustering_data_households.py to: - Use manifests for sampling accounts and dates. - Support chunked streaming profile construction by household. - Successfully build ~540k profiles from 10k files, 20k households. - Update euclidean_clustering.py to support large-N runs - Sampled silhouette evaluation and improved logging. - Wire scripts/run_comed_pipeline.py to: - Default to streaming profile construction. - Expose sampling and chunk-size parameters at the CLI. - Treat Stage 1 as the high-volume streaming path. 
Tested: - End-to-end run on 202308_10000: - ~335M interval rows - ~225k households - 20k sampled households x 31 days - 538,552 complete profiles clustered into 4 groups --- analysis/clustering/euclidean_clustering.py | 176 +++++-- .../prepare_clustering_data_households.py | 438 ++++++++++++------ .../stage2_blockgroup_regression.py | 60 ++- docs/index.html | 2 +- scripts/run_comed_pipeline.py | 385 ++++++++------- smart_meter_analysis/manifests.py | 82 ++-- 6 files changed, 731 insertions(+), 412 deletions(-) diff --git a/analysis/clustering/euclidean_clustering.py b/analysis/clustering/euclidean_clustering.py index e591797..983c14e 100644 --- a/analysis/clustering/euclidean_clustering.py +++ b/analysis/clustering/euclidean_clustering.py @@ -11,18 +11,19 @@ Pipeline: 1. Load daily profiles from Phase 1 2. Normalize profiles (optional but recommended) - 3. Evaluate k values to find optimal k (via silhouette score) + 3. Evaluate k values to find optimal k (via silhouette score on a sample) 4. Run final clustering with optimal k 5. Output assignments, centroids, and visualizations Usage: - # Standard run (evaluates k=3-6) + # Standard run (evaluates k=3-6 using silhouette on up to 10k samples) python euclidean_clustering.py \\ --input data/clustering/sampled_profiles.parquet \\ --output-dir data/clustering/results \\ --k-range 3 6 \\ --find-optimal-k \\ - --normalize + --normalize \\ + --silhouette-sample-size 10000 # Fixed k (no evaluation) python euclidean_clustering.py \\ @@ -68,9 +69,13 @@ def load_profiles(path: Path) -> tuple[np.ndarray, pl.DataFrame]: # Extract profiles as numpy array profiles = np.array(df["profile"].to_list(), dtype=np.float64) - logger.info(f" Loaded {len(profiles):,} profiles with {profiles.shape[1]} time points each") - logger.info(f" Data shape: {profiles.shape}") - logger.info(f" Data range: [{profiles.min():.2f}, {profiles.max():.2f}]") + logger.info( + " Loaded %s profiles with %s time points each", + f"{len(profiles):,}", + profiles.shape[1], + ) + logger.info(" Data shape: %s", (profiles.shape[0], profiles.shape[1])) + logger.info(" Data range: [%.2f, %.2f]", profiles.min(), profiles.max()) return profiles, df @@ -92,7 +97,7 @@ def normalize_profiles( if method == "none": return X - logger.info(f"Normalizing profiles using {method} method...") + logger.info("Normalizing profiles using %s method...", method) if method == "zscore": # Per-profile z-score normalization @@ -110,7 +115,11 @@ def normalize_profiles( else: raise ValueError(f"Unknown normalization method: {method}") - logger.info(f" Normalized data range: [{X_norm.min():.2f}, {X_norm.max():.2f}]") + logger.info( + " Normalized data range: [%.2f, %.2f]", + X_norm.min(), + X_norm.max(), + ) return X_norm @@ -120,21 +129,41 @@ def evaluate_clustering( k_range: range, n_init: int = 10, random_state: int = 42, + silhouette_sample_size: int | None = 10_000, ) -> dict: """ Evaluate clustering for different values of k. + Uses inertia on the full dataset and silhouette score computed on a + subsample (to avoid O(n^2) cost when n is large). + Args: X: Profile array of shape (n_samples, n_timepoints) k_range: Range of k values to test n_init: Number of random initializations random_state: Random seed for reproducibility + silhouette_sample_size: Max number of samples for silhouette. + If None, uses full dataset (NOT recommended for very large n). 
Returns: Dictionary with k_values, inertia, and silhouette scores """ - logger.info(f"Evaluating clustering for k in {list(k_range)}...") - logger.info(f" Dataset size: {X.shape[0]:,} profiles") + n_samples = X.shape[0] + logger.info("Evaluating clustering for k in %s...", list(k_range)) + logger.info(" Dataset size: %s profiles", f"{n_samples:,}") + + if silhouette_sample_size is None: + logger.info(" Silhouette: using FULL dataset (may be very slow).") + elif n_samples > silhouette_sample_size: + logger.info( + " Silhouette: using a random subsample of %s profiles.", + f"{silhouette_sample_size:,}", + ) + else: + logger.info( + " Silhouette: using all %s profiles (<= sample size).", + f"{n_samples:,}", + ) results = { "k_values": [], @@ -143,7 +172,8 @@ def evaluate_clustering( } for k in k_range: - logger.info(f"\n Testing k={k}...") + logger.info("") + logger.info(" Testing k=%d...", k) model = KMeans( n_clusters=k, @@ -153,14 +183,23 @@ def evaluate_clustering( labels = model.fit_predict(X) - sil_score = silhouette_score(X, labels, metric="euclidean") + # Inertia on full data + inertia = float(model.inertia_) + + # Silhouette on sample (or full data if silhouette_sample_size is None) + sil_kwargs: dict = {"metric": "euclidean"} + if silhouette_sample_size is not None and n_samples > silhouette_sample_size: + sil_kwargs["sample_size"] = silhouette_sample_size + sil_kwargs["random_state"] = random_state + + sil_score = silhouette_score(X, labels, **sil_kwargs) results["k_values"].append(k) - results["inertia"].append(float(model.inertia_)) + results["inertia"].append(inertia) results["silhouette"].append(float(sil_score)) - logger.info(f" Inertia: {model.inertia_:,.2f}") - logger.info(f" Silhouette: {sil_score:.3f}") + logger.info(" Inertia: %s", f"{inertia:,.2f}") + logger.info(" Silhouette: %.3f", sil_score) return results @@ -178,10 +217,15 @@ def find_optimal_k(eval_results: dict) -> int: k_values = eval_results["k_values"] silhouettes = eval_results["silhouette"] - best_idx = np.argmax(silhouettes) - best_k = k_values[best_idx] + best_idx = int(np.argmax(silhouettes)) + best_k = int(k_values[best_idx]) - logger.info(f"\nOptimal k={best_k} (silhouette={silhouettes[best_idx]:.3f})") + logger.info("") + logger.info( + "Optimal k=%d (silhouette=%.3f)", + best_k, + silhouettes[best_idx], + ) return best_k @@ -204,7 +248,12 @@ def run_clustering( Returns: Tuple of (labels, centroids, inertia) """ - logger.info(f"\nRunning k-means with k={k} on {X.shape[0]:,} profiles...") + logger.info("") + logger.info( + "Running k-means with k=%d on %s profiles...", + k, + f"{X.shape[0]:,}", + ) model = KMeans( n_clusters=k, @@ -214,16 +263,22 @@ def run_clustering( labels = model.fit_predict(X) centroids = model.cluster_centers_ + inertia = float(model.inertia_) - logger.info(f" Inertia: {model.inertia_:,.2f}") + logger.info(" Inertia: %s", f"{inertia:,.2f}") # Log cluster distribution unique, counts = np.unique(labels, return_counts=True) for cluster, count in zip(unique, counts): pct = count / len(labels) * 100 - logger.info(f" Cluster {cluster}: {count:,} profiles ({pct:.1f}%)") + logger.info( + " Cluster %d: %s profiles (%.1f%%)", + cluster, + f"{count:,}", + pct, + ) - return labels, centroids, float(model.inertia_) + return labels, centroids, inertia def plot_centroids( @@ -272,7 +327,7 @@ def plot_centroids( plt.savefig(output_path, dpi=150) plt.close() - logger.info(f" Saved centroids plot: {output_path}") + logger.info(" Saved centroids plot: %s", output_path) def plot_cluster_samples( @@ 
-316,8 +371,12 @@ def plot_cluster_samples( cluster_mask = labels == i cluster_profiles = X[cluster_mask] - # Sample profiles n_available = len(cluster_profiles) + if n_available == 0: + ax.set_title(f"Cluster {i} (n=0)") + ax.grid(True, alpha=0.3) + continue + n_plot = min(n_samples, n_available) idx = rng.choice(n_available, size=n_plot, replace=False) @@ -342,7 +401,7 @@ def plot_cluster_samples( plt.savefig(output_path, dpi=150) plt.close() - logger.info(f" Saved cluster samples plot: {output_path}") + logger.info(" Saved cluster samples plot: %s", output_path) def plot_elbow_curve( @@ -379,7 +438,7 @@ def plot_elbow_curve( ax2.set_xticks(k_values) # Mark optimal k - best_idx = np.argmax(silhouette) + best_idx = int(np.argmax(silhouette)) ax2.axvline(x=k_values[best_idx], color="red", linestyle="--", alpha=0.7) ax2.scatter( [k_values[best_idx]], @@ -395,7 +454,7 @@ def plot_elbow_curve( plt.savefig(output_path, dpi=150) plt.close() - logger.info(f" Saved elbow curve: {output_path}") + logger.info(" Saved elbow curve: %s", output_path) def save_results( @@ -420,7 +479,7 @@ def save_results( output_dir.mkdir(parents=True, exist_ok=True) # Determine which ID columns are present (household vs ZIP+4 level) - id_cols = [] + id_cols: list[str] = [] if "account_identifier" in df.columns: id_cols.append("account_identifier") if "zip_code" in df.columns: @@ -431,10 +490,12 @@ def save_results( available_cols = [c for c in id_cols if c in df.columns] # Save cluster assignments - assignments = df.select(available_cols).with_columns(pl.Series("cluster", labels)) + assignments = df.select(available_cols).with_columns( + pl.Series("cluster", labels), + ) assignments_path = output_dir / "cluster_assignments.parquet" assignments.write_parquet(assignments_path) - logger.info(f" Saved assignments: {assignments_path}") + logger.info(" Saved assignments: %s", assignments_path) # Save centroids as parquet centroids_df = pl.DataFrame({ @@ -443,20 +504,20 @@ def save_results( }) centroids_path = output_dir / "cluster_centroids.parquet" centroids_df.write_parquet(centroids_path) - logger.info(f" Saved centroids: {centroids_path}") + logger.info(" Saved centroids: %s", centroids_path) # Save k evaluation results if eval_results: eval_path = output_dir / "k_evaluation.json" with open(eval_path, "w") as f: json.dump(eval_results, f, indent=2) - logger.info(f" Saved k evaluation: {eval_path}") + logger.info(" Saved k evaluation: %s", eval_path) # Save metadata metadata_path = output_dir / "clustering_metadata.json" with open(metadata_path, "w") as f: json.dump(metadata, f, indent=2) - logger.info(f" Saved metadata: {metadata_path}") + logger.info(" Saved metadata: %s", metadata_path) def main() -> None: @@ -465,11 +526,12 @@ def main() -> None: formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: - # Standard run with k evaluation + # Standard run with k evaluation (silhouette on sample) python euclidean_clustering.py \\ --input data/clustering/sampled_profiles.parquet \\ --output-dir data/clustering/results \\ - --k-range 3 6 --find-optimal-k --normalize + --k-range 3 6 --find-optimal-k --normalize \\ + --silhouette-sample-size 10000 # Fixed k (no evaluation) python euclidean_clustering.py \\ @@ -513,6 +575,15 @@ def main() -> None: action="store_true", help="Evaluate k range and use optimal k", ) + k_group.add_argument( + "--silhouette-sample-size", + type=int, + default=10_000, + help=( + "Max number of samples for silhouette evaluation " + "(default: 10000; use -1 to use full dataset, not 
recommended for large n)." + ), + ) # Clustering parameters parser.add_argument( @@ -526,7 +597,7 @@ def main() -> None: parser.add_argument( "--normalize", action="store_true", - help="Apply z-score normalization to profiles", + help="Apply normalization to profiles", ) parser.add_argument( "--normalize-method", @@ -561,16 +632,21 @@ def main() -> None: if args.k is not None: # Fixed k k = args.k - logger.info(f"\nUsing fixed k={k}") + logger.info("") + logger.info("Using fixed k=%d", k) elif args.find_optimal_k: # Evaluate k range k_range = range(args.k_range[0], args.k_range[1] + 1) + silhouette_sample_size: int | None + silhouette_sample_size = None if args.silhouette_sample_size < 0 else args.silhouette_sample_size + eval_results = evaluate_clustering( X, k_range=k_range, n_init=args.n_init, random_state=args.random_state, + silhouette_sample_size=silhouette_sample_size, ) # Save elbow curve @@ -581,7 +657,8 @@ def main() -> None: else: # Default to min of k_range k = args.k_range[0] - logger.info(f"\nUsing default k={k}") + logger.info("") + logger.info("Using default k=%d", k) # Run final clustering labels, centroids, inertia = run_clustering( @@ -592,25 +669,28 @@ def main() -> None: ) # Create visualizations - logger.info("\nGenerating visualizations...") + logger.info("") + logger.info("Generating visualizations...") args.output_dir.mkdir(parents=True, exist_ok=True) plot_centroids(centroids, args.output_dir / "cluster_centroids.png") plot_cluster_samples(X, labels, centroids, args.output_dir / "cluster_samples.png") # Save results - logger.info("\nSaving results...") + logger.info("") + logger.info("Saving results...") metadata = { - "k": k, - "n_profiles": len(X), - "n_timepoints": X.shape[1], - "normalized": args.normalize, + "k": int(k), + "n_profiles": int(X.shape[0]), + "n_timepoints": int(X.shape[1]), + "normalized": bool(args.normalize), "normalize_method": args.normalize_method if args.normalize else None, - "n_init": args.n_init, - "random_state": args.random_state, - "inertia": inertia, + "n_init": int(args.n_init), + "random_state": int(args.random_state), + "inertia": float(inertia), "distance_metric": "euclidean", + "silhouette_sample_size": (None if args.silhouette_sample_size < 0 else int(args.silhouette_sample_size)), } save_results(df, labels, centroids, eval_results, metadata, args.output_dir) @@ -620,7 +700,7 @@ def main() -> None: print("CLUSTERING COMPLETE") print("=" * 70) print(f"\nResults saved to: {args.output_dir}") - print(f" • {len(X):,} profiles clustered into {k} groups") + print(f" • {X.shape[0]:,} profiles clustered into {k} groups") print(f" • Inertia: {inertia:,.2f}") if eval_results: best_sil = max(eval_results["silhouette"]) diff --git a/analysis/clustering/prepare_clustering_data_households.py b/analysis/clustering/prepare_clustering_data_households.py index 9b4d6e2..660ff31 100644 --- a/analysis/clustering/prepare_clustering_data_households.py +++ b/analysis/clustering/prepare_clustering_data_households.py @@ -6,19 +6,22 @@ Transforms interval-level energy data into daily load profiles at the HOUSEHOLD (account) level for k-means clustering. -Pipeline: - 1. Validate schema (no data scan) - 2. Scan for metadata and sample households + dates - 3. Create daily 48-point load profiles per household - 4. Output profiles ready for clustering +What this does: + 1. Validate schema (no full data scan) + 2. Build/load manifests for accounts and dates (streaming, memory-safe) + 3. Sample households + dates from manifests + 4. 
Create daily 48-point load profiles per household + 5. Output profiles ready for clustering Design notes: - - Minimizes full-file scans (summary + unique accounts + unique dates only) + - Uses MANIFESTS to avoid OOM on unique().collect() for large files + - Manifests are built once via sink_parquet (streaming) and cached - Standard mode: single filtered collect() for selected households/dates - * good for up to ~50k households sampled - - Streaming mode: sink_parquet() pre-filter, then aggregate - * safer for 100k+ households + * good for up to ~5k-10k households sampled (depending on memory) + - Chunked streaming mode: fully streaming pipeline with per-chunk sinks + * required for 10k+ households on constrained hardware - Profiles are 48 half-hourly kWh values in chronological order (00:30-24:00) + - Incomplete days (not 48 intervals) are dropped as “missing/irregular data” Output files: - sampled_profiles.parquet: @@ -26,31 +29,43 @@ - household_zip4_map.parquet: Unique account_identifier → zip_code mapping for later joins +Manifest files (auto-generated alongside input, via smart_meter_analysis.manifests): + - {input_stem}_accounts.parquet: unique (account_identifier, zip_code) pairs + - {input_stem}_dates.parquet: unique (date, is_weekend, weekday) tuples + Usage: - # Standard (5000 households, 20 days) + # Standard (5,000 households, 20 days) python prepare_clustering_data_households.py \ --input data/processed/comed_202308.parquet \ --output-dir data/clustering \ --sample-households 5000 \ --sample-days 20 - # Large dataset with streaming + # Large dataset with chunked streaming (20,000 households) python prepare_clustering_data_households.py \ --input data/processed/comed_202308.parquet \ --output-dir data/clustering \ - --sample-households 100000 \ - --streaming + --sample-households 20000 \ + --sample-days 31 \ + --streaming \ + --chunk-size 2000 """ from __future__ import annotations import argparse +import gc import logging from pathlib import Path from typing import Any, Literal import polars as pl +from smart_meter_analysis.manifests import ( + ensure_account_manifest, + ensure_date_manifest, +) + logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", @@ -62,6 +77,25 @@ REQUIRED_TIME_COLS = ["date", "hour", "is_weekend", "weekday"] +# ============================================================================= +# MEMORY INSTRUMENTATION +# ============================================================================= + + +def log_memory(label: str) -> None: + """Log current RSS memory usage (Linux only).""" + try: + with open("/proc/self/status") as f: + for line in f: + if line.startswith("VmRSS:"): + mem_mb = int(line.split()[1]) / 1024 + logger.info("[MEMORY] %s: %.0f MB", label, mem_mb) + break + except Exception as exc: + # Best-effort only; ignore on non-Linux or restricted environments + logger.debug("Skipping memory log for %s: %s", label, exc) + + # ============================================================================= # DATA INSPECTION & SAMPLING # ============================================================================= @@ -108,13 +142,12 @@ def get_metadata_and_samples( seed: int = 42, ) -> dict[str, Any]: """ - Get summary statistics and sample households + dates using minimal scans. + Get summary statistics and sample households + dates using MANIFESTS. 
- This function performs: - - Summary stats (row counts, unique counts, date range) - - Unique households - - Unique dates with weekend flags - - Sampling of households and dates + This function: + - Computes summary stats via streaming (safe for large files) + - Builds/loads account and date manifests (streaming sink, memory-safe) + - Samples households and dates from the small manifest files Args: input_path: Path to input parquet file. @@ -130,44 +163,55 @@ def get_metadata_and_samples( - dates: list[date] of sampled dates """ logger.info("Scanning input data for metadata and sampling...") + log_memory("start of get_metadata_and_samples") lf = pl.scan_parquet(input_path) - # Summary stats + # Summary stats (streaming-safe: single row output) + logger.info(" Computing summary statistics...") summary_df = lf.select([ pl.len().alias("n_rows"), pl.col("zip_code").n_unique().alias("n_zip_codes"), pl.col("account_identifier").n_unique().alias("n_accounts"), pl.col("date").min().alias("min_date"), pl.col("date").max().alias("max_date"), - ]).collect() + ]).collect(engine="streaming") summary = summary_df.to_dicts()[0] # Early checks if summary["n_rows"] == 0: raise ValueError(f"Input file {input_path} contains no rows.") - logger.info(f" {summary['n_rows']:,} rows") - logger.info(f" {summary['n_accounts']:,} households") - logger.info(f" {summary['n_zip_codes']:,} ZIP+4 codes") - logger.info(f" Date range: {summary['min_date']} to {summary['max_date']}") + logger.info(" %s rows", f"{summary['n_rows']:,}") + logger.info(" %s households", f"{summary['n_accounts']:,}") + logger.info(" %s ZIP+4 codes", f"{summary['n_zip_codes']:,}") + logger.info(" Date range: %s to %s", summary["min_date"], summary["max_date"]) - # Unique households - accounts_df = pl.scan_parquet(input_path).select("account_identifier").unique().collect() - if accounts_df.height == 0: - raise ValueError("No account_identifier values found in input data.") + # ========================================================================== + # KEY CHANGE: Use manifests instead of unique().collect() + # ========================================================================== + + logger.info(" Loading manifests...") + account_manifest = ensure_account_manifest(input_path) + date_manifest = ensure_date_manifest(input_path) + + # Read from small manifest files (fits easily in memory) + accounts_df = pl.read_parquet(account_manifest) + dates_df = pl.read_parquet(date_manifest) + + log_memory("after loading manifests") - # Unique dates with weekend flag - dates_df = pl.scan_parquet(input_path).select(["date", "is_weekend"]).unique().collect() + if accounts_df.height == 0: + raise ValueError("No account_identifier values found in manifest.") if dates_df.height == 0: - raise ValueError("No dates found in input data.") + raise ValueError("No dates found in manifest.") # Sample households if sample_households is not None and sample_households < len(accounts_df): accounts_df = accounts_df.sample(n=sample_households, shuffle=True, seed=seed) - logger.info(f" Sampled {len(accounts_df):,} households") + logger.info(" Sampled %s households", f"{len(accounts_df):,}") else: - logger.info(f" Using all {len(accounts_df):,} households") + logger.info(" Using all %s households", f"{len(accounts_df):,}") accounts = accounts_df["account_identifier"].to_list() @@ -176,11 +220,8 @@ def get_metadata_and_samples( weekday_df = dates_df.filter(~pl.col("is_weekend")) weekend_df = dates_df.filter(pl.col("is_weekend")) - if weekday_df.height == 0: - 
logger.warning(" No weekdays found; falling back to random day sampling.") - day_strategy = "random" - elif weekend_df.height == 0: - logger.warning(" No weekends found; falling back to random day sampling.") + if weekday_df.height == 0 or weekend_df.height == 0: + logger.warning(" Missing weekdays or weekends; falling back to random day sampling.") day_strategy = "random" if day_strategy == "stratified": @@ -198,15 +239,21 @@ def get_metadata_and_samples( ) dates = sampled_weekdays + sampled_weekends - logger.info(f" Sampled {len(sampled_weekdays)} weekdays + {len(sampled_weekends)} weekend days (stratified)") + logger.info( + " Sampled %d weekdays + %d weekend days (stratified)", + len(sampled_weekdays), + len(sampled_weekends), + ) else: n_sample = min(sample_days, len(dates_df)) dates = dates_df.sample(n=n_sample, shuffle=True, seed=seed)["date"].to_list() - logger.info(f" Sampled {len(dates)} days (random)") + logger.info(" Sampled %d days (random)", len(dates)) if not dates: raise ValueError("No dates were sampled; check input data and sampling settings.") + log_memory("end of get_metadata_and_samples") + return { "summary": summary, "accounts": accounts, @@ -232,7 +279,7 @@ def create_household_profiles( (00:30 to 24:00). This uses a single filtered collect() over the full file, which is efficient - when sampling a subset of households and dates. + when sampling a moderate subset of households and dates. Args: input_path: Path to input parquet file. @@ -248,7 +295,12 @@ def create_household_profiles( - is_weekend - weekday """ - logger.info(f"Creating profiles for {len(accounts):,} households X {len(dates)} days...") + logger.info( + "Creating profiles for %s households x %d days...", + f"{len(accounts):,}", + len(dates), + ) + log_memory("start of create_household_profiles") if not accounts: raise ValueError("No accounts provided for profile creation.") @@ -267,7 +319,8 @@ def create_household_profiles( logger.warning(" No data found for selected households/dates") return pl.DataFrame() - logger.info(f" Loaded {len(df):,} interval records") + logger.info(" Loaded %s interval records", f"{len(df):,}") + log_memory("after loading filtered intervals") profiles_df = df.group_by(["account_identifier", "zip_code", "date"]).agg([ pl.col("kwh").alias("profile"), @@ -281,92 +334,163 @@ def create_household_profiles( n_dropped = n_before - len(profiles_df) if n_dropped > 0: - logger.info(f" Dropped {n_dropped:,} incomplete profiles (DST days, missing data)") + logger.info( + " Dropped %s incomplete profiles (missing or irregular data)", + f"{n_dropped:,}", + ) - logger.info(f" Created {len(profiles_df):,} complete profiles") + logger.info(" Created %s complete profiles", f"{len(profiles_df):,}") + log_memory("end of create_household_profiles") return profiles_df.drop("num_intervals") -def create_household_profiles_streaming( +def _create_profiles_for_chunk_streaming( + input_path: Path, + accounts_chunk: list[str], + dates: list[Any], + chunk_idx: int, + total_chunks: int, + tmp_dir: Path, +) -> Path: + """ + Create profiles for a single chunk of households and write directly to parquet. + + Uses within-group sort (struct.sort_by) to preserve chronological order + without a global sort, which keeps the lazy plan streaming-compatible. + + Returns: + Path to the chunk parquet file. 
+ """ + logger.info( + " Chunk %d/%d: %s households...", + chunk_idx + 1, + total_chunks, + f"{len(accounts_chunk):,}", + ) + log_memory(f"chunk {chunk_idx + 1} start") + + chunk_path = tmp_dir / f"sampled_profiles_chunk_{chunk_idx:03d}.parquet" + + ( + pl.scan_parquet(input_path) + .filter(pl.col("account_identifier").is_in(accounts_chunk) & pl.col("date").is_in(dates)) + .group_by(["account_identifier", "zip_code", "date"]) + .agg([ + # Sort by datetime within group, then extract kwh values + pl.struct(["datetime", "kwh"]).sort_by("datetime").struct.field("kwh").alias("profile"), + pl.col("is_weekend").first(), + pl.col("weekday").first(), + pl.len().alias("num_intervals"), + ]) + .filter(pl.col("num_intervals") == 48) + .drop("num_intervals") + .sink_parquet(chunk_path) + ) + + log_memory(f"chunk {chunk_idx + 1} done") + logger.info(" Wrote chunk parquet: %s", chunk_path) + + return chunk_path + + +def create_household_profiles_chunked_streaming( input_path: Path, accounts: list[str], dates: list[Any], output_path: Path, + chunk_size: int = 5000, ) -> int: """ - Create daily load profiles using a streaming-friendly two-pass approach. + Create daily load profiles using chunked streaming. - Pass 1: - Filter to selected households/dates and stream to a temp parquet file. - Pass 2: - Sort by (account_identifier, datetime), aggregate to daily profiles - with list-of-48 kWh values, and write final output. + - Splits households into chunks. + - For each chunk, runs a streaming filter → group_by → aggregate → sink. + - Concatenates all chunk files using a streaming concat. + - Deletes temporary chunk files. - This avoids loading the entire original parquet into memory, but the - aggregated profiles are still collected in-memory before final write. + This avoids ever materializing profiles for all households in memory. Args: - input_path: Path to input parquet file. + input_path: Path to interval-level parquet file. accounts: List of account_identifier values to include. dates: List of dates to include. - output_path: Path to write final profiles parquet. + output_path: Final output parquet path for all profiles. + chunk_size: Number of households per chunk. Returns: Number of complete daily profiles created. 
""" - logger.info(f"Creating profiles (streaming) for {len(accounts):,} households X {len(dates)} days...") - if not accounts: - raise ValueError("No accounts provided for streaming profile creation.") + raise ValueError( + "No accounts provided for chunked streaming profile creation.", + ) if not dates: - raise ValueError("No dates provided for streaming profile creation.") + raise ValueError("No dates provided for chunked streaming profile creation.") - temp_path = output_path.parent / "_temp_filtered.parquet" + n_accounts = len(accounts) + n_chunks = (n_accounts + chunk_size - 1) // chunk_size - try: - # Pass 1: Stream filtered data to temp file - logger.info(" Pass 1: Streaming filtered data to temp parquet...") - ( - pl.scan_parquet(input_path) - .filter(pl.col("account_identifier").is_in(accounts) & pl.col("date").is_in(dates)) - .sink_parquet(temp_path) - ) + logger.info( + "Creating profiles in %d chunks of up to %s households each (total: %s households x %d days)...", + n_chunks, + f"{chunk_size:,}", + f"{n_accounts:,}", + len(dates), + ) + log_memory("before chunked streaming") + + output_path.parent.mkdir(parents=True, exist_ok=True) + tmp_dir = output_path.parent + + chunk_paths: list[Path] = [] + + for i in range(n_chunks): + start_idx = i * chunk_size + end_idx = min((i + 1) * chunk_size, n_accounts) + accounts_chunk = accounts[start_idx:end_idx] - if not temp_path.exists() or temp_path.stat().st_size == 0: - logger.warning(" No data found for selected households/dates in streaming mode") - return 0 - - # Pass 2: Sort, aggregate, and write final output - logger.info(" Pass 2: Sorting and aggregating to daily profiles...") - - profiles_df = ( - pl.scan_parquet(temp_path) - .sort(["account_identifier", "datetime"]) - .collect() - .group_by(["account_identifier", "zip_code", "date"]) - .agg([ - pl.col("kwh").alias("profile"), - pl.col("is_weekend").first(), - pl.col("weekday").first(), - pl.len().alias("num_intervals"), - ]) - .filter(pl.col("num_intervals") == 48) - .drop("num_intervals") + chunk_path = _create_profiles_for_chunk_streaming( + input_path=input_path, + accounts_chunk=accounts_chunk, + dates=dates, + chunk_idx=i, + total_chunks=n_chunks, + tmp_dir=tmp_dir, ) - n_profiles = len(profiles_df) - logger.info(f" Created {n_profiles:,} complete profiles") + # Only keep non-empty chunk files + if chunk_path.exists() and chunk_path.stat().st_size > 0: + chunk_paths.append(chunk_path) + + gc.collect() + + if not chunk_paths: + logger.warning("No profiles created in chunked streaming mode!") + return 0 - profiles_df.write_parquet(output_path) - logger.info(f" Saved profiles (streaming) to {output_path}") + # Stream-concatenate all chunk parquet files + logger.info("Combining %d chunk files into %s", len(chunk_paths), output_path) + log_memory("before streaming concat") - return n_profiles + combined_lf = pl.concat([pl.scan_parquet(p) for p in chunk_paths]) + combined_lf.sink_parquet(output_path) - finally: - # Clean up temp file - if temp_path.exists(): - temp_path.unlink(missing_ok=True) + log_memory("after streaming concat") + + # Count rows in final output + n_profiles = pl.scan_parquet(output_path).select(pl.len()).collect()[0, 0] + logger.info(" Created %s complete profiles (chunked streaming)", f"{n_profiles:,}") + logger.info(" Saved to %s", output_path) + + # Clean up temporary chunk files + for p in chunk_paths: + try: + p.unlink(missing_ok=True) + except OSError as exc: + logger.warning("Failed to delete temp chunk file %s: %s", p, exc) + + return int(n_profiles) # 
============================================================================= @@ -381,6 +505,7 @@ def prepare_clustering_data( sample_days: int = 20, day_strategy: Literal["stratified", "random"] = "stratified", streaming: bool = False, + chunk_size: int = 5000, seed: int = 42, ) -> dict[str, Any]: """ @@ -392,7 +517,8 @@ def prepare_clustering_data( sample_households: Number of households to sample (None = all). sample_days: Number of days to sample. day_strategy: 'stratified' (70% weekdays) or 'random'. - streaming: If True, use streaming-friendly mode for large samples. + streaming: If True, use chunked streaming mode for large samples. + chunk_size: Households per chunk when streaming is enabled. seed: Random seed for reproducibility. Returns: @@ -405,15 +531,16 @@ def prepare_clustering_data( logger.info("=" * 70) logger.info("PREPARING HOUSEHOLD-LEVEL CLUSTERING DATA") if streaming: - logger.info("(STREAMING MODE ENABLED)") + logger.info("(STREAMING MODE ENABLED, chunk_size=%d)", chunk_size) logger.info("=" * 70) + log_memory("start of prepare_clustering_data") # 1. Schema validation (cheap, no data load) validation = validate_schema(input_path) if not validation["valid"]: raise ValueError(f"Input validation failed: {validation['errors']}") - # 2. Metadata + sampling + # 2. Metadata + sampling (uses manifests for memory safety) metadata = get_metadata_and_samples( input_path=input_path, sample_households=sample_households, @@ -429,17 +556,20 @@ def prepare_clustering_data( output_dir.mkdir(parents=True, exist_ok=True) profiles_path = output_dir / "sampled_profiles.parquet" - # 4. Create profiles (in-memory or streaming) + # 4. Create profiles (in-memory or chunked streaming) if streaming: - n_profiles = create_household_profiles_streaming( + n_profiles = create_household_profiles_chunked_streaming( input_path=input_path, accounts=accounts, dates=dates, output_path=profiles_path, + chunk_size=chunk_size, ) if n_profiles == 0: - raise ValueError("No profiles created in streaming mode - check input data.") + raise ValueError( + "No profiles created in chunked streaming mode - check input data and sampling settings.", + ) profiles_df = pl.read_parquet(profiles_path) else: @@ -450,25 +580,29 @@ def prepare_clustering_data( ) if profiles_df.is_empty(): - raise ValueError("No profiles created - check input data and sampling settings.") - - ( - profiles_df.select([ - "account_identifier", - "zip_code", - "date", - "profile", - "is_weekend", - "weekday", - ]).write_parquet(profiles_path) - ) - logger.info(f" Saved profiles: {profiles_path}") + raise ValueError( + "No profiles created - check input data and sampling settings.", + ) + + profiles_df.select([ + "account_identifier", + "zip_code", + "date", + "profile", + "is_weekend", + "weekday", + ]).write_parquet(profiles_path) + logger.info(" Saved profiles: %s", profiles_path) # 5. Save household → ZIP+4 mapping household_map = profiles_df.select(["account_identifier", "zip_code"]).unique() map_path = output_dir / "household_zip4_map.parquet" household_map.write_parquet(map_path) - logger.info(f" Saved household-ZIP+4 map: {map_path} ({len(household_map):,} households)") + logger.info( + " Saved household-ZIP+4 map: %s (%s households)", + map_path, + f"{len(household_map):,}", + ) # 6. 
Stats
     stats = {
@@ -482,11 +616,12 @@ def prepare_clustering_data(
     logger.info("=" * 70)
     logger.info("CLUSTERING DATA READY")
     logger.info("=" * 70)
-    logger.info(f"  Profiles: {stats['n_profiles']:,}")
-    logger.info(f"  Households: {stats['n_households']:,}")
-    logger.info(f"  ZIP+4s represented: {stats['n_zip4s']:,}")
-    logger.info(f"  Days: {stats['n_dates']}")
-    logger.info(f"  Output: {output_dir}")
+    logger.info("  Profiles: %s", f"{stats['n_profiles']:,}")
+    logger.info("  Households: %s", f"{stats['n_households']:,}")
+    logger.info("  ZIP+4s represented: %s", f"{stats['n_zip4s']:,}")
+    logger.info("  Days: %d", stats["n_dates"])
+    logger.info("  Output: %s", output_dir)
+    log_memory("end of prepare_clustering_data")

     return stats

@@ -497,31 +632,33 @@ def main() -> int:
         formatter_class=argparse.RawDescriptionHelpFormatter,
         epilog="""
 Examples:
-    # Standard run (5000 households, 20 days)
-    python prepare_clustering_data_households.py \
-        --input data/processed/comed_202308.parquet \
-        --output-dir data/clustering \
-        --sample-households 5000 \
+    # Standard run (5,000 households, 20 days)
+    python prepare_clustering_data_households.py \\
+        --input data/processed/comed_202308.parquet \\
+        --output-dir data/clustering \\
+        --sample-households 5000 \\
         --sample-days 20

-    # Large dataset with streaming
-    python prepare_clustering_data_households.py \
-        --input data/processed/comed_202308.parquet \
-        --output-dir data/clustering \
-        --sample-households 100000 \
-        --streaming
+    # Large dataset with chunked streaming
+    python prepare_clustering_data_households.py \\
+        --input data/processed/comed_202308.parquet \\
+        --output-dir data/clustering \\
+        --sample-households 20000 \\
+        --sample-days 31 \\
+        --streaming \\
+        --chunk-size 2000

     # All households, fewer days
-    python prepare_clustering_data_households.py \
-        --input data/processed/comed_202308.parquet \
-        --output-dir data/clustering \
+    python prepare_clustering_data_households.py \\
+        --input data/processed/comed_202308.parquet \\
+        --output-dir data/clustering \\
         --sample-days 10

     # Quick test
-    python prepare_clustering_data_households.py \
-        --input data/processed/comed_202308.parquet \
-        --output-dir data/clustering \
-        --sample-households 500 \
+    python prepare_clustering_data_households.py \\
+        --input data/processed/comed_202308.parquet \\
+        --output-dir data/clustering \\
+        --sample-households 500 \\
         --sample-days 5
 """,
     )
@@ -554,7 +691,7 @@ def main() -> int:
         "--day-strategy",
         choices=["stratified", "random"],
         default="stratified",
         help="Day sampling: stratified (70%% weekday) or random.",
     )
     parser.add_argument(
         "--seed",
@@ -565,13 +702,19 @@ def main() -> int:
     parser.add_argument(
         "--streaming",
         action="store_true",
-        help="Use streaming-friendly mode for large household samples.",
+        help="Use chunked streaming mode for large household samples (10k+).",
+    )
+    parser.add_argument(
+        "--chunk-size",
+        type=int,
+        default=5000,
+        help="Households per chunk when --streaming is enabled (default: 5000).",
     )

     args = parser.parse_args()

     if not args.input.exists():
-        logger.error(f"Input file not found: {args.input}")
+        logger.error("Input file not found: %s", args.input)
         return 1

     try:
@@ -582,11 +725,12 @@ def main() -> int:
             sample_days=args.sample_days,
             day_strategy=args.day_strategy,
             streaming=args.streaming,
+            chunk_size=args.chunk_size,
             seed=args.seed,
         )
         return 0
-    except Exception as e:
-        logger.error(f"Failed: {e}")
+    except Exception as exc:
+        logger.error("Failed: %s", exc)
# Re-raise so stack traces are visible in logs when run via a pipeline raise diff --git a/analysis/clustering/stage2_blockgroup_regression.py b/analysis/clustering/stage2_blockgroup_regression.py index 6d749cb..ec64e7b 100644 --- a/analysis/clustering/stage2_blockgroup_regression.py +++ b/analysis/clustering/stage2_blockgroup_regression.py @@ -43,6 +43,10 @@ import numpy as np import polars as pl +import statsmodels.api as sm +from sklearn.preprocessing import StandardScaler + +from smart_meter_analysis.census import fetch_census_data logger = logging.getLogger(__name__) logging.basicConfig( @@ -56,8 +60,6 @@ "Median_Age", "Urban_Percent", "Total_Households", - "Owner_Occupied", - "Renter_Occupied", ] @@ -264,12 +266,6 @@ def fetch_or_load_census( logger.info("Fetching Census data from API (state=%s, year=%s)...", state_fips, acs_year) - try: - from smart_meter_analysis.census import fetch_census_data - except ImportError: - logger.error("Could not import fetch_census_data from smart_meter_analysis.census.") - raise - census_df = fetch_census_data(state_fips=state_fips, acs_year=acs_year) cache_path.parent.mkdir(parents=True, exist_ok=True) @@ -368,19 +364,33 @@ def run_multinomial_regression( predictors: list[str], outcome: str = "cluster", weight_col: str = "n_households", + standardize: bool = False, ) -> dict[str, object]: - """Run multinomial logistic regression with statsmodels.""" + """Run multinomial logistic regression with statsmodels. + + Parameters + ---------- + reg_df : pl.DataFrame + Long-form data, one row per (block_group_geoid, cluster) with + columns including predictors, outcome, and weights. + predictors : list[str] + Names of predictor columns. + outcome : str, default "cluster" + Name of the outcome column (cluster index). + weight_col : str, default "n_households" + Column providing frequency weights (households per BG x cluster). + standardize : bool, default False + If True, standardize predictors before regression. If False, + use raw units (easier to interpret coefficients and ORs). + """ logger.info("Running multinomial logistic regression...") - import statsmodels.api as sm - from sklearn.preprocessing import StandardScaler - # Extract arrays X = reg_df.select(predictors).to_numpy().astype(np.float64) y = reg_df.get_column(outcome).to_numpy() weights = reg_df.get_column(weight_col).to_numpy().astype(np.float64) - # Drop rows with NaN + # Drop rows with NaN in predictors nan_mask = np.isnan(X).any(axis=1) if nan_mask.sum() > 0: logger.warning(" Dropping %s rows with NaN predictors", f"{nan_mask.sum():,}") @@ -390,19 +400,31 @@ def run_multinomial_regression( raise ValueError("No observations remaining after dropping NaN rows.") # Count block groups with complete predictor data (for reporting) + # (This is approximate, but good enough for a summary line.) 
n_block_groups = reg_df.filter(~pl.any_horizontal(pl.col(predictors).is_null()))["block_group_geoid"].n_unique() - # Standardize and add intercept - scaler = StandardScaler() - X_scaled = scaler.fit_transform(X) - X_with_const = sm.add_constant(X_scaled) + # Standardize or leave in raw units + if standardize: + logger.info(" Standardizing predictors (mean 0, sd 1)...") + scaler = StandardScaler() + X_transformed = scaler.fit_transform(X) + else: + logger.info(" Using raw predictor units (no standardization).") + X_transformed = X + + # Add intercept + X_with_const = sm.add_constant(X_transformed) - # Expand rows by weights for proper frequency weighting + # Expand rows by integer weights for frequency weighting weight_ints = np.maximum(np.round(weights).astype(int), 1) X_expanded = np.repeat(X_with_const, weight_ints, axis=0) y_expanded = np.repeat(y, weight_ints) - logger.info(" Training on %s expanded rows (%s block groups)", f"{len(X_expanded):,}", n_block_groups) + logger.info( + " Training on %s expanded rows (%s block groups)", + f"{len(X_expanded):,}", + n_block_groups, + ) model = sm.MNLogit(y_expanded, X_expanded) result = model.fit(method="newton", maxiter=100, disp=False) diff --git a/docs/index.html b/docs/index.html index 9d2f266..e7a3cf4 100644 --- a/docs/index.html +++ b/docs/index.html @@ -34,7 +34,7 @@ - + diff --git a/scripts/run_comed_pipeline.py b/scripts/run_comed_pipeline.py index 255f5c7..f802b7e 100644 --- a/scripts/run_comed_pipeline.py +++ b/scripts/run_comed_pipeline.py @@ -3,7 +3,8 @@ ComEd Smart Meter Analysis Pipeline Main entry point for the ComEd smart meter clustering analysis. This script -handles the complete workflow from raw S3 data to clustered load profiles. +handles the complete workflow from raw S3 data to clustered household-day +load profiles and (optionally) Stage 2 block-group regression. ================================================================================ PIPELINE OVERVIEW @@ -12,67 +13,83 @@ Stage 1: Usage Pattern Clustering (this script) 1. Download - Fetch CSV files from S3 2. Process - Transform wide→long, add time features, write parquet - 3. Prepare - Create daily load profiles per HOUSEHOLD (not ZIP+4) - 4. Cluster - K-means (Euclidean) to identify usage patterns + 3. Prepare - Create daily load profiles per HOUSEHOLD-DAY + (one 48-point profile per (household, date)) + 4. Cluster - K-means (Euclidean) on household-day profiles 5. Validate - Check data quality at each step Stage 2: Block-Group Demographic Regression (stage2_blockgroup_regression.py) - - Aggregate households to block-group X cluster counts + - Aggregate household-day observations to (block_group x cluster) counts - Join with Census demographics at block-group level - Run multinomial logistic regression (statsmodels for proper inference) - - Identify demographic predictors of cluster composition + - Identify demographic predictors of the mix of clusters across household-days - Unit of analysis: ONE ROW PER (block_group, cluster) + Unit of analysis: ONE ROW PER (block_group, cluster), with counts over + household-day observations. 
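As a concrete illustration of this unit of analysis, the sketch below builds a toy (block_group x cluster) table, expands each row by its household-day count, and fits statsmodels' MNLogit with the same fit call the Stage 2 script uses. The GEOIDs, counts, and predictor values are invented (income is expressed in $1000s so the toy fit stays well conditioned); only the column names and the modeling steps follow the script.

    import numpy as np
    import polars as pl
    import statsmodels.api as sm

    # Toy long-form table: one row per (block_group, cluster) with a
    # household-day count and block-group-level predictors.
    reg_df = pl.DataFrame(
        {
            "block_group_geoid": ["bg1"] * 3 + ["bg2"] * 3 + ["bg3"] * 3 + ["bg4"] * 3,
            "cluster": [0, 1, 2] * 4,
            "n_households": [40, 25, 10, 12, 30, 28, 22, 18, 15, 35, 11, 20],
            "Median_Household_Income": [72.0] * 3 + [48.5] * 3 + [61.0] * 3 + [83.0] * 3,
            "Median_Age": [38.2] * 3 + [31.7] * 3 + [44.0] * 3 + [29.5] * 3,
        }
    )

    predictors = ["Median_Household_Income", "Median_Age"]
    X = sm.add_constant(reg_df.select(predictors).to_numpy().astype(np.float64))
    y = reg_df.get_column("cluster").to_numpy()
    w = reg_df.get_column("n_households").to_numpy()

    # Frequency weighting: repeat each (block_group, cluster) row by its count,
    # then fit a plain multinomial logit on the expanded arrays.
    X_expanded = np.repeat(X, w, axis=0)
    y_expanded = np.repeat(y, w)

    result = sm.MNLogit(y_expanded, X_expanded).fit(method="newton", maxiter=100, disp=False)
    print(result.params)  # one coefficient column per non-baseline cluster

Each column of result.params describes one cluster relative to the reference cluster (the lowest label), which is the baseline MNLogit uses.
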
================================================================================ USAGE MODES ================================================================================ -FULL PIPELINE (download from S3 and run everything): - python run_comed_pipeline.py --from-s3 --num-files 1000 - -VALIDATE ONLY (check existing files): - python run_comed_pipeline.py --validate-only - -PROCESS ONLY (no clustering, useful for testing): - python run_comed_pipeline.py --from-s3 --num-files 100 --skip-clustering +Typical workflow from the project root (script lives in scripts/): + +FULL PIPELINE (download from S3, process, cluster, validate, run Stage 2): + python scripts/run_comed_pipeline.py \ + --from-s3 \ + --year-month 202308 \ + --num-files 10000 \ + --sample-households 20000 \ + --sample-days 31 \ + --k-range 3 6 \ + --run-stage2 + +QUICK LOCAL TEST (fewer files, fewer households/days): + python scripts/run_comed_pipeline.py \ + --from-s3 \ + --year-month 202308 \ + --num-files 100 \ + --sample-households 2000 \ + --sample-days 10 \ + --k-range 3 4 + +VALIDATE ONLY (check existing files for a given run): + python scripts/run_comed_pipeline.py \ + --validate-only \ + --run-name 202308_10000 SPECIFIC STAGE VALIDATION: - python run_comed_pipeline.py --validate-only --stage processed - python run_comed_pipeline.py --validate-only --stage clustering + python scripts/run_comed_pipeline.py \ + --validate-only \ + --run-name 202308_10000 \ + --stage processed + + python scripts/run_comed_pipeline.py \ + --validate-only \ + --run-name 202308_10000 \ + --stage clustering ================================================================================ OUTPUT STRUCTURE ================================================================================ data/validation_runs/{run_name}/ -├── samples/ # Raw CSV files from S3 +├── samples/ # Raw CSV files from S3 ├── processed/ -│ └── comed_{year_month}.parquet # Interval-level data (long format) +│ └── comed_{year_month}.parquet # Interval-level data (long format) ├── clustering/ -│ ├── sampled_profiles.parquet # Daily load profiles for clustering -│ ├── zip4_demographics.parquet # ZIP+4 list (demographics added in Stage 2) +│ ├── sampled_profiles.parquet # Household-day profiles for clustering +│ ├── household_zip4_map.parquet # Map of households to ZIP+4s │ └── results/ │ ├── cluster_assignments.parquet │ ├── cluster_centroids.parquet -│ ├── clustering_metrics.json -│ └── centroid_plot.png - -================================================================================ -EXAMPLES -================================================================================ +│ ├── k_evaluation.json +│ ├── clustering_metadata.json +│ ├── elbow_curve.png +│ ├── cluster_centroids.png +│ ├── cluster_samples.png +│ └── stage2_blockgroups/ # (optional) Stage 2 outputs when run +│ ├── ... block-group cluster counts, regression results, etc. 
-# Quick test with 100 files -python run_comed_pipeline.py --from-s3 --num-files 100 --sample-households 500 --sample-days 10 - -# Standard analysis (1000 files, ~30min clustering) -python run_comed_pipeline.py --from-s3 --num-files 1000 - -# Large scale (will take longer) -python run_comed_pipeline.py --from-s3 --num-files 10000 --sample-households 10000 - -# Just validate existing results -python run_comed_pipeline.py --validate-only --run-name 202308_1000 """ from __future__ import annotations @@ -97,7 +114,6 @@ DEFAULT_PATHS = { "processed": Path("data/processed/comed_202308.parquet"), - "enriched": Path("data/enriched_test/enriched.parquet"), "clustering_dir": Path("data/clustering"), "crosswalk": Path("data/reference/2023_comed_zip4_census_crosswalk.txt"), } @@ -108,12 +124,16 @@ } DEFAULT_CLUSTERING_CONFIG = { - "sample_days": 20, - "sample_households": 5000, # Number of households to sample (None = all) - "k_min": 3, - "k_max": 5, + # Household/day sampling for clustering + "sample_days": 31, + "sample_households": 20_000, # None = all; 20k is the tested high-volume default "day_strategy": "stratified", - "n_init": 10, # Number of k-means initializations + # K-means hyperparameters + "k_min": 3, + "k_max": 6, + "n_init": 10, + # Profile construction (streaming is always on) + "chunk_size": 500, # households per chunk when building profiles } @@ -127,11 +147,12 @@ class ComedPipeline: Orchestrates the ComEd smart meter analysis pipeline. This class manages the complete workflow from raw S3 data to clustered - load profiles, with validation at each step. + household-day load profiles, with validation at each step and optional + Stage 2 regression. Attributes: base_dir: Project root directory - run_name: Identifier for this pipeline run (e.g., "202308_1000") + run_name: Identifier for this pipeline run (e.g., "202308_10000") run_dir: Output directory for this run paths: Dictionary of file paths for this run """ @@ -173,7 +194,7 @@ def setup_directories(self) -> None: for subdir in ["samples", "processed", "clustering/results"]: (self.run_dir / subdir).mkdir(parents=True, exist_ok=True) - logger.info(f"Created output directory: {self.run_dir}") + logger.info("Created output directory: %s", self.run_dir) def download_from_s3( self, @@ -186,13 +207,13 @@ def download_from_s3( Download CSV files from S3 for processing. Args: - year_month: Target month in YYYYMM format (e.g., '202308') + year_month: Target month in YYYYMM format (e.g., "202308") num_files: Number of CSV files to download bucket: S3 bucket name prefix: S3 key prefix for ComEd data Returns: - True if download successful, False otherwise + True if download successful, False otherwise. """ try: import boto3 @@ -200,7 +221,7 @@ def download_from_s3( logger.error("boto3 not installed. 
Run: pip install boto3") return False - logger.info(f"Connecting to S3: s3://{bucket}/{prefix}{year_month}/") + logger.info("Connecting to S3: s3://%s/%s%s/", bucket, prefix, year_month) try: s3 = boto3.client("s3") @@ -208,7 +229,7 @@ def download_from_s3( # List files paginator = s3.get_paginator("list_objects_v2") - csv_keys = [] + csv_keys: list[str] = [] for page in paginator.paginate(Bucket=bucket, Prefix=full_prefix): if "Contents" not in page: @@ -222,10 +243,10 @@ def download_from_s3( break if not csv_keys: - logger.error(f"No CSV files found in s3://{bucket}/{full_prefix}") + logger.error("No CSV files found in s3://%s/%s", bucket, full_prefix) return False - logger.info(f"Downloading {len(csv_keys)} files to {self.paths['samples']}") + logger.info("Downloading %d files to %s", len(csv_keys), self.paths["samples"]) for i, key in enumerate(csv_keys, 1): filename = Path(key).name @@ -233,13 +254,13 @@ def download_from_s3( s3.download_file(bucket, key, str(local_path)) if i % 100 == 0 or i == len(csv_keys): - logger.info(f" Downloaded {i}/{len(csv_keys)} files") + logger.info(" Downloaded %d/%d files", i, len(csv_keys)) - logger.info(f"Download complete: {len(csv_keys)} files") + logger.info("Download complete: %d files", len(csv_keys)) return True - except Exception as e: - logger.error(f"S3 download failed: {e}") + except Exception as exc: + logger.error("S3 download failed: %s", exc) return False def process_raw_data(self, year_month: str) -> bool: @@ -250,17 +271,17 @@ def process_raw_data(self, year_month: str) -> bool: Uses lazy evaluation for memory efficiency. Args: - year_month: Month identifier for output file naming + year_month: Month identifier for output file naming. Returns: - True if processing successful, False otherwise + True if processing successful, False otherwise. 
""" csv_files = sorted(self.paths["samples"].glob("*.csv")) if not csv_files: - logger.error(f"No CSV files found in {self.paths['samples']}") + logger.error("No CSV files found in %s", self.paths["samples"]) return False - logger.info(f"Processing {len(csv_files)} CSV files") + logger.info("Processing %d CSV files", len(csv_files)) from smart_meter_analysis.aws_loader import ( COMED_SCHEMA, @@ -268,18 +289,18 @@ def process_raw_data(self, year_month: str) -> bool: transform_wide_to_long_lazy, ) - lazy_frames = [] + lazy_frames: list[pl.LazyFrame] = [] for i, csv_path in enumerate(csv_files, 1): if i % 200 == 0 or i == len(csv_files): - logger.info(f" Scanned {i}/{len(csv_files)} files") + logger.info(" Scanned %d/%d files", i, len(csv_files)) try: lf = pl.scan_csv(str(csv_path), schema_overrides=COMED_SCHEMA, ignore_errors=True) lf = transform_wide_to_long_lazy(lf) lf = add_time_columns_lazy(lf, day_mode="calendar") lazy_frames.append(lf) - except Exception as e: - logger.warning(f"Failed to scan {csv_path.name}: {e}") + except Exception as exc: + logger.warning("Failed to scan %s: %s", csv_path.name, exc) if not lazy_frames: logger.error("No files successfully scanned") @@ -292,7 +313,7 @@ def process_raw_data(self, year_month: str) -> bool: lf_combined.sink_parquet(self.paths["processed"]) row_count = pl.scan_parquet(self.paths["processed"]).select(pl.len()).collect()[0, 0] - logger.info(f"Wrote {row_count:,} records to {self.paths['processed']}") + logger.info("Wrote %s records to %s", f"{row_count:,}", self.paths["processed"]) return True @@ -301,19 +322,22 @@ def prepare_clustering_data( sample_days: int = DEFAULT_CLUSTERING_CONFIG["sample_days"], sample_households: int | None = DEFAULT_CLUSTERING_CONFIG.get("sample_households"), day_strategy: str = DEFAULT_CLUSTERING_CONFIG["day_strategy"], + chunk_size: int = DEFAULT_CLUSTERING_CONFIG["chunk_size"], ) -> bool: """ - Prepare daily load profiles for clustering at the household level. + Prepare daily household-day load profiles for clustering. - Creates profiles for individual households. + Creates 48-interval profiles for individual household-day combinations + using the manifest-based, chunked streaming pipeline. Args: - sample_days: Number of days to sample - sample_households: Number of households to sample (None = all) - day_strategy: 'stratified' (70/30 weekday/weekend) or 'random' + sample_days: Number of days to sample per run. + sample_households: Number of households to sample (None = all). + day_strategy: "stratified" (70/30 weekday/weekend) or "random". + chunk_size: Households per chunk for the streaming profile builder. Returns: - True if preparation successful, False otherwise + True if preparation successful, False otherwise. 
""" import subprocess @@ -321,7 +345,7 @@ def prepare_clustering_data( output_dir = self.paths["clustering_dir"] if not input_path.exists(): - logger.error(f"Processed data not found: {input_path}") + logger.error("Processed data not found: %s", input_path) return False cmd = [ @@ -335,18 +359,25 @@ def prepare_clustering_data( day_strategy, "--sample-days", str(sample_days), + "--streaming", # streaming is always enabled + "--chunk-size", + str(chunk_size), ] if sample_households: cmd.extend(["--sample-households", str(sample_households)]) logger.info( - f"Preparing household clustering data ({sample_households or 'all'} households X {sample_days} days)" + "Preparing household-day clustering data (%s households x %d days; streaming, chunk_size=%d)", + sample_households or "all", + sample_days, + chunk_size, ) + result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: - logger.error(f"Clustering prep failed: {result.stderr}") + logger.error("Clustering prep failed: %s", result.stderr) return False logger.info("Clustering data prepared") @@ -359,18 +390,18 @@ def run_clustering( n_init: int = DEFAULT_CLUSTERING_CONFIG["n_init"], ) -> bool: """ - Run k-means clustering on prepared profiles. + Run k-means clustering on prepared household-day profiles. Uses Euclidean distance since all profiles are aligned to the same time grid (no time warping needed). Args: - k_min: Minimum number of clusters to test - k_max: Maximum number of clusters to test - n_init: Number of k-means initializations + k_min: Minimum number of clusters to test. + k_max: Maximum number of clusters to test. + n_init: Number of k-means initializations. Returns: - True if clustering successful, False otherwise + True if clustering successful, False otherwise. """ import subprocess @@ -379,7 +410,7 @@ def run_clustering( results_dir.mkdir(parents=True, exist_ok=True) if not profiles_path.exists(): - logger.error(f"Profiles not found: {profiles_path}") + logger.error("Profiles not found: %s", profiles_path) return False cmd = [ @@ -398,13 +429,13 @@ def run_clustering( str(n_init), ] - logger.info(f"Running k-means clustering (k={k_min}-{k_max})...") + logger.info("Running k-means clustering (k=%d-%d)...", k_min, k_max) result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: - logger.error(f"Clustering failed: {result.stderr}") + logger.error("Clustering failed: %s", result.stderr) if result.stdout: - logger.error(f"stdout: {result.stdout}") + logger.error("stdout: %s", result.stdout) return False logger.info("Clustering complete") @@ -419,19 +450,20 @@ def run_stage2_regression( Run Stage 2: Block-group-level regression of cluster composition. Models how Census block-group demographics are associated with the - composition of households across load-profile clusters. + composition of household-day profiles across clusters. - Unit of analysis: ONE ROW PER (block_group, cluster). + Unit of analysis: ONE ROW PER (block_group, cluster), with counts over + household-day observations. stage2_blockgroup_regression.py handles census fetching internally - if cache doesn't exist. + if cache does not exist. Args: - crosswalk_path: Path to ZIP+4 → block-group crosswalk file - census_cache_path: Path to cached census data + crosswalk_path: Path to ZIP+4 → block-group crosswalk file. + census_cache_path: Path to cached census data. Returns: - True if regression successful, False otherwise + True if regression successful, False otherwise. 
""" import subprocess @@ -439,7 +471,7 @@ def run_stage2_regression( output_dir = self.paths["clustering_dir"] / "results" / "stage2_blockgroups" if not clusters_path.exists(): - logger.error(f"Cluster assignments not found: {clusters_path}") + logger.error("Cluster assignments not found: %s", clusters_path) logger.error("Run Stage 1 clustering first") return False @@ -450,10 +482,9 @@ def run_stage2_regression( census_cache_path = self.base_dir / "data" / "reference" / "census_17_2023.parquet" if not crosswalk_path.exists(): - logger.error(f"Crosswalk not found: {crosswalk_path}") + logger.error("Crosswalk not found: %s", crosswalk_path) return False - # stage2_blockgroup_regression.py handles census fetching if cache missing cmd = [ sys.executable, str(self.base_dir / "analysis" / "clustering" / "stage2_blockgroup_regression.py"), @@ -470,15 +501,15 @@ def run_stage2_regression( logger.info("Running Stage 2 block-group regression...") result = subprocess.run(cmd, capture_output=True, text=True) - # Print output (report goes to stdout) + # Print report (stage2 script writes a human-readable summary to stdout) if result.stdout: print(result.stdout) if result.returncode != 0: - logger.error(f"Stage 2 regression failed: {result.stderr}") + logger.error("Stage 2 regression failed: %s", result.stderr) return False - logger.info(f"Stage 2 complete: {output_dir}") + logger.info("Stage 2 complete: %s", output_dir) return True # ========================================================================= @@ -492,16 +523,16 @@ def validate_processed_data(self) -> dict: if not path.exists(): return self._fail("processed", f"File not found: {path}") - logger.info(f"Validating processed data: {path}") + logger.info("Validating processed data: %s", path) - errors = [] - warnings = [] + errors: list[str] = [] + warnings: list[str] = [] try: lf = pl.scan_parquet(path) schema = lf.collect_schema() - except Exception as e: - return self._fail("processed", f"Failed to read: {e}") + except Exception as exc: + return self._fail("processed", f"Failed to read: {exc}") # Check required columns required = ["zip_code", "account_identifier", "datetime", "kwh", "date", "hour"] @@ -511,13 +542,15 @@ def validate_processed_data(self) -> dict: # Get stats using lazy evaluation (no full load) try: - stats_df = lf.select([ - pl.len().alias("rows"), - pl.col("zip_code").n_unique().alias("zip_codes"), - pl.col("account_identifier").n_unique().alias("accounts"), - pl.col("kwh").min().alias("kwh_min"), - pl.col("kwh").null_count().alias("kwh_nulls"), - ]).collect() + stats_df = lf.select( + [ + pl.len().alias("rows"), + pl.col("zip_code").n_unique().alias("zip_codes"), + pl.col("account_identifier").n_unique().alias("accounts"), + pl.col("kwh").min().alias("kwh_min"), + pl.col("kwh").null_count().alias("kwh_nulls"), + ], + ).collect() stats_dict = stats_df.to_dicts()[0] @@ -543,27 +576,27 @@ def validate_processed_data(self) -> dict: "accounts": stats_dict["accounts"], "file_size_mb": path.stat().st_size / 1024 / 1024, } - except Exception as e: - return self._fail("processed", f"Failed to compute stats: {e}") + except Exception as exc: + return self._fail("processed", f"Failed to compute stats: {exc}") return self._result("processed", errors, warnings, stats) def validate_clustering_inputs(self) -> dict: - """Validate clustering input files.""" + """Validate clustering input files (household-day profiles).""" profiles_path = self.paths["clustering_dir"] / "sampled_profiles.parquet" if not profiles_path.exists(): return 
self._fail("clustering_inputs", f"Profiles not found: {profiles_path}") - logger.info(f"Validating clustering inputs: {profiles_path}") + logger.info("Validating clustering inputs: %s", profiles_path) try: df = pl.read_parquet(profiles_path) - except Exception as e: - return self._fail("clustering_inputs", f"Failed to read: {e}") + except Exception as exc: + return self._fail("clustering_inputs", f"Failed to read: {exc}") - errors = [] - warnings = [] + errors: list[str] = [] + warnings: list[str] = [] # Check required columns required = ["zip_code", "date", "profile"] @@ -595,15 +628,15 @@ def validate_clustering_outputs(self) -> dict: if not assignments_path.exists(): return self._skip("clustering_outputs", "No clustering results yet") - logger.info(f"Validating clustering outputs: {results_dir}") + logger.info("Validating clustering outputs: %s", results_dir) try: assignments = pl.read_parquet(assignments_path) - except Exception as e: - return self._fail("clustering_outputs", f"Failed to read: {e}") + except Exception as exc: + return self._fail("clustering_outputs", f"Failed to read: {exc}") - errors = [] - warnings = [] + errors: list[str] = [] + warnings: list[str] = [] # Check required columns if "cluster" not in assignments.columns: @@ -612,8 +645,6 @@ def validate_clustering_outputs(self) -> dict: # Check cluster distribution if "cluster" in assignments.columns: cluster_counts = assignments["cluster"].value_counts() - - # Check for empty clusters if cluster_counts["count"].min() == 0: warnings.append("Some clusters have no assignments") @@ -644,9 +675,11 @@ def run_full_pipeline( num_files: int, sample_days: int = DEFAULT_CLUSTERING_CONFIG["sample_days"], sample_households: int | None = DEFAULT_CLUSTERING_CONFIG["sample_households"], + day_strategy: str = DEFAULT_CLUSTERING_CONFIG["day_strategy"], k_min: int = DEFAULT_CLUSTERING_CONFIG["k_min"], k_max: int = DEFAULT_CLUSTERING_CONFIG["k_max"], n_init: int = DEFAULT_CLUSTERING_CONFIG["n_init"], + chunk_size: int = DEFAULT_CLUSTERING_CONFIG["chunk_size"], skip_clustering: bool = False, run_stage2: bool = False, ) -> bool: @@ -654,18 +687,20 @@ def run_full_pipeline( Execute the complete pipeline. Args: - year_month: Target month (YYYYMM format) - num_files: Number of S3 files to download - sample_days: Days to sample for clustering - sample_households: Households to sample (None = all) - k_min: Minimum clusters to test - k_max: Maximum clusters to test - n_init: Number of k-means initializations - skip_clustering: If True, stop after preparing data - run_stage2: If True, run demographic regression after clustering + year_month: Target month (YYYYMM format). + num_files: Number of S3 files to download. + sample_days: Days to sample for clustering. + sample_households: Households to sample (None = all). + day_strategy: Day sampling strategy ("stratified" or "random"). + k_min: Minimum clusters to test. + k_max: Maximum clusters to test. + n_init: Number of k-means initializations. + chunk_size: Households per chunk for streaming profile builder. + skip_clustering: If True, stop after preparing data. + run_stage2: If True, run demographic regression after clustering. Returns: - True if all steps succeed, False otherwise + True if all steps succeed, False otherwise. 
""" self._print_header("COMED PIPELINE EXECUTION") print(f"Year-Month: {year_month}") @@ -673,6 +708,7 @@ def run_full_pipeline( print(f"Output: {self.run_dir}") print(f"Clustering: {'Skipped' if skip_clustering else f'k={k_min}-{k_max}'}") print(f"Stage 2: {'Yes' if run_stage2 else 'No'}") + print(f"Profiles: streaming (chunk_size={chunk_size})") self.setup_directories() @@ -686,9 +722,14 @@ def run_full_pipeline( if not self.process_raw_data(year_month): return False - # Step 3: Prepare clustering + # Step 3: Prepare clustering data self._print_step("PREPARING CLUSTERING DATA") - if not self.prepare_clustering_data(sample_days, sample_households): + if not self.prepare_clustering_data( + sample_days=sample_days, + sample_households=sample_households, + day_strategy=day_strategy, + chunk_size=chunk_size, + ): return False # Step 4: Cluster (optional) @@ -715,7 +756,7 @@ def validate_all(self) -> bool: Run all validation checks. Returns: - True if all critical validations pass, False otherwise + True if all critical validations pass, False otherwise. """ self._print_header("VALIDATION") @@ -733,7 +774,7 @@ def validate_stage(self, stage: str) -> bool: self.results["clustering_inputs"] = self.validate_clustering_inputs() self.results["clustering_outputs"] = self.validate_clustering_outputs() else: - logger.error(f"Unknown stage: {stage}") + logger.error("Unknown stage: %s", stage) return False return self._print_summary() @@ -752,15 +793,15 @@ def _print_step(self, title: str) -> None: print(title) print(f"{'─' * 70}") - def _result(self, stage: str, errors: list, warnings: list, stats: dict) -> dict: + def _result(self, stage: str, errors: list[str], warnings: list[str], stats: dict) -> dict: status = "PASS" if not errors else "FAIL" icon = "✅" if status == "PASS" else "❌" print(f"\n{icon} {stage.upper()}: {status}") - for e in errors: - print(f" Error: {e}") - for w in warnings: - print(f" ⚠️ {w}") + for err in errors: + print(f" Error: {err}") + for warn in warnings: + print(f" ⚠️ {warn}") return {"status": status, "errors": errors, "warnings": warnings, "stats": stats} @@ -790,7 +831,7 @@ def _print_summary(self) -> bool: if stats.get("k"): print("\nClustering Results:") print(f" • {stats.get('n_assigned', '?')} profiles → {stats.get('k')} clusters") - if stats.get("silhouette"): + if stats.get("silhouette") is not None: print(f" • Silhouette score: {stats['silhouette']:.3f}") print() @@ -802,24 +843,35 @@ def _print_summary(self) -> bool: # ============================================================================= -def main(): +def main() -> None: parser = argparse.ArgumentParser( description="ComEd Smart Meter Analysis Pipeline", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: - # Run full pipeline with 1000 files - python run_comed_pipeline.py --from-s3 --num-files 1000 - - # Quick test with 100 files, skip clustering - python run_comed_pipeline.py --from-s3 --num-files 100 --skip-clustering - - # Validate existing results - python run_comed_pipeline.py --validate-only --run-name 202308_1000 - - # Custom clustering parameters - python run_comed_pipeline.py --from-s3 --num-files 1000 \\ - --sample-households 2000 --sample-days 15 --k-range 3 5 + # Quick local test (100 files, fewer households/days, no Stage 2) + python scripts/run_comed_pipeline.py \\ + --from-s3 \\ + --year-month 202308 \\ + --num-files 100 \\ + --sample-households 2000 \\ + --sample-days 10 \\ + --k-range 3 4 + + # High-volume analysis (tested configuration, with Stage 2) + python 
scripts/run_comed_pipeline.py \\ + --from-s3 \\ + --year-month 202308 \\ + --num-files 10000 \\ + --sample-households 20000 \\ + --sample-days 31 \\ + --k-range 3 6 \\ + --run-stage2 + + # Validate existing results for a specific run + python scripts/run_comed_pipeline.py \\ + --validate-only \\ + --run-name 202308_10000 """, ) @@ -866,7 +918,7 @@ def main(): "--sample-households", type=int, default=DEFAULT_CLUSTERING_CONFIG["sample_households"], - help=f"Households to sample (default: {DEFAULT_CLUSTERING_CONFIG['sample_households']}, use 0 for all)", + help=(f"Households to sample (default: {DEFAULT_CLUSTERING_CONFIG['sample_households']}, use 0 for all)"), ) cluster_group.add_argument( "--sample-days", @@ -880,16 +932,17 @@ def main(): nargs=2, metavar=("MIN", "MAX"), default=[DEFAULT_CLUSTERING_CONFIG["k_min"], DEFAULT_CLUSTERING_CONFIG["k_max"]], - help=f"Cluster range to test (default: {DEFAULT_CLUSTERING_CONFIG['k_min']} {DEFAULT_CLUSTERING_CONFIG['k_max']})", + help=( + "Cluster range to test " + f"(default: {DEFAULT_CLUSTERING_CONFIG['k_min']} {DEFAULT_CLUSTERING_CONFIG['k_max']})" + ), ) cluster_group.add_argument( "--day-strategy", choices=["stratified", "random"], default=DEFAULT_CLUSTERING_CONFIG["day_strategy"], - help="Day sampling strategy (default: stratified = 70%% weekday, 30%% weekend)", + help="Day sampling strategy (default: stratified = 70% weekday, 30% weekend)", ) - - # Clustering options cluster_group.add_argument( "--n-init", type=int, @@ -901,6 +954,14 @@ def main(): action="store_true", help="Fast mode: k=3-4 (for testing)", ) + cluster_group.add_argument( + "--chunk-size", + type=int, + default=DEFAULT_CLUSTERING_CONFIG["chunk_size"], + help=( + f"Households per chunk for streaming profile builder (default: {DEFAULT_CLUSTERING_CONFIG['chunk_size']})" + ), + ) # Output options output_group = parser.add_argument_group("Output Options") @@ -944,17 +1005,19 @@ def main(): num_files=args.num_files, sample_days=args.sample_days, sample_households=sample_households, + day_strategy=args.day_strategy, k_min=args.k_range[0], k_max=args.k_range[1], n_init=args.n_init, + chunk_size=args.chunk_size, skip_clustering=args.skip_clustering, run_stage2=args.run_stage2, ) + if success: pipeline.validate_all() elif args.validate_only: success = pipeline.validate_all() if args.stage == "all" else pipeline.validate_stage(args.stage) - else: parser.print_help() print("\n⚠️ Specify --from-s3 to run pipeline or --validate-only to check existing files") diff --git a/smart_meter_analysis/manifests.py b/smart_meter_analysis/manifests.py index 30c8d7b..bf8bcf6 100644 --- a/smart_meter_analysis/manifests.py +++ b/smart_meter_analysis/manifests.py @@ -72,22 +72,11 @@ def _validate_input_has_columns(input_path: Path, required: list[str]) -> None: def ensure_account_manifest(input_path: Path) -> Path: """ - Create or load account manifest using streaming sink. + Create or load account manifest in a memory-safe way. - The manifest contains unique (account_identifier, zip_code) pairs, - extracted from the source file without loading it fully into memory. - - Args: - input_path: - Path to interval-level parquet file. - - Returns: - Path to account manifest parquet file. - - Example: - >>> manifest_path = ensure_account_manifest(Path("comed_202308.parquet")) - >>> accounts_df = pl.read_parquet(manifest_path) - >>> accounts = accounts_df["account_identifier"].to_list() + The manifest contains one row per account_identifier with an associated + zip_code. 
Memory cost scales with the number of accounts, not the number + of interval-level rows. """ _validate_input_has_columns(input_path, ["account_identifier", "zip_code"]) @@ -110,16 +99,30 @@ def ensure_account_manifest(input_path: Path) -> Path: manifest_path, ) - # Build manifest using streaming - logger.info("Building account manifest from %s (streaming)...", input_path) + # Build manifest using streaming-friendly group_by + logger.info("Building account manifest from %s (streaming group_by)...", input_path) log_memory("before account manifest") - (pl.scan_parquet(input_path).select(["account_identifier", "zip_code"]).unique().sink_parquet(manifest_path)) + lf = pl.scan_parquet(input_path) + + # One row per account_identifier, keep a representative zip_code. + # group_by + agg with collect(streaming=True) is streaming-safe; we only + # hold ~#accounts rows in memory. + accounts_df = ( + lf.select(["account_identifier", "zip_code"]) + .filter(pl.col("account_identifier").is_not_null()) + .group_by("account_identifier") + .agg(pl.first("zip_code").alias("zip_code")) + .collect(streaming=True) + .sort("account_identifier") + ) + + manifest_path.parent.mkdir(parents=True, exist_ok=True) + accounts_df.write_parquet(manifest_path) log_memory("after account manifest") - # Validate and report - n = pl.scan_parquet(manifest_path).select(pl.len()).collect()[0, 0] + n = accounts_df.height logger.info( "Account manifest complete (rebuilt): %s (%s accounts)", manifest_path, @@ -131,18 +134,11 @@ def ensure_account_manifest(input_path: Path) -> Path: def ensure_date_manifest(input_path: Path) -> Path: """ - Create or load date manifest using streaming sink. - - The manifest contains unique (date, is_weekend, weekday) tuples. - This is typically small (~31 rows for a month) but we use streaming - for consistency and to avoid scanning the full file into memory. + Create or load date manifest in a memory-safe way. - Args: - input_path: - Path to interval-level parquet file. - - Returns: - Path to date manifest parquet file. + The manifest contains one row per date with representative weekend/weekday + flags. This is small in practice (~31 rows for one month), but we still + build it via streaming group_by for consistency. 
""" _validate_input_has_columns(input_path, ["date", "is_weekend", "weekday"]) @@ -165,16 +161,30 @@ def ensure_date_manifest(input_path: Path) -> Path: manifest_path, ) - # Build manifest using streaming - logger.info("Building date manifest from %s (streaming)...", input_path) + # Build manifest using streaming-friendly group_by + logger.info("Building date manifest from %s (streaming group_by)...", input_path) log_memory("before date manifest") - (pl.scan_parquet(input_path).select(["date", "is_weekend", "weekday"]).unique().sink_parquet(manifest_path)) + lf = pl.scan_parquet(input_path) + + dates_df = ( + lf.select(["date", "is_weekend", "weekday"]) + .filter(pl.col("date").is_not_null()) + .group_by("date") + .agg( + pl.first("is_weekend").alias("is_weekend"), + pl.first("weekday").alias("weekday"), + ) + .collect(streaming=True) + .sort("date") + ) + + manifest_path.parent.mkdir(parents=True, exist_ok=True) + dates_df.write_parquet(manifest_path) log_memory("after date manifest") - # Validate and report - n = pl.scan_parquet(manifest_path).select(pl.len()).collect()[0, 0] + n = dates_df.height logger.info( "Date manifest complete (rebuilt): %s (%s dates)", manifest_path, From 9294e283b9cdf80c2069b5a2db68fe044620a940 Mon Sep 17 00:00:00 2001 From: Griffin Sharps Date: Wed, 10 Dec 2025 20:12:53 +0000 Subject: [PATCH 4/4] Use engine='streaming' in manifest collection for Polars LazyFrame --- smart_meter_analysis/manifests.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/smart_meter_analysis/manifests.py b/smart_meter_analysis/manifests.py index bf8bcf6..fd6da8d 100644 --- a/smart_meter_analysis/manifests.py +++ b/smart_meter_analysis/manifests.py @@ -113,7 +113,7 @@ def ensure_account_manifest(input_path: Path) -> Path: .filter(pl.col("account_identifier").is_not_null()) .group_by("account_identifier") .agg(pl.first("zip_code").alias("zip_code")) - .collect(streaming=True) + .collect(engine="streaming") .sort("account_identifier") ) @@ -175,7 +175,7 @@ def ensure_date_manifest(input_path: Path) -> Path: pl.first("is_weekend").alias("is_weekend"), pl.first("weekday").alias("weekday"), ) - .collect(streaming=True) + .collect(engine="streaming") .sort("date") )