diff --git a/analysis/clustering/euclidean_clustering.py b/analysis/clustering/euclidean_clustering.py
index e591797..983c14e 100644
--- a/analysis/clustering/euclidean_clustering.py
+++ b/analysis/clustering/euclidean_clustering.py
@@ -11,18 +11,19 @@
Pipeline:
1. Load daily profiles from Phase 1
2. Normalize profiles (optional but recommended)
- 3. Evaluate k values to find optimal k (via silhouette score)
+ 3. Evaluate k values to find optimal k (via silhouette score on a sample)
4. Run final clustering with optimal k
5. Output assignments, centroids, and visualizations
Usage:
- # Standard run (evaluates k=3-6)
+ # Standard run (evaluates k=3-6 using silhouette on up to 10k samples)
python euclidean_clustering.py \\
--input data/clustering/sampled_profiles.parquet \\
--output-dir data/clustering/results \\
--k-range 3 6 \\
--find-optimal-k \\
- --normalize
+ --normalize \\
+ --silhouette-sample-size 10000
# Fixed k (no evaluation)
python euclidean_clustering.py \\
@@ -68,9 +69,13 @@ def load_profiles(path: Path) -> tuple[np.ndarray, pl.DataFrame]:
# Extract profiles as numpy array
profiles = np.array(df["profile"].to_list(), dtype=np.float64)
- logger.info(f" Loaded {len(profiles):,} profiles with {profiles.shape[1]} time points each")
- logger.info(f" Data shape: {profiles.shape}")
- logger.info(f" Data range: [{profiles.min():.2f}, {profiles.max():.2f}]")
+ logger.info(
+ " Loaded %s profiles with %s time points each",
+ f"{len(profiles):,}",
+ profiles.shape[1],
+ )
+ logger.info(" Data shape: %s", (profiles.shape[0], profiles.shape[1]))
+ logger.info(" Data range: [%.2f, %.2f]", profiles.min(), profiles.max())
return profiles, df
@@ -92,7 +97,7 @@ def normalize_profiles(
if method == "none":
return X
- logger.info(f"Normalizing profiles using {method} method...")
+ logger.info("Normalizing profiles using %s method...", method)
if method == "zscore":
# Per-profile z-score normalization
@@ -110,7 +115,11 @@ def normalize_profiles(
else:
raise ValueError(f"Unknown normalization method: {method}")
- logger.info(f" Normalized data range: [{X_norm.min():.2f}, {X_norm.max():.2f}]")
+ logger.info(
+ " Normalized data range: [%.2f, %.2f]",
+ X_norm.min(),
+ X_norm.max(),
+ )
return X_norm
@@ -120,21 +129,41 @@ def evaluate_clustering(
k_range: range,
n_init: int = 10,
random_state: int = 42,
+ silhouette_sample_size: int | None = 10_000,
) -> dict:
"""
Evaluate clustering for different values of k.
+ Uses inertia on the full dataset and silhouette score computed on a
+ subsample (to avoid O(n^2) cost when n is large).
+
Args:
X: Profile array of shape (n_samples, n_timepoints)
k_range: Range of k values to test
n_init: Number of random initializations
random_state: Random seed for reproducibility
+ silhouette_sample_size: Max number of samples for silhouette.
+ If None, uses full dataset (NOT recommended for very large n).
Returns:
Dictionary with k_values, inertia, and silhouette scores
"""
- logger.info(f"Evaluating clustering for k in {list(k_range)}...")
- logger.info(f" Dataset size: {X.shape[0]:,} profiles")
+ n_samples = X.shape[0]
+ logger.info("Evaluating clustering for k in %s...", list(k_range))
+ logger.info(" Dataset size: %s profiles", f"{n_samples:,}")
+
+ if silhouette_sample_size is None:
+ logger.info(" Silhouette: using FULL dataset (may be very slow).")
+ elif n_samples > silhouette_sample_size:
+ logger.info(
+ " Silhouette: using a random subsample of %s profiles.",
+ f"{silhouette_sample_size:,}",
+ )
+ else:
+ logger.info(
+ " Silhouette: using all %s profiles (<= sample size).",
+ f"{n_samples:,}",
+ )
results = {
"k_values": [],
@@ -143,7 +172,8 @@ def evaluate_clustering(
}
for k in k_range:
- logger.info(f"\n Testing k={k}...")
+ logger.info("")
+ logger.info(" Testing k=%d...", k)
model = KMeans(
n_clusters=k,
@@ -153,14 +183,23 @@ def evaluate_clustering(
labels = model.fit_predict(X)
- sil_score = silhouette_score(X, labels, metric="euclidean")
+ # Inertia on full data
+ inertia = float(model.inertia_)
+
+ # Silhouette on sample (or full data if silhouette_sample_size is None)
+ sil_kwargs: dict = {"metric": "euclidean"}
+ if silhouette_sample_size is not None and n_samples > silhouette_sample_size:
+ sil_kwargs["sample_size"] = silhouette_sample_size
+ sil_kwargs["random_state"] = random_state
+
+ sil_score = silhouette_score(X, labels, **sil_kwargs)
results["k_values"].append(k)
- results["inertia"].append(float(model.inertia_))
+ results["inertia"].append(inertia)
results["silhouette"].append(float(sil_score))
- logger.info(f" Inertia: {model.inertia_:,.2f}")
- logger.info(f" Silhouette: {sil_score:.3f}")
+ logger.info(" Inertia: %s", f"{inertia:,.2f}")
+ logger.info(" Silhouette: %.3f", sil_score)
return results
@@ -178,10 +217,15 @@ def find_optimal_k(eval_results: dict) -> int:
k_values = eval_results["k_values"]
silhouettes = eval_results["silhouette"]
- best_idx = np.argmax(silhouettes)
- best_k = k_values[best_idx]
+ best_idx = int(np.argmax(silhouettes))
+ best_k = int(k_values[best_idx])
- logger.info(f"\nOptimal k={best_k} (silhouette={silhouettes[best_idx]:.3f})")
+ logger.info("")
+ logger.info(
+ "Optimal k=%d (silhouette=%.3f)",
+ best_k,
+ silhouettes[best_idx],
+ )
return best_k
@@ -204,7 +248,12 @@ def run_clustering(
Returns:
Tuple of (labels, centroids, inertia)
"""
- logger.info(f"\nRunning k-means with k={k} on {X.shape[0]:,} profiles...")
+ logger.info("")
+ logger.info(
+ "Running k-means with k=%d on %s profiles...",
+ k,
+ f"{X.shape[0]:,}",
+ )
model = KMeans(
n_clusters=k,
@@ -214,16 +263,22 @@ def run_clustering(
labels = model.fit_predict(X)
centroids = model.cluster_centers_
+ inertia = float(model.inertia_)
- logger.info(f" Inertia: {model.inertia_:,.2f}")
+ logger.info(" Inertia: %s", f"{inertia:,.2f}")
# Log cluster distribution
unique, counts = np.unique(labels, return_counts=True)
for cluster, count in zip(unique, counts):
pct = count / len(labels) * 100
- logger.info(f" Cluster {cluster}: {count:,} profiles ({pct:.1f}%)")
+ logger.info(
+ " Cluster %d: %s profiles (%.1f%%)",
+ cluster,
+ f"{count:,}",
+ pct,
+ )
- return labels, centroids, float(model.inertia_)
+ return labels, centroids, inertia
def plot_centroids(
@@ -272,7 +327,7 @@ def plot_centroids(
plt.savefig(output_path, dpi=150)
plt.close()
- logger.info(f" Saved centroids plot: {output_path}")
+ logger.info(" Saved centroids plot: %s", output_path)
def plot_cluster_samples(
@@ -316,8 +371,12 @@ def plot_cluster_samples(
cluster_mask = labels == i
cluster_profiles = X[cluster_mask]
- # Sample profiles
n_available = len(cluster_profiles)
+ if n_available == 0:
+ ax.set_title(f"Cluster {i} (n=0)")
+ ax.grid(True, alpha=0.3)
+ continue
+
n_plot = min(n_samples, n_available)
idx = rng.choice(n_available, size=n_plot, replace=False)
@@ -342,7 +401,7 @@ def plot_cluster_samples(
plt.savefig(output_path, dpi=150)
plt.close()
- logger.info(f" Saved cluster samples plot: {output_path}")
+ logger.info(" Saved cluster samples plot: %s", output_path)
def plot_elbow_curve(
@@ -379,7 +438,7 @@ def plot_elbow_curve(
ax2.set_xticks(k_values)
# Mark optimal k
- best_idx = np.argmax(silhouette)
+ best_idx = int(np.argmax(silhouette))
ax2.axvline(x=k_values[best_idx], color="red", linestyle="--", alpha=0.7)
ax2.scatter(
[k_values[best_idx]],
@@ -395,7 +454,7 @@ def plot_elbow_curve(
plt.savefig(output_path, dpi=150)
plt.close()
- logger.info(f" Saved elbow curve: {output_path}")
+ logger.info(" Saved elbow curve: %s", output_path)
def save_results(
@@ -420,7 +479,7 @@ def save_results(
output_dir.mkdir(parents=True, exist_ok=True)
# Determine which ID columns are present (household vs ZIP+4 level)
- id_cols = []
+ id_cols: list[str] = []
if "account_identifier" in df.columns:
id_cols.append("account_identifier")
if "zip_code" in df.columns:
@@ -431,10 +490,12 @@ def save_results(
available_cols = [c for c in id_cols if c in df.columns]
# Save cluster assignments
- assignments = df.select(available_cols).with_columns(pl.Series("cluster", labels))
+ assignments = df.select(available_cols).with_columns(
+ pl.Series("cluster", labels),
+ )
assignments_path = output_dir / "cluster_assignments.parquet"
assignments.write_parquet(assignments_path)
- logger.info(f" Saved assignments: {assignments_path}")
+ logger.info(" Saved assignments: %s", assignments_path)
# Save centroids as parquet
centroids_df = pl.DataFrame({
@@ -443,20 +504,20 @@ def save_results(
})
centroids_path = output_dir / "cluster_centroids.parquet"
centroids_df.write_parquet(centroids_path)
- logger.info(f" Saved centroids: {centroids_path}")
+ logger.info(" Saved centroids: %s", centroids_path)
# Save k evaluation results
if eval_results:
eval_path = output_dir / "k_evaluation.json"
with open(eval_path, "w") as f:
json.dump(eval_results, f, indent=2)
- logger.info(f" Saved k evaluation: {eval_path}")
+ logger.info(" Saved k evaluation: %s", eval_path)
# Save metadata
metadata_path = output_dir / "clustering_metadata.json"
with open(metadata_path, "w") as f:
json.dump(metadata, f, indent=2)
- logger.info(f" Saved metadata: {metadata_path}")
+ logger.info(" Saved metadata: %s", metadata_path)
def main() -> None:
@@ -465,11 +526,12 @@ def main() -> None:
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
- # Standard run with k evaluation
+ # Standard run with k evaluation (silhouette on sample)
python euclidean_clustering.py \\
--input data/clustering/sampled_profiles.parquet \\
--output-dir data/clustering/results \\
- --k-range 3 6 --find-optimal-k --normalize
+ --k-range 3 6 --find-optimal-k --normalize \\
+ --silhouette-sample-size 10000
# Fixed k (no evaluation)
python euclidean_clustering.py \\
@@ -513,6 +575,15 @@ def main() -> None:
action="store_true",
help="Evaluate k range and use optimal k",
)
+ k_group.add_argument(
+ "--silhouette-sample-size",
+ type=int,
+ default=10_000,
+ help=(
+ "Max number of samples for silhouette evaluation "
+ "(default: 10000; use -1 to use full dataset, not recommended for large n)."
+ ),
+ )
# Clustering parameters
parser.add_argument(
@@ -526,7 +597,7 @@ def main() -> None:
parser.add_argument(
"--normalize",
action="store_true",
- help="Apply z-score normalization to profiles",
+ help="Apply normalization to profiles",
)
parser.add_argument(
"--normalize-method",
@@ -561,16 +632,21 @@ def main() -> None:
if args.k is not None:
# Fixed k
k = args.k
- logger.info(f"\nUsing fixed k={k}")
+ logger.info("")
+ logger.info("Using fixed k=%d", k)
elif args.find_optimal_k:
# Evaluate k range
k_range = range(args.k_range[0], args.k_range[1] + 1)
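+        # A negative --silhouette-sample-size means "use the full dataset" for silhouette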
+ silhouette_sample_size: int | None
+ silhouette_sample_size = None if args.silhouette_sample_size < 0 else args.silhouette_sample_size
+
eval_results = evaluate_clustering(
X,
k_range=k_range,
n_init=args.n_init,
random_state=args.random_state,
+ silhouette_sample_size=silhouette_sample_size,
)
# Save elbow curve
@@ -581,7 +657,8 @@ def main() -> None:
else:
# Default to min of k_range
k = args.k_range[0]
- logger.info(f"\nUsing default k={k}")
+ logger.info("")
+ logger.info("Using default k=%d", k)
# Run final clustering
labels, centroids, inertia = run_clustering(
@@ -592,25 +669,28 @@ def main() -> None:
)
# Create visualizations
- logger.info("\nGenerating visualizations...")
+ logger.info("")
+ logger.info("Generating visualizations...")
args.output_dir.mkdir(parents=True, exist_ok=True)
plot_centroids(centroids, args.output_dir / "cluster_centroids.png")
plot_cluster_samples(X, labels, centroids, args.output_dir / "cluster_samples.png")
# Save results
- logger.info("\nSaving results...")
+ logger.info("")
+ logger.info("Saving results...")
metadata = {
- "k": k,
- "n_profiles": len(X),
- "n_timepoints": X.shape[1],
- "normalized": args.normalize,
+ "k": int(k),
+ "n_profiles": int(X.shape[0]),
+ "n_timepoints": int(X.shape[1]),
+ "normalized": bool(args.normalize),
"normalize_method": args.normalize_method if args.normalize else None,
- "n_init": args.n_init,
- "random_state": args.random_state,
- "inertia": inertia,
+ "n_init": int(args.n_init),
+ "random_state": int(args.random_state),
+ "inertia": float(inertia),
"distance_metric": "euclidean",
+ "silhouette_sample_size": (None if args.silhouette_sample_size < 0 else int(args.silhouette_sample_size)),
}
save_results(df, labels, centroids, eval_results, metadata, args.output_dir)
@@ -620,7 +700,7 @@ def main() -> None:
print("CLUSTERING COMPLETE")
print("=" * 70)
print(f"\nResults saved to: {args.output_dir}")
- print(f" • {len(X):,} profiles clustered into {k} groups")
+ print(f" • {X.shape[0]:,} profiles clustered into {k} groups")
print(f" • Inertia: {inertia:,.2f}")
if eval_results:
best_sil = max(eval_results["silhouette"])
diff --git a/analysis/clustering/prepare_clustering_data_households.py b/analysis/clustering/prepare_clustering_data_households.py
index 870a51b..660ff31 100644
--- a/analysis/clustering/prepare_clustering_data_households.py
+++ b/analysis/clustering/prepare_clustering_data_households.py
@@ -6,19 +6,22 @@
Transforms interval-level energy data into daily load profiles at the
HOUSEHOLD (account) level for k-means clustering.
-Pipeline:
- 1. Validate schema (no data scan)
- 2. Scan for metadata and sample households + dates
- 3. Create daily 48-point load profiles per household
- 4. Output profiles ready for clustering
+What this does:
+ 1. Validate schema (no full data scan)
+ 2. Build/load manifests for accounts and dates (streaming, memory-safe)
+ 3. Sample households + dates from manifests
+ 4. Create daily 48-point load profiles per household
+ 5. Output profiles ready for clustering
Design notes:
- - Minimizes full-file scans (summary + unique accounts + unique dates only)
+ - Uses MANIFESTS to avoid OOM on unique().collect() for large files
+ - Manifests are built once via sink_parquet (streaming) and cached
- Standard mode: single filtered collect() for selected households/dates
- * good for up to ~50k households sampled
- - Streaming mode: sink_parquet() pre-filter, then aggregate
- * safer for 100k+ households
+ * good for up to ~5k-10k households sampled (depending on memory)
+ - Chunked streaming mode: fully streaming pipeline with per-chunk sinks
+ * required for 10k+ households on constrained hardware
- Profiles are 48 half-hourly kWh values in chronological order (00:30-24:00)
+  - Days without exactly 48 intervals are dropped (missing or irregular data)
Output files:
- sampled_profiles.parquet:
@@ -26,31 +29,43 @@
- household_zip4_map.parquet:
Unique account_identifier → zip_code mapping for later joins
+Manifest files (auto-generated alongside input, via smart_meter_analysis.manifests):
+ - {input_stem}_accounts.parquet: unique (account_identifier, zip_code) pairs
+ - {input_stem}_dates.parquet: unique (date, is_weekend, weekday) tuples
+
Usage:
- # Standard (5000 households, 20 days)
+ # Standard (5,000 households, 20 days)
python prepare_clustering_data_households.py \
--input data/processed/comed_202308.parquet \
--output-dir data/clustering \
--sample-households 5000 \
--sample-days 20
- # Large dataset with streaming
+ # Large dataset with chunked streaming (20,000 households)
python prepare_clustering_data_households.py \
--input data/processed/comed_202308.parquet \
--output-dir data/clustering \
- --sample-households 100000 \
- --streaming
+ --sample-households 20000 \
+ --sample-days 31 \
+ --streaming \
+ --chunk-size 2000
"""
from __future__ import annotations
import argparse
+import gc
import logging
from pathlib import Path
from typing import Any, Literal
import polars as pl
+from smart_meter_analysis.manifests import (
+ ensure_account_manifest,
+ ensure_date_manifest,
+)
+
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
@@ -62,6 +77,25 @@
REQUIRED_TIME_COLS = ["date", "hour", "is_weekend", "weekday"]
+# =============================================================================
+# MEMORY INSTRUMENTATION
+# =============================================================================
+
+
+def log_memory(label: str) -> None:
+ """Log current RSS memory usage (Linux only)."""
+ try:
+ with open("/proc/self/status") as f:
+ for line in f:
+ if line.startswith("VmRSS:"):
+ mem_mb = int(line.split()[1]) / 1024
+ logger.info("[MEMORY] %s: %.0f MB", label, mem_mb)
+ break
+ except Exception as exc:
+ # Best-effort only; ignore on non-Linux or restricted environments
+ logger.debug("Skipping memory log for %s: %s", label, exc)
+
+
# =============================================================================
# DATA INSPECTION & SAMPLING
# =============================================================================
@@ -108,13 +142,12 @@ def get_metadata_and_samples(
seed: int = 42,
) -> dict[str, Any]:
"""
- Get summary statistics and sample households + dates using minimal scans.
+ Get summary statistics and sample households + dates using MANIFESTS.
- This function performs:
- - Summary stats (row counts, unique counts, date range)
- - Unique households
- - Unique dates with weekend flags
- - Sampling of households and dates
+ This function:
+ - Computes summary stats via streaming (safe for large files)
+ - Builds/loads account and date manifests (streaming sink, memory-safe)
+ - Samples households and dates from the small manifest files
Args:
input_path: Path to input parquet file.
@@ -130,44 +163,55 @@ def get_metadata_and_samples(
- dates: list[date] of sampled dates
"""
logger.info("Scanning input data for metadata and sampling...")
+ log_memory("start of get_metadata_and_samples")
lf = pl.scan_parquet(input_path)
- # Summary stats
+ # Summary stats (streaming-safe: single row output)
+ logger.info(" Computing summary statistics...")
summary_df = lf.select([
pl.len().alias("n_rows"),
pl.col("zip_code").n_unique().alias("n_zip_codes"),
pl.col("account_identifier").n_unique().alias("n_accounts"),
pl.col("date").min().alias("min_date"),
pl.col("date").max().alias("max_date"),
- ]).collect()
+ ]).collect(engine="streaming")
summary = summary_df.to_dicts()[0]
# Early checks
if summary["n_rows"] == 0:
raise ValueError(f"Input file {input_path} contains no rows.")
- logger.info(f" {summary['n_rows']:,} rows")
- logger.info(f" {summary['n_accounts']:,} households")
- logger.info(f" {summary['n_zip_codes']:,} ZIP+4 codes")
- logger.info(f" Date range: {summary['min_date']} to {summary['max_date']}")
+ logger.info(" %s rows", f"{summary['n_rows']:,}")
+ logger.info(" %s households", f"{summary['n_accounts']:,}")
+ logger.info(" %s ZIP+4 codes", f"{summary['n_zip_codes']:,}")
+ logger.info(" Date range: %s to %s", summary["min_date"], summary["max_date"])
- # Unique households
- accounts_df = pl.scan_parquet(input_path).select("account_identifier").unique().collect()
- if accounts_df.height == 0:
- raise ValueError("No account_identifier values found in input data.")
+ # ==========================================================================
+ # KEY CHANGE: Use manifests instead of unique().collect()
+ # ==========================================================================
+
+ logger.info(" Loading manifests...")
+ account_manifest = ensure_account_manifest(input_path)
+ date_manifest = ensure_date_manifest(input_path)
+
+ # Read from small manifest files (fits easily in memory)
+ accounts_df = pl.read_parquet(account_manifest)
+ dates_df = pl.read_parquet(date_manifest)
+
+ log_memory("after loading manifests")
- # Unique dates with weekend flag
- dates_df = pl.scan_parquet(input_path).select(["date", "is_weekend"]).unique().collect()
+ if accounts_df.height == 0:
+ raise ValueError("No account_identifier values found in manifest.")
if dates_df.height == 0:
- raise ValueError("No dates found in input data.")
+ raise ValueError("No dates found in manifest.")
# Sample households
if sample_households is not None and sample_households < len(accounts_df):
accounts_df = accounts_df.sample(n=sample_households, shuffle=True, seed=seed)
- logger.info(f" Sampled {len(accounts_df):,} households")
+ logger.info(" Sampled %s households", f"{len(accounts_df):,}")
else:
- logger.info(f" Using all {len(accounts_df):,} households")
+ logger.info(" Using all %s households", f"{len(accounts_df):,}")
accounts = accounts_df["account_identifier"].to_list()
@@ -176,11 +220,8 @@ def get_metadata_and_samples(
weekday_df = dates_df.filter(~pl.col("is_weekend"))
weekend_df = dates_df.filter(pl.col("is_weekend"))
- if weekday_df.height == 0:
- logger.warning(" No weekdays found; falling back to random day sampling.")
- day_strategy = "random"
- elif weekend_df.height == 0:
- logger.warning(" No weekends found; falling back to random day sampling.")
+ if weekday_df.height == 0 or weekend_df.height == 0:
+ logger.warning(" Missing weekdays or weekends; falling back to random day sampling.")
day_strategy = "random"
if day_strategy == "stratified":
@@ -198,15 +239,21 @@ def get_metadata_and_samples(
)
dates = sampled_weekdays + sampled_weekends
- logger.info(f" Sampled {len(sampled_weekdays)} weekdays + {len(sampled_weekends)} weekend days (stratified)")
+ logger.info(
+ " Sampled %d weekdays + %d weekend days (stratified)",
+ len(sampled_weekdays),
+ len(sampled_weekends),
+ )
else:
n_sample = min(sample_days, len(dates_df))
dates = dates_df.sample(n=n_sample, shuffle=True, seed=seed)["date"].to_list()
- logger.info(f" Sampled {len(dates)} days (random)")
+ logger.info(" Sampled %d days (random)", len(dates))
if not dates:
raise ValueError("No dates were sampled; check input data and sampling settings.")
+ log_memory("end of get_metadata_and_samples")
+
return {
"summary": summary,
"accounts": accounts,
@@ -232,7 +279,7 @@ def create_household_profiles(
(00:30 to 24:00).
This uses a single filtered collect() over the full file, which is efficient
- when sampling a subset of households and dates.
+ when sampling a moderate subset of households and dates.
Args:
input_path: Path to input parquet file.
@@ -248,7 +295,12 @@ def create_household_profiles(
- is_weekend
- weekday
"""
- logger.info(f"Creating profiles for {len(accounts):,} households X {len(dates)} days...")
+ logger.info(
+ "Creating profiles for %s households x %d days...",
+ f"{len(accounts):,}",
+ len(dates),
+ )
+ log_memory("start of create_household_profiles")
if not accounts:
raise ValueError("No accounts provided for profile creation.")
@@ -267,7 +319,8 @@ def create_household_profiles(
logger.warning(" No data found for selected households/dates")
return pl.DataFrame()
- logger.info(f" Loaded {len(df):,} interval records")
+ logger.info(" Loaded %s interval records", f"{len(df):,}")
+ log_memory("after loading filtered intervals")
profiles_df = df.group_by(["account_identifier", "zip_code", "date"]).agg([
pl.col("kwh").alias("profile"),
@@ -281,92 +334,163 @@ def create_household_profiles(
n_dropped = n_before - len(profiles_df)
if n_dropped > 0:
- logger.info(f" Dropped {n_dropped:,} incomplete profiles (DST days, missing data)")
+ logger.info(
+ " Dropped %s incomplete profiles (missing or irregular data)",
+ f"{n_dropped:,}",
+ )
- logger.info(f" Created {len(profiles_df):,} complete profiles")
+ logger.info(" Created %s complete profiles", f"{len(profiles_df):,}")
+ log_memory("end of create_household_profiles")
return profiles_df.drop("num_intervals")
-def create_household_profiles_streaming(
+def _create_profiles_for_chunk_streaming(
+ input_path: Path,
+ accounts_chunk: list[str],
+ dates: list[Any],
+ chunk_idx: int,
+ total_chunks: int,
+ tmp_dir: Path,
+) -> Path:
+ """
+ Create profiles for a single chunk of households and write directly to parquet.
+
+ Uses within-group sort (struct.sort_by) to preserve chronological order
+ without a global sort, which keeps the lazy plan streaming-compatible.
+
+ Returns:
+ Path to the chunk parquet file.
+ """
+ logger.info(
+ " Chunk %d/%d: %s households...",
+ chunk_idx + 1,
+ total_chunks,
+ f"{len(accounts_chunk):,}",
+ )
+ log_memory(f"chunk {chunk_idx + 1} start")
+
+ chunk_path = tmp_dir / f"sampled_profiles_chunk_{chunk_idx:03d}.parquet"
+
+ (
+ pl.scan_parquet(input_path)
+ .filter(pl.col("account_identifier").is_in(accounts_chunk) & pl.col("date").is_in(dates))
+ .group_by(["account_identifier", "zip_code", "date"])
+ .agg([
+ # Sort by datetime within group, then extract kwh values
+ pl.struct(["datetime", "kwh"]).sort_by("datetime").struct.field("kwh").alias("profile"),
+ pl.col("is_weekend").first(),
+ pl.col("weekday").first(),
+ pl.len().alias("num_intervals"),
+ ])
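+        # Keep only complete days with exactly 48 half-hourly intervals (drops DST/partial days)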
+ .filter(pl.col("num_intervals") == 48)
+ .drop("num_intervals")
+ .sink_parquet(chunk_path)
+ )
+
+ log_memory(f"chunk {chunk_idx + 1} done")
+ logger.info(" Wrote chunk parquet: %s", chunk_path)
+
+ return chunk_path
+
+
+def create_household_profiles_chunked_streaming(
input_path: Path,
accounts: list[str],
dates: list[Any],
output_path: Path,
+ chunk_size: int = 5000,
) -> int:
"""
- Create daily load profiles using a streaming-friendly two-pass approach.
+ Create daily load profiles using chunked streaming.
- Pass 1:
- Filter to selected households/dates and stream to a temp parquet file.
- Pass 2:
- Sort by (account_identifier, datetime), aggregate to daily profiles
- with list-of-48 kWh values, and write final output.
+ - Splits households into chunks.
+ - For each chunk, runs a streaming filter → group_by → aggregate → sink.
+ - Concatenates all chunk files using a streaming concat.
+ - Deletes temporary chunk files.
- This avoids loading the entire original parquet into memory, but the
- aggregated profiles are still collected in-memory before final write.
+ This avoids ever materializing profiles for all households in memory.
Args:
- input_path: Path to input parquet file.
+ input_path: Path to interval-level parquet file.
accounts: List of account_identifier values to include.
dates: List of dates to include.
- output_path: Path to write final profiles parquet.
+ output_path: Final output parquet path for all profiles.
+ chunk_size: Number of households per chunk.
Returns:
Number of complete daily profiles created.
"""
- logger.info(f"Creating profiles (streaming) for {len(accounts):,} households X {len(dates)} days...")
-
if not accounts:
- raise ValueError("No accounts provided for streaming profile creation.")
+ raise ValueError(
+ "No accounts provided for chunked streaming profile creation.",
+ )
if not dates:
- raise ValueError("No dates provided for streaming profile creation.")
+ raise ValueError("No dates provided for chunked streaming profile creation.")
- temp_path = output_path.parent / "_temp_filtered.parquet"
+ n_accounts = len(accounts)
+ n_chunks = (n_accounts + chunk_size - 1) // chunk_size
- try:
- # Pass 1: Stream filtered data to temp file
- logger.info(" Pass 1: Streaming filtered data to temp parquet...")
- (
- pl.scan_parquet(input_path)
- .filter(pl.col("account_identifier").is_in(accounts) & pl.col("date").is_in(dates))
- .sink_parquet(temp_path)
- )
+ logger.info(
+ "Creating profiles in %d chunks of up to %s households each (total: %s households x %d days)...",
+ n_chunks,
+ f"{chunk_size:,}",
+ f"{n_accounts:,}",
+ len(dates),
+ )
+ log_memory("before chunked streaming")
+
+ output_path.parent.mkdir(parents=True, exist_ok=True)
+ tmp_dir = output_path.parent
+
+ chunk_paths: list[Path] = []
+
+ for i in range(n_chunks):
+ start_idx = i * chunk_size
+ end_idx = min((i + 1) * chunk_size, n_accounts)
+ accounts_chunk = accounts[start_idx:end_idx]
- if not temp_path.exists() or temp_path.stat().st_size == 0:
- logger.warning(" No data found for selected households/dates in streaming mode")
- return 0
-
- # Pass 2: Sort, aggregate, and write final output
- logger.info(" Pass 2: Sorting and aggregating to daily profiles...")
-
- profiles_df = (
- pl.scan_parquet(temp_path)
- .sort(["account_identifier", "datetime"])
- .collect()
- .group_by(["account_identifier", "zip_code", "date"])
- .agg([
- pl.col("kwh").alias("profile"),
- pl.col("is_weekend").first(),
- pl.col("weekday").first(),
- pl.len().alias("num_intervals"),
- ])
- .filter(pl.col("num_intervals") == 48)
- .drop("num_intervals")
+ chunk_path = _create_profiles_for_chunk_streaming(
+ input_path=input_path,
+ accounts_chunk=accounts_chunk,
+ dates=dates,
+ chunk_idx=i,
+ total_chunks=n_chunks,
+ tmp_dir=tmp_dir,
)
- n_profiles = len(profiles_df)
- logger.info(f" Created {n_profiles:,} complete profiles")
+ # Only keep non-empty chunk files
+ if chunk_path.exists() and chunk_path.stat().st_size > 0:
+ chunk_paths.append(chunk_path)
+
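+        # Free any lingering objects from the finished chunk before starting the next one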
+ gc.collect()
+
+ if not chunk_paths:
+ logger.warning("No profiles created in chunked streaming mode!")
+ return 0
- profiles_df.write_parquet(output_path)
- logger.info(f" Saved profiles (streaming) to {output_path}")
+ # Stream-concatenate all chunk parquet files
+ logger.info("Combining %d chunk files into %s", len(chunk_paths), output_path)
+ log_memory("before streaming concat")
- return n_profiles
+ combined_lf = pl.concat([pl.scan_parquet(p) for p in chunk_paths])
+ combined_lf.sink_parquet(output_path)
- finally:
- # Clean up temp file
- if temp_path.exists():
- temp_path.unlink(missing_ok=True)
+ log_memory("after streaming concat")
+
+ # Count rows in final output
+ n_profiles = pl.scan_parquet(output_path).select(pl.len()).collect()[0, 0]
+ logger.info(" Created %s complete profiles (chunked streaming)", f"{n_profiles:,}")
+ logger.info(" Saved to %s", output_path)
+
+ # Clean up temporary chunk files
+ for p in chunk_paths:
+ try:
+ p.unlink(missing_ok=True)
+ except OSError as exc:
+ logger.warning("Failed to delete temp chunk file %s: %s", p, exc)
+
+ return int(n_profiles)
# =============================================================================
@@ -381,6 +505,7 @@ def prepare_clustering_data(
sample_days: int = 20,
day_strategy: Literal["stratified", "random"] = "stratified",
streaming: bool = False,
+ chunk_size: int = 5000,
seed: int = 42,
) -> dict[str, Any]:
"""
@@ -392,7 +517,8 @@ def prepare_clustering_data(
sample_households: Number of households to sample (None = all).
sample_days: Number of days to sample.
day_strategy: 'stratified' (70% weekdays) or 'random'.
- streaming: If True, use streaming-friendly mode for large samples.
+ streaming: If True, use chunked streaming mode for large samples.
+ chunk_size: Households per chunk when streaming is enabled.
seed: Random seed for reproducibility.
Returns:
@@ -405,15 +531,16 @@ def prepare_clustering_data(
logger.info("=" * 70)
logger.info("PREPARING HOUSEHOLD-LEVEL CLUSTERING DATA")
if streaming:
- logger.info("(STREAMING MODE ENABLED)")
+ logger.info("(STREAMING MODE ENABLED, chunk_size=%d)", chunk_size)
logger.info("=" * 70)
+ log_memory("start of prepare_clustering_data")
# 1. Schema validation (cheap, no data load)
validation = validate_schema(input_path)
if not validation["valid"]:
raise ValueError(f"Input validation failed: {validation['errors']}")
- # 2. Metadata + sampling
+ # 2. Metadata + sampling (uses manifests for memory safety)
metadata = get_metadata_and_samples(
input_path=input_path,
sample_households=sample_households,
@@ -429,17 +556,20 @@ def prepare_clustering_data(
output_dir.mkdir(parents=True, exist_ok=True)
profiles_path = output_dir / "sampled_profiles.parquet"
- # 4. Create profiles (in-memory or streaming)
+ # 4. Create profiles (in-memory or chunked streaming)
if streaming:
- n_profiles = create_household_profiles_streaming(
+ n_profiles = create_household_profiles_chunked_streaming(
input_path=input_path,
accounts=accounts,
dates=dates,
output_path=profiles_path,
+ chunk_size=chunk_size,
)
if n_profiles == 0:
- raise ValueError("No profiles created in streaming mode - check input data.")
+ raise ValueError(
+ "No profiles created in chunked streaming mode - check input data and sampling settings.",
+ )
profiles_df = pl.read_parquet(profiles_path)
else:
@@ -450,25 +580,29 @@ def prepare_clustering_data(
)
if profiles_df.is_empty():
- raise ValueError("No profiles created - check input data and sampling settings.")
-
- (
- profiles_df.select([
- "account_identifier",
- "zip_code",
- "date",
- "profile",
- "is_weekend",
- "weekday",
- ]).write_parquet(profiles_path)
- )
- logger.info(f" Saved profiles: {profiles_path}")
+ raise ValueError(
+ "No profiles created - check input data and sampling settings.",
+ )
+
+ profiles_df.select([
+ "account_identifier",
+ "zip_code",
+ "date",
+ "profile",
+ "is_weekend",
+ "weekday",
+ ]).write_parquet(profiles_path)
+ logger.info(" Saved profiles: %s", profiles_path)
# 5. Save household → ZIP+4 mapping
household_map = profiles_df.select(["account_identifier", "zip_code"]).unique()
map_path = output_dir / "household_zip4_map.parquet"
household_map.write_parquet(map_path)
- logger.info(f" Saved household-ZIP+4 map: {map_path} ({len(household_map):,} households)")
+ logger.info(
+ " Saved household-ZIP+4 map: %s (%s households)",
+ map_path,
+ f"{len(household_map):,}",
+ )
# 6. Stats
stats = {
@@ -482,11 +616,12 @@ def prepare_clustering_data(
logger.info("=" * 70)
logger.info("CLUSTERING DATA READY")
logger.info("=" * 70)
- logger.info(f" Profiles: {stats['n_profiles']:,}")
- logger.info(f" Households: {stats['n_households']:,}")
- logger.info(f" ZIP+4s represented: {stats['n_zip4s']:,}")
- logger.info(f" Days: {stats['n_dates']}")
- logger.info(f" Output: {output_dir}")
+ logger.info(" Profiles: %s", f"{stats['n_profiles']:,}")
+ logger.info(" Households: %s", f"{stats['n_households']:,}")
+ logger.info(" ZIP+4s represented: %s", f"{stats['n_zip4s']:,}")
+ logger.info(" Days: %d", stats["n_dates"])
+ logger.info(" Output: %s", output_dir)
+ log_memory("end of prepare_clustering_data")
return stats
@@ -497,31 +632,33 @@ def main() -> int:
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
- # Standard run (5000 households, 20 days)
- python prepare_clustering_data_households.py \
- --input data/processed/comed_202308.parquet \
- --output-dir data/clustering \
- --sample-households 5000 \
+ # Standard run (5,000 households, 20 days)
+ python prepare_clustering_data_households.py \\
+ --input data/processed/comed_202308.parquet \\
+ --output-dir data/clustering \\
+ --sample-households 5000 \\
--sample-days 20
- # Large dataset with streaming
- python prepare_clustering_data_households.py \
- --input data/processed/comed_202308.parquet \
- --output-dir data/clustering \
- --sample-households 100000 \
- --streaming
+ # Large dataset with chunked streaming
+ python prepare_clustering_data_households.py \\
+ --input data/processed/comed_202308.parquet \\
+ --output-dir data/clustering \\
+ --sample-households 20000 \\
+ --sample-days 31 \\
+ --streaming \\
+ --chunk-size 2000
# All households, fewer days
- python prepare_clustering_data_households.py \
- --input data/processed/comed_202308.parquet \
- --output-dir data/clustering \
+ python prepare_clustering_data_households.py \\
+ --input data/processed/comed_202308.parquet \\
+ --output-dir data/clustering \\
--sample-days 10
# Quick test
- python prepare_clustering_data_households.py \
- --input data/processed/comed_202308.parquet \
- --output-dir data/clustering \
- --sample-households 500 \
+ python prepare_clustering_data_households.py \\
+ --input data/processed/comed_202308.parquet \\
+ --output-dir data/clustering \\
+ --sample-households 500 \\
--sample-days 5
""",
)
@@ -565,13 +702,19 @@ def main() -> int:
parser.add_argument(
"--streaming",
action="store_true",
- help="Use streaming-friendly mode for large household samples.",
+ help="Use chunked streaming mode for large household samples (10k+).",
+ )
+ parser.add_argument(
+ "--chunk-size",
+ type=int,
+ default=5000,
+ help="Households per chunk when --streaming is enabled (default: 5000).",
)
args = parser.parse_args()
if not args.input.exists():
- logger.error(f"Input file not found: {args.input}")
+ logger.error("Input file not found: %s", args.input)
return 1
try:
@@ -582,11 +725,12 @@ def main() -> int:
sample_days=args.sample_days,
day_strategy=args.day_strategy,
streaming=args.streaming,
+ chunk_size=args.chunk_size,
seed=args.seed,
)
return 0
- except Exception as e:
- logger.error(f"Failed: {e}")
+ except Exception as exc:
+ logger.error("Failed: %s", exc)
# Re-raise so stack traces are visible in logs when run via a pipeline
raise
diff --git a/analysis/clustering/stage2_blockgroup_regression.py b/analysis/clustering/stage2_blockgroup_regression.py
index d6c2a7a..ec64e7b 100644
--- a/analysis/clustering/stage2_blockgroup_regression.py
+++ b/analysis/clustering/stage2_blockgroup_regression.py
@@ -43,6 +43,10 @@
import numpy as np
import polars as pl
+import statsmodels.api as sm
+from sklearn.preprocessing import StandardScaler
+
+from smart_meter_analysis.census import fetch_census_data
logger = logging.getLogger(__name__)
logging.basicConfig(
@@ -50,18 +54,12 @@
format="%(asctime)s - %(levelname)s - %(message)s",
)
-# Default predictors
+# Default predictors (leaner set to help with convergence and interpretability)
DEFAULT_PREDICTORS = [
"Median_Household_Income",
"Median_Age",
- "Total_Households",
- "Owner_Occupied",
- "Renter_Occupied",
- "Heat_Electric",
- "Heat_Utility_Gas",
"Urban_Percent",
- "Built_2020_After",
- "Built_1939_Earlier",
+ "Total_Households",
]
@@ -75,7 +73,7 @@ def load_cluster_assignments(path: Path) -> tuple[pl.DataFrame, dict]:
Load household-day cluster assignments and aggregate to household level.
Each household is assigned to their DOMINANT cluster (most frequent).
- I track how confident this assignment is (what % of days fell into that cluster).
+ We also compute a dominance metric: fraction of sampled days in that cluster.
Returns
-------
@@ -91,7 +89,7 @@ def load_cluster_assignments(path: Path) -> tuple[pl.DataFrame, dict]:
confidence_stats : dict
Summary statistics on assignment confidence
"""
- logger.info(f"Loading cluster assignments from {path}")
+ logger.info("Loading cluster assignments from %s", path)
raw = pl.read_parquet(path)
required = ["account_identifier", "zip_code", "cluster"]
@@ -152,8 +150,13 @@ def load_cluster_assignments(path: Path) -> tuple[pl.DataFrame, dict]:
def load_crosswalk(crosswalk_path: Path, zip_codes: list[str]) -> pl.DataFrame:
- """Load ZIP+4 → Census block-group crosswalk."""
- logger.info(f"Loading crosswalk from {crosswalk_path}")
+ """Load ZIP+4 → Census block-group crosswalk for the ZIP+4s in our data.
+
+ Also runs a small diagnostic to detect fan-out (ZIP+4 mapping to
+ multiple block groups), which would imply potential over-counting
+ if we don't re-weight.
+ """
+ logger.info("Loading crosswalk from %s", crosswalk_path)
crosswalk = (
pl.scan_csv(crosswalk_path, separator="\t", infer_schema_length=10000)
@@ -174,6 +177,27 @@ def load_crosswalk(crosswalk_path: Path, zip_codes: list[str]) -> pl.DataFrame:
f"{len(set(zip_codes)):,}",
)
+ # Fan-out diagnostic: do any ZIP+4s map to multiple block groups?
+ if crosswalk.is_empty():
+ logger.warning(" Crosswalk is empty after filtering for sample ZIP+4s.")
+ return crosswalk
+
+ fanout = crosswalk.group_by("zip_code").agg(pl.n_unique("block_group_geoid").alias("n_block_groups"))
+ max_fanout = int(fanout["n_block_groups"].max())
+
+ if max_fanout > 1:
+ fanout_summary = fanout.group_by("n_block_groups").agg(pl.len().alias("n_zip4")).sort("n_block_groups")
+ logger.warning(
+ " WARNING: ZIP+4 → block-group crosswalk has fan-out for the "
+ "current sample (some ZIP+4s map to multiple block groups). "
+ "Distribution of mappings:\n%s",
+ fanout_summary,
+ )
+ else:
+ logger.info(
+ " Crosswalk is 1-to-1 for the current sample: each ZIP+4 maps to exactly one block group (no fan-out)."
+ )
+
return crosswalk
@@ -242,12 +266,6 @@ def fetch_or_load_census(
logger.info("Fetching Census data from API (state=%s, year=%s)...", state_fips, acs_year)
- try:
- from smart_meter_analysis.census import fetch_census_data
- except ImportError:
- logger.error("Could not import fetch_census_data from smart_meter_analysis.census.")
- raise
-
census_df = fetch_census_data(state_fips=state_fips, acs_year=acs_year)
cache_path.parent.mkdir(parents=True, exist_ok=True)
@@ -309,8 +327,8 @@ def prepare_regression_dataset(
logger.info(" After cluster diversity filter: %s block groups", f"{df['block_group_geoid'].n_unique():,}")
- # Filter predictors to those that exist and have data
- available_predictors = []
+ # Filter predictors to those that exist and have acceptable missingness
+ available_predictors: list[str] = []
for p in predictors:
if p not in df.columns:
logger.warning(" Predictor not found: %s", p)
@@ -346,19 +364,33 @@ def run_multinomial_regression(
predictors: list[str],
outcome: str = "cluster",
weight_col: str = "n_households",
+ standardize: bool = False,
) -> dict[str, object]:
- """Run multinomial logistic regression with statsmodels."""
+ """Run multinomial logistic regression with statsmodels.
+
+ Parameters
+ ----------
+ reg_df : pl.DataFrame
+ Long-form data, one row per (block_group_geoid, cluster) with
+ columns including predictors, outcome, and weights.
+ predictors : list[str]
+ Names of predictor columns.
+ outcome : str, default "cluster"
+ Name of the outcome column (cluster index).
+ weight_col : str, default "n_households"
+ Column providing frequency weights (households per BG x cluster).
+ standardize : bool, default False
+ If True, standardize predictors before regression. If False,
+ use raw units (easier to interpret coefficients and ORs).
+ """
logger.info("Running multinomial logistic regression...")
- import statsmodels.api as sm
- from sklearn.preprocessing import StandardScaler
-
# Extract arrays
X = reg_df.select(predictors).to_numpy().astype(np.float64)
y = reg_df.get_column(outcome).to_numpy()
weights = reg_df.get_column(weight_col).to_numpy().astype(np.float64)
- # Drop rows with NaN
+ # Drop rows with NaN in predictors
nan_mask = np.isnan(X).any(axis=1)
if nan_mask.sum() > 0:
logger.warning(" Dropping %s rows with NaN predictors", f"{nan_mask.sum():,}")
@@ -367,19 +399,32 @@ def run_multinomial_regression(
if len(X) == 0:
raise ValueError("No observations remaining after dropping NaN rows.")
+ # Count block groups with complete predictor data (for reporting)
+ # (This is approximate, but good enough for a summary line.)
n_block_groups = reg_df.filter(~pl.any_horizontal(pl.col(predictors).is_null()))["block_group_geoid"].n_unique()
- # Standardize and add intercept
- scaler = StandardScaler()
- X_scaled = scaler.fit_transform(X)
- X_with_const = sm.add_constant(X_scaled)
+ # Standardize or leave in raw units
+ if standardize:
+ logger.info(" Standardizing predictors (mean 0, sd 1)...")
+ scaler = StandardScaler()
+ X_transformed = scaler.fit_transform(X)
+ else:
+ logger.info(" Using raw predictor units (no standardization).")
+ X_transformed = X
- # Expand rows by weights for proper frequency weighting
+ # Add intercept
+ X_with_const = sm.add_constant(X_transformed)
+
+ # Expand rows by integer weights for frequency weighting
weight_ints = np.maximum(np.round(weights).astype(int), 1)
X_expanded = np.repeat(X_with_const, weight_ints, axis=0)
y_expanded = np.repeat(y, weight_ints)
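+    # Note: row expansion treats weights as exact frequencies, so reported standard errors assume the expanded sample size.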
- logger.info(" Training on %s expanded rows (%s block groups)", f"{len(X_expanded):,}", n_block_groups)
+ logger.info(
+ " Training on %s expanded rows (%s block groups)",
+ f"{len(X_expanded):,}",
+ n_block_groups,
+ )
model = sm.MNLogit(y_expanded, X_expanded)
result = model.fit(method="newton", maxiter=100, disp=False)
@@ -438,6 +483,9 @@ def generate_report(
"""Generate human-readable summary."""
conf = results.get("cluster_assignment_confidence", {})
+ n_households = conf.get("n_households")
+ households_line = " Households: N/A" if n_households is None else f" Households: {n_households:,}"
+
lines = [
"=" * 70,
"STAGE 2: BLOCK-GROUP MULTINOMIAL REGRESSION RESULTS",
@@ -459,7 +507,7 @@ def generate_report(
"Each household assigned to their most frequent (dominant) cluster.",
"Dominance = fraction of household's days in their assigned cluster.",
"",
- f" Households: {conf.get('n_households', 'N/A'):,}",
+ households_line,
f" Mean dominance: {conf.get('dominance_mean', 0) * 100:.1f}%",
f" Median dominance: {conf.get('dominance_median', 0) * 100:.1f}%",
f" Households >50% in one cluster: {conf.get('pct_above_50', 0):.1f}%",
@@ -523,22 +571,44 @@ def main() -> int:
parser.add_argument("--clusters", type=Path, required=True, help="cluster_assignments.parquet")
parser.add_argument("--crosswalk", type=Path, required=True, help="ZIP+4 → block-group crosswalk")
- parser.add_argument("--census-cache", type=Path, default=Path("data/reference/census_17_2023.parquet"))
- parser.add_argument("--fetch-census", action="store_true", help="Force re-fetch Census data")
+ parser.add_argument(
+ "--census-cache",
+ type=Path,
+ default=Path("data/reference/census_17_2023.parquet"),
+ )
+ parser.add_argument(
+ "--fetch-census",
+ action="store_true",
+ help="Force re-fetch Census data",
+ )
parser.add_argument("--state-fips", default="17")
parser.add_argument("--acs-year", type=int, default=2023)
parser.add_argument("--min-households-per-bg", type=int, default=10)
parser.add_argument("--min-nonzero-clusters-per-bg", type=int, default=2)
- parser.add_argument("--predictors", nargs="+", default=DEFAULT_PREDICTORS, help="Predictor columns")
- parser.add_argument("--output-dir", type=Path, default=Path("data/clustering/results/stage2_blockgroups"))
+ parser.add_argument(
+ "--predictors",
+ nargs="+",
+ default=DEFAULT_PREDICTORS,
+ help="Predictor columns",
+ )
+ parser.add_argument(
+ "--output-dir",
+ type=Path,
+ default=Path("data/clustering/results/stage2_blockgroups"),
+ )
+ parser.add_argument(
+ "--standardize",
+ action="store_true",
+ help="Standardize predictors before regression. Default is to use raw units (easier to interpret).",
+ )
args = parser.parse_args()
if not args.clusters.exists():
- logger.error(f"Cluster assignments not found: {args.clusters}")
+ logger.error("Cluster assignments not found: %s", args.clusters)
return 1
if not args.crosswalk.exists():
- logger.error(f"Crosswalk not found: {args.crosswalk}")
+ logger.error("Crosswalk not found: %s", args.crosswalk)
return 1
args.output_dir.mkdir(parents=True, exist_ok=True)
@@ -553,27 +623,42 @@ def main() -> int:
clusters_bg = attach_block_groups(clusters, crosswalk)
bg_counts = aggregate_blockgroup_cluster_counts(clusters_bg)
- census_df = fetch_or_load_census(args.census_cache, args.state_fips, args.acs_year, args.fetch_census)
- logger.info(" Census: %s block groups, %s columns", f"{len(census_df):,}", len(census_df.columns))
+ census_df = fetch_or_load_census(
+ cache_path=args.census_cache,
+ state_fips=args.state_fips,
+ acs_year=args.acs_year,
+ force_fetch=args.fetch_census,
+ )
+ logger.info(
+ " Census: %s block groups, %s columns",
+ f"{len(census_df):,}",
+ len(census_df.columns),
+ )
demo_df = attach_census_to_blockgroups(bg_counts, census_df)
reg_df, predictors = prepare_regression_dataset(
- demo_df,
- args.predictors,
- args.min_households_per_bg,
- args.min_nonzero_clusters_per_bg,
+ demo_df=demo_df,
+ predictors=args.predictors,
+ min_households_per_bg=args.min_households_per_bg,
+ min_nonzero_clusters_per_bg=args.min_nonzero_clusters_per_bg,
)
if reg_df.is_empty():
logger.error("No data after filtering")
return 1
- # Save dataset
+ # Save dataset used for regression
reg_df.write_parquet(args.output_dir / "regression_data_blockgroups.parquet")
# Run regression
- results = run_multinomial_regression(reg_df, predictors)
+ results = run_multinomial_regression(
+ reg_df=reg_df,
+ predictors=predictors,
+ outcome="cluster",
+ weight_col="n_households",
+ standardize=args.standardize,
+ )
# Add confidence stats to results
results["cluster_assignment_confidence"] = confidence_stats
diff --git a/docs/index.html b/docs/index.html
index 9d2f266..e7a3cf4 100644
--- a/docs/index.html
+++ b/docs/index.html
@@ -34,7 +34,7 @@
-
+
diff --git a/scripts/run_comed_pipeline.py b/scripts/run_comed_pipeline.py
index 255f5c7..f802b7e 100644
--- a/scripts/run_comed_pipeline.py
+++ b/scripts/run_comed_pipeline.py
@@ -3,7 +3,8 @@
ComEd Smart Meter Analysis Pipeline
Main entry point for the ComEd smart meter clustering analysis. This script
-handles the complete workflow from raw S3 data to clustered load profiles.
+handles the complete workflow from raw S3 data to clustered household-day
+load profiles and (optionally) Stage 2 block-group regression.
================================================================================
PIPELINE OVERVIEW
@@ -12,67 +13,83 @@
Stage 1: Usage Pattern Clustering (this script)
1. Download - Fetch CSV files from S3
2. Process - Transform wide→long, add time features, write parquet
- 3. Prepare - Create daily load profiles per HOUSEHOLD (not ZIP+4)
- 4. Cluster - K-means (Euclidean) to identify usage patterns
+ 3. Prepare - Create daily load profiles per HOUSEHOLD-DAY
+ (one 48-point profile per (household, date))
+ 4. Cluster - K-means (Euclidean) on household-day profiles
5. Validate - Check data quality at each step
Stage 2: Block-Group Demographic Regression (stage2_blockgroup_regression.py)
- - Aggregate households to block-group X cluster counts
+ - Aggregate household-day observations to (block_group x cluster) counts
- Join with Census demographics at block-group level
- Run multinomial logistic regression (statsmodels for proper inference)
- - Identify demographic predictors of cluster composition
+ - Identify demographic predictors of the mix of clusters across household-days
-  - Unit of analysis: ONE ROW PER (block_group, cluster)
+  - Unit of analysis: ONE ROW PER (block_group, cluster), with counts over
+    household-day observations.
================================================================================
USAGE MODES
================================================================================
-FULL PIPELINE (download from S3 and run everything):
- python run_comed_pipeline.py --from-s3 --num-files 1000
-
-VALIDATE ONLY (check existing files):
- python run_comed_pipeline.py --validate-only
-
-PROCESS ONLY (no clustering, useful for testing):
- python run_comed_pipeline.py --from-s3 --num-files 100 --skip-clustering
+Typical workflow from the project root (script lives in scripts/):
+
+FULL PIPELINE (download from S3, process, cluster, validate, run Stage 2):
+ python scripts/run_comed_pipeline.py \
+ --from-s3 \
+ --year-month 202308 \
+ --num-files 10000 \
+ --sample-households 20000 \
+ --sample-days 31 \
+ --k-range 3 6 \
+ --run-stage2
+
+QUICK LOCAL TEST (fewer files, fewer households/days):
+ python scripts/run_comed_pipeline.py \
+ --from-s3 \
+ --year-month 202308 \
+ --num-files 100 \
+ --sample-households 2000 \
+ --sample-days 10 \
+ --k-range 3 4
+
+VALIDATE ONLY (check existing files for a given run):
+ python scripts/run_comed_pipeline.py \
+ --validate-only \
+ --run-name 202308_10000
SPECIFIC STAGE VALIDATION:
- python run_comed_pipeline.py --validate-only --stage processed
- python run_comed_pipeline.py --validate-only --stage clustering
+ python scripts/run_comed_pipeline.py \
+ --validate-only \
+ --run-name 202308_10000 \
+ --stage processed
+
+ python scripts/run_comed_pipeline.py \
+ --validate-only \
+ --run-name 202308_10000 \
+ --stage clustering
================================================================================
OUTPUT STRUCTURE
================================================================================
data/validation_runs/{run_name}/
-├── samples/ # Raw CSV files from S3
+├── samples/ # Raw CSV files from S3
├── processed/
-│ └── comed_{year_month}.parquet # Interval-level data (long format)
+│ └── comed_{year_month}.parquet # Interval-level data (long format)
├── clustering/
-│ ├── sampled_profiles.parquet # Daily load profiles for clustering
-│ ├── zip4_demographics.parquet # ZIP+4 list (demographics added in Stage 2)
+│ ├── sampled_profiles.parquet # Household-day profiles for clustering
+│ ├── household_zip4_map.parquet # Map of households to ZIP+4s
│ └── results/
│ ├── cluster_assignments.parquet
│ ├── cluster_centroids.parquet
-│ ├── clustering_metrics.json
-│ └── centroid_plot.png
-
-================================================================================
-EXAMPLES
-================================================================================
+│ ├── k_evaluation.json
+│ ├── clustering_metadata.json
+│ ├── elbow_curve.png
+│ ├── cluster_centroids.png
+│ ├── cluster_samples.png
+│ └── stage2_blockgroups/ # (optional) Stage 2 outputs when run
+│ ├── ... block-group cluster counts, regression results, etc.
-# Quick test with 100 files
-python run_comed_pipeline.py --from-s3 --num-files 100 --sample-households 500 --sample-days 10
-
-# Standard analysis (1000 files, ~30min clustering)
-python run_comed_pipeline.py --from-s3 --num-files 1000
-
-# Large scale (will take longer)
-python run_comed_pipeline.py --from-s3 --num-files 10000 --sample-households 10000
-
-# Just validate existing results
-python run_comed_pipeline.py --validate-only --run-name 202308_1000
"""
from __future__ import annotations
@@ -97,7 +114,6 @@
DEFAULT_PATHS = {
"processed": Path("data/processed/comed_202308.parquet"),
- "enriched": Path("data/enriched_test/enriched.parquet"),
"clustering_dir": Path("data/clustering"),
"crosswalk": Path("data/reference/2023_comed_zip4_census_crosswalk.txt"),
}
@@ -108,12 +124,16 @@
}
DEFAULT_CLUSTERING_CONFIG = {
- "sample_days": 20,
- "sample_households": 5000, # Number of households to sample (None = all)
- "k_min": 3,
- "k_max": 5,
+ # Household/day sampling for clustering
+ "sample_days": 31,
+ "sample_households": 20_000, # None = all; 20k is the tested high-volume default
"day_strategy": "stratified",
- "n_init": 10, # Number of k-means initializations
+ # K-means hyperparameters
+ "k_min": 3,
+ "k_max": 6,
+ "n_init": 10,
+ # Profile construction (streaming is always on)
+ "chunk_size": 500, # households per chunk when building profiles
}
@@ -127,11 +147,12 @@ class ComedPipeline:
Orchestrates the ComEd smart meter analysis pipeline.
This class manages the complete workflow from raw S3 data to clustered
- load profiles, with validation at each step.
+ household-day load profiles, with validation at each step and optional
+ Stage 2 regression.
Attributes:
base_dir: Project root directory
- run_name: Identifier for this pipeline run (e.g., "202308_1000")
+ run_name: Identifier for this pipeline run (e.g., "202308_10000")
run_dir: Output directory for this run
paths: Dictionary of file paths for this run
"""
@@ -173,7 +194,7 @@ def setup_directories(self) -> None:
for subdir in ["samples", "processed", "clustering/results"]:
(self.run_dir / subdir).mkdir(parents=True, exist_ok=True)
- logger.info(f"Created output directory: {self.run_dir}")
+ logger.info("Created output directory: %s", self.run_dir)
def download_from_s3(
self,
@@ -186,13 +207,13 @@ def download_from_s3(
Download CSV files from S3 for processing.
Args:
- year_month: Target month in YYYYMM format (e.g., '202308')
+ year_month: Target month in YYYYMM format (e.g., "202308")
num_files: Number of CSV files to download
bucket: S3 bucket name
prefix: S3 key prefix for ComEd data
Returns:
- True if download successful, False otherwise
+ True if download successful, False otherwise.
"""
try:
import boto3
@@ -200,7 +221,7 @@ def download_from_s3(
logger.error("boto3 not installed. Run: pip install boto3")
return False
- logger.info(f"Connecting to S3: s3://{bucket}/{prefix}{year_month}/")
+ logger.info("Connecting to S3: s3://%s/%s%s/", bucket, prefix, year_month)
try:
s3 = boto3.client("s3")
@@ -208,7 +229,7 @@ def download_from_s3(
# List files
paginator = s3.get_paginator("list_objects_v2")
- csv_keys = []
+ csv_keys: list[str] = []
for page in paginator.paginate(Bucket=bucket, Prefix=full_prefix):
if "Contents" not in page:
@@ -222,10 +243,10 @@ def download_from_s3(
break
if not csv_keys:
- logger.error(f"No CSV files found in s3://{bucket}/{full_prefix}")
+ logger.error("No CSV files found in s3://%s/%s", bucket, full_prefix)
return False
- logger.info(f"Downloading {len(csv_keys)} files to {self.paths['samples']}")
+ logger.info("Downloading %d files to %s", len(csv_keys), self.paths["samples"])
for i, key in enumerate(csv_keys, 1):
filename = Path(key).name
@@ -233,13 +254,13 @@ def download_from_s3(
s3.download_file(bucket, key, str(local_path))
if i % 100 == 0 or i == len(csv_keys):
- logger.info(f" Downloaded {i}/{len(csv_keys)} files")
+ logger.info(" Downloaded %d/%d files", i, len(csv_keys))
- logger.info(f"Download complete: {len(csv_keys)} files")
+ logger.info("Download complete: %d files", len(csv_keys))
return True
- except Exception as e:
- logger.error(f"S3 download failed: {e}")
+ except Exception as exc:
+ logger.error("S3 download failed: %s", exc)
return False
def process_raw_data(self, year_month: str) -> bool:
@@ -250,17 +271,17 @@ def process_raw_data(self, year_month: str) -> bool:
Uses lazy evaluation for memory efficiency.
Args:
- year_month: Month identifier for output file naming
+ year_month: Month identifier for output file naming.
Returns:
- True if processing successful, False otherwise
+ True if processing successful, False otherwise.
"""
csv_files = sorted(self.paths["samples"].glob("*.csv"))
if not csv_files:
- logger.error(f"No CSV files found in {self.paths['samples']}")
+ logger.error("No CSV files found in %s", self.paths["samples"])
return False
- logger.info(f"Processing {len(csv_files)} CSV files")
+ logger.info("Processing %d CSV files", len(csv_files))
from smart_meter_analysis.aws_loader import (
COMED_SCHEMA,
@@ -268,18 +289,18 @@ def process_raw_data(self, year_month: str) -> bool:
transform_wide_to_long_lazy,
)
- lazy_frames = []
+ lazy_frames: list[pl.LazyFrame] = []
for i, csv_path in enumerate(csv_files, 1):
if i % 200 == 0 or i == len(csv_files):
- logger.info(f" Scanned {i}/{len(csv_files)} files")
+ logger.info(" Scanned %d/%d files", i, len(csv_files))
try:
lf = pl.scan_csv(str(csv_path), schema_overrides=COMED_SCHEMA, ignore_errors=True)
lf = transform_wide_to_long_lazy(lf)
lf = add_time_columns_lazy(lf, day_mode="calendar")
lazy_frames.append(lf)
- except Exception as e:
- logger.warning(f"Failed to scan {csv_path.name}: {e}")
+ except Exception as exc:
+ logger.warning("Failed to scan %s: %s", csv_path.name, exc)
if not lazy_frames:
logger.error("No files successfully scanned")
@@ -292,7 +313,7 @@ def process_raw_data(self, year_month: str) -> bool:
lf_combined.sink_parquet(self.paths["processed"])
row_count = pl.scan_parquet(self.paths["processed"]).select(pl.len()).collect()[0, 0]
- logger.info(f"Wrote {row_count:,} records to {self.paths['processed']}")
+ logger.info("Wrote %s records to %s", f"{row_count:,}", self.paths["processed"])
return True
@@ -301,19 +322,22 @@ def prepare_clustering_data(
sample_days: int = DEFAULT_CLUSTERING_CONFIG["sample_days"],
sample_households: int | None = DEFAULT_CLUSTERING_CONFIG.get("sample_households"),
day_strategy: str = DEFAULT_CLUSTERING_CONFIG["day_strategy"],
+ chunk_size: int = DEFAULT_CLUSTERING_CONFIG["chunk_size"],
) -> bool:
"""
- Prepare daily load profiles for clustering at the household level.
+ Prepare daily household-day load profiles for clustering.
- Creates profiles for individual households.
+ Creates 48-interval profiles for individual household-day combinations
+ using the manifest-based, chunked streaming pipeline.
Args:
- sample_days: Number of days to sample
- sample_households: Number of households to sample (None = all)
- day_strategy: 'stratified' (70/30 weekday/weekend) or 'random'
+ sample_days: Number of days to sample per run.
+ sample_households: Number of households to sample (None = all).
+ day_strategy: "stratified" (70/30 weekday/weekend) or "random".
+ chunk_size: Households per chunk for the streaming profile builder.
Returns:
- True if preparation successful, False otherwise
+ True if preparation successful, False otherwise.
"""
import subprocess
@@ -321,7 +345,7 @@ def prepare_clustering_data(
output_dir = self.paths["clustering_dir"]
if not input_path.exists():
- logger.error(f"Processed data not found: {input_path}")
+ logger.error("Processed data not found: %s", input_path)
return False
cmd = [
@@ -335,18 +359,25 @@ def prepare_clustering_data(
day_strategy,
"--sample-days",
str(sample_days),
+ "--streaming", # streaming is always enabled
+ "--chunk-size",
+ str(chunk_size),
]
if sample_households:
cmd.extend(["--sample-households", str(sample_households)])
logger.info(
- f"Preparing household clustering data ({sample_households or 'all'} households X {sample_days} days)"
+ "Preparing household-day clustering data (%s households x %d days; streaming, chunk_size=%d)",
+ sample_households or "all",
+ sample_days,
+ chunk_size,
)
+
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
- logger.error(f"Clustering prep failed: {result.stderr}")
+ logger.error("Clustering prep failed: %s", result.stderr)
return False
logger.info("Clustering data prepared")
@@ -359,18 +390,18 @@ def run_clustering(
n_init: int = DEFAULT_CLUSTERING_CONFIG["n_init"],
) -> bool:
"""
- Run k-means clustering on prepared profiles.
+ Run k-means clustering on prepared household-day profiles.
Uses Euclidean distance since all profiles are aligned to the same
time grid (no time warping needed).
Args:
- k_min: Minimum number of clusters to test
- k_max: Maximum number of clusters to test
- n_init: Number of k-means initializations
+ k_min: Minimum number of clusters to test.
+ k_max: Maximum number of clusters to test.
+ n_init: Number of k-means initializations.
Returns:
- True if clustering successful, False otherwise
+ True if clustering successful, False otherwise.
"""
import subprocess
@@ -379,7 +410,7 @@ def run_clustering(
results_dir.mkdir(parents=True, exist_ok=True)
if not profiles_path.exists():
- logger.error(f"Profiles not found: {profiles_path}")
+ logger.error("Profiles not found: %s", profiles_path)
return False
cmd = [
@@ -398,13 +429,13 @@ def run_clustering(
str(n_init),
]
- logger.info(f"Running k-means clustering (k={k_min}-{k_max})...")
+ logger.info("Running k-means clustering (k=%d-%d)...", k_min, k_max)
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
- logger.error(f"Clustering failed: {result.stderr}")
+ logger.error("Clustering failed: %s", result.stderr)
if result.stdout:
- logger.error(f"stdout: {result.stdout}")
+ logger.error("stdout: %s", result.stdout)
return False
logger.info("Clustering complete")
@@ -419,19 +450,20 @@ def run_stage2_regression(
Run Stage 2: Block-group-level regression of cluster composition.
Models how Census block-group demographics are associated with the
- composition of households across load-profile clusters.
+ composition of household-day profiles across clusters.
- Unit of analysis: ONE ROW PER (block_group, cluster).
+ Unit of analysis: ONE ROW PER (block_group, cluster), with counts over
+ household-day observations.
stage2_blockgroup_regression.py handles census fetching internally
- if cache doesn't exist.
+ if cache does not exist.
Args:
- crosswalk_path: Path to ZIP+4 → block-group crosswalk file
- census_cache_path: Path to cached census data
+ crosswalk_path: Path to ZIP+4 → block-group crosswalk file.
+ census_cache_path: Path to cached census data.
Returns:
- True if regression successful, False otherwise
+ True if regression successful, False otherwise.
"""
import subprocess
@@ -439,7 +471,7 @@ def run_stage2_regression(
output_dir = self.paths["clustering_dir"] / "results" / "stage2_blockgroups"
if not clusters_path.exists():
- logger.error(f"Cluster assignments not found: {clusters_path}")
+ logger.error("Cluster assignments not found: %s", clusters_path)
logger.error("Run Stage 1 clustering first")
return False
@@ -450,10 +482,9 @@ def run_stage2_regression(
census_cache_path = self.base_dir / "data" / "reference" / "census_17_2023.parquet"
if not crosswalk_path.exists():
- logger.error(f"Crosswalk not found: {crosswalk_path}")
+ logger.error("Crosswalk not found: %s", crosswalk_path)
return False
- # stage2_blockgroup_regression.py handles census fetching if cache missing
cmd = [
sys.executable,
str(self.base_dir / "analysis" / "clustering" / "stage2_blockgroup_regression.py"),
@@ -470,15 +501,15 @@ def run_stage2_regression(
logger.info("Running Stage 2 block-group regression...")
result = subprocess.run(cmd, capture_output=True, text=True)
- # Print output (report goes to stdout)
+ # Print report (stage2 script writes a human-readable summary to stdout)
if result.stdout:
print(result.stdout)
if result.returncode != 0:
- logger.error(f"Stage 2 regression failed: {result.stderr}")
+ logger.error("Stage 2 regression failed: %s", result.stderr)
return False
- logger.info(f"Stage 2 complete: {output_dir}")
+ logger.info("Stage 2 complete: %s", output_dir)
return True
# =========================================================================
@@ -492,16 +523,16 @@ def validate_processed_data(self) -> dict:
if not path.exists():
return self._fail("processed", f"File not found: {path}")
- logger.info(f"Validating processed data: {path}")
+ logger.info("Validating processed data: %s", path)
- errors = []
- warnings = []
+ errors: list[str] = []
+ warnings: list[str] = []
try:
lf = pl.scan_parquet(path)
schema = lf.collect_schema()
- except Exception as e:
- return self._fail("processed", f"Failed to read: {e}")
+ except Exception as exc:
+ return self._fail("processed", f"Failed to read: {exc}")
# Check required columns
required = ["zip_code", "account_identifier", "datetime", "kwh", "date", "hour"]
@@ -511,13 +542,15 @@ def validate_processed_data(self) -> dict:
# Get stats using lazy evaluation (no full load)
try:
- stats_df = lf.select([
- pl.len().alias("rows"),
- pl.col("zip_code").n_unique().alias("zip_codes"),
- pl.col("account_identifier").n_unique().alias("accounts"),
- pl.col("kwh").min().alias("kwh_min"),
- pl.col("kwh").null_count().alias("kwh_nulls"),
- ]).collect()
+ stats_df = lf.select(
+ [
+ pl.len().alias("rows"),
+ pl.col("zip_code").n_unique().alias("zip_codes"),
+ pl.col("account_identifier").n_unique().alias("accounts"),
+ pl.col("kwh").min().alias("kwh_min"),
+ pl.col("kwh").null_count().alias("kwh_nulls"),
+ ],
+ ).collect()
stats_dict = stats_df.to_dicts()[0]
@@ -543,27 +576,27 @@ def validate_processed_data(self) -> dict:
"accounts": stats_dict["accounts"],
"file_size_mb": path.stat().st_size / 1024 / 1024,
}
- except Exception as e:
- return self._fail("processed", f"Failed to compute stats: {e}")
+ except Exception as exc:
+ return self._fail("processed", f"Failed to compute stats: {exc}")
return self._result("processed", errors, warnings, stats)
def validate_clustering_inputs(self) -> dict:
- """Validate clustering input files."""
+ """Validate clustering input files (household-day profiles)."""
profiles_path = self.paths["clustering_dir"] / "sampled_profiles.parquet"
if not profiles_path.exists():
return self._fail("clustering_inputs", f"Profiles not found: {profiles_path}")
- logger.info(f"Validating clustering inputs: {profiles_path}")
+ logger.info("Validating clustering inputs: %s", profiles_path)
try:
df = pl.read_parquet(profiles_path)
- except Exception as e:
- return self._fail("clustering_inputs", f"Failed to read: {e}")
+ except Exception as exc:
+ return self._fail("clustering_inputs", f"Failed to read: {exc}")
- errors = []
- warnings = []
+ errors: list[str] = []
+ warnings: list[str] = []
# Check required columns
required = ["zip_code", "date", "profile"]
@@ -595,15 +628,15 @@ def validate_clustering_outputs(self) -> dict:
if not assignments_path.exists():
return self._skip("clustering_outputs", "No clustering results yet")
- logger.info(f"Validating clustering outputs: {results_dir}")
+ logger.info("Validating clustering outputs: %s", results_dir)
try:
assignments = pl.read_parquet(assignments_path)
- except Exception as e:
- return self._fail("clustering_outputs", f"Failed to read: {e}")
+ except Exception as exc:
+ return self._fail("clustering_outputs", f"Failed to read: {exc}")
- errors = []
- warnings = []
+ errors: list[str] = []
+ warnings: list[str] = []
# Check required columns
if "cluster" not in assignments.columns:
@@ -612,8 +645,6 @@ def validate_clustering_outputs(self) -> dict:
# Check cluster distribution
if "cluster" in assignments.columns:
cluster_counts = assignments["cluster"].value_counts()
-
- # Check for empty clusters
if cluster_counts["count"].min() == 0:
warnings.append("Some clusters have no assignments")
@@ -644,9 +675,11 @@ def run_full_pipeline(
num_files: int,
sample_days: int = DEFAULT_CLUSTERING_CONFIG["sample_days"],
sample_households: int | None = DEFAULT_CLUSTERING_CONFIG["sample_households"],
+ day_strategy: str = DEFAULT_CLUSTERING_CONFIG["day_strategy"],
k_min: int = DEFAULT_CLUSTERING_CONFIG["k_min"],
k_max: int = DEFAULT_CLUSTERING_CONFIG["k_max"],
n_init: int = DEFAULT_CLUSTERING_CONFIG["n_init"],
+ chunk_size: int = DEFAULT_CLUSTERING_CONFIG["chunk_size"],
skip_clustering: bool = False,
run_stage2: bool = False,
) -> bool:
@@ -654,18 +687,20 @@ def run_full_pipeline(
Execute the complete pipeline.
Args:
- year_month: Target month (YYYYMM format)
- num_files: Number of S3 files to download
- sample_days: Days to sample for clustering
- sample_households: Households to sample (None = all)
- k_min: Minimum clusters to test
- k_max: Maximum clusters to test
- n_init: Number of k-means initializations
- skip_clustering: If True, stop after preparing data
- run_stage2: If True, run demographic regression after clustering
+ year_month: Target month (YYYYMM format).
+ num_files: Number of S3 files to download.
+ sample_days: Days to sample for clustering.
+ sample_households: Households to sample (None = all).
+ day_strategy: Day sampling strategy ("stratified" or "random").
+ k_min: Minimum clusters to test.
+ k_max: Maximum clusters to test.
+ n_init: Number of k-means initializations.
+ chunk_size: Households per chunk for streaming profile builder.
+ skip_clustering: If True, stop after preparing data.
+ run_stage2: If True, run demographic regression after clustering.
Returns:
- True if all steps succeed, False otherwise
+ True if all steps succeed, False otherwise.
"""
self._print_header("COMED PIPELINE EXECUTION")
print(f"Year-Month: {year_month}")
@@ -673,6 +708,7 @@ def run_full_pipeline(
print(f"Output: {self.run_dir}")
print(f"Clustering: {'Skipped' if skip_clustering else f'k={k_min}-{k_max}'}")
print(f"Stage 2: {'Yes' if run_stage2 else 'No'}")
+ print(f"Profiles: streaming (chunk_size={chunk_size})")
self.setup_directories()
@@ -686,9 +722,14 @@ def run_full_pipeline(
if not self.process_raw_data(year_month):
return False
- # Step 3: Prepare clustering
+ # Step 3: Prepare clustering data
self._print_step("PREPARING CLUSTERING DATA")
- if not self.prepare_clustering_data(sample_days, sample_households):
+ if not self.prepare_clustering_data(
+ sample_days=sample_days,
+ sample_households=sample_households,
+ day_strategy=day_strategy,
+ chunk_size=chunk_size,
+ ):
return False
# Step 4: Cluster (optional)
@@ -715,7 +756,7 @@ def validate_all(self) -> bool:
Run all validation checks.
Returns:
- True if all critical validations pass, False otherwise
+ True if all critical validations pass, False otherwise.
"""
self._print_header("VALIDATION")
@@ -733,7 +774,7 @@ def validate_stage(self, stage: str) -> bool:
self.results["clustering_inputs"] = self.validate_clustering_inputs()
self.results["clustering_outputs"] = self.validate_clustering_outputs()
else:
- logger.error(f"Unknown stage: {stage}")
+ logger.error("Unknown stage: %s", stage)
return False
return self._print_summary()
@@ -752,15 +793,15 @@ def _print_step(self, title: str) -> None:
print(title)
print(f"{'─' * 70}")
- def _result(self, stage: str, errors: list, warnings: list, stats: dict) -> dict:
+ def _result(self, stage: str, errors: list[str], warnings: list[str], stats: dict) -> dict:
status = "PASS" if not errors else "FAIL"
icon = "✅" if status == "PASS" else "❌"
print(f"\n{icon} {stage.upper()}: {status}")
- for e in errors:
- print(f" Error: {e}")
- for w in warnings:
- print(f" ⚠️ {w}")
+ for err in errors:
+ print(f" Error: {err}")
+ for warn in warnings:
+ print(f" ⚠️ {warn}")
return {"status": status, "errors": errors, "warnings": warnings, "stats": stats}
@@ -790,7 +831,7 @@ def _print_summary(self) -> bool:
if stats.get("k"):
print("\nClustering Results:")
print(f" • {stats.get('n_assigned', '?')} profiles → {stats.get('k')} clusters")
- if stats.get("silhouette"):
+ if stats.get("silhouette") is not None:
print(f" • Silhouette score: {stats['silhouette']:.3f}")
print()
@@ -802,24 +843,35 @@ def _print_summary(self) -> bool:
# =============================================================================
-def main():
+def main() -> None:
parser = argparse.ArgumentParser(
description="ComEd Smart Meter Analysis Pipeline",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
- # Run full pipeline with 1000 files
- python run_comed_pipeline.py --from-s3 --num-files 1000
-
- # Quick test with 100 files, skip clustering
- python run_comed_pipeline.py --from-s3 --num-files 100 --skip-clustering
-
- # Validate existing results
- python run_comed_pipeline.py --validate-only --run-name 202308_1000
-
- # Custom clustering parameters
- python run_comed_pipeline.py --from-s3 --num-files 1000 \\
- --sample-households 2000 --sample-days 15 --k-range 3 5
+ # Quick local test (100 files, fewer households/days, no Stage 2)
+ python scripts/run_comed_pipeline.py \\
+ --from-s3 \\
+ --year-month 202308 \\
+ --num-files 100 \\
+ --sample-households 2000 \\
+ --sample-days 10 \\
+ --k-range 3 4
+
+ # High-volume analysis (tested configuration, with Stage 2)
+ python scripts/run_comed_pipeline.py \\
+ --from-s3 \\
+ --year-month 202308 \\
+ --num-files 10000 \\
+ --sample-households 20000 \\
+ --sample-days 31 \\
+ --k-range 3 6 \\
+ --run-stage2
+
+ # Validate existing results for a specific run
+ python scripts/run_comed_pipeline.py \\
+ --validate-only \\
+ --run-name 202308_10000
""",
)
@@ -866,7 +918,7 @@ def main():
"--sample-households",
type=int,
default=DEFAULT_CLUSTERING_CONFIG["sample_households"],
- help=f"Households to sample (default: {DEFAULT_CLUSTERING_CONFIG['sample_households']}, use 0 for all)",
+ help=(f"Households to sample (default: {DEFAULT_CLUSTERING_CONFIG['sample_households']}, use 0 for all)"),
)
cluster_group.add_argument(
"--sample-days",
@@ -880,16 +932,17 @@ def main():
nargs=2,
metavar=("MIN", "MAX"),
default=[DEFAULT_CLUSTERING_CONFIG["k_min"], DEFAULT_CLUSTERING_CONFIG["k_max"]],
- help=f"Cluster range to test (default: {DEFAULT_CLUSTERING_CONFIG['k_min']} {DEFAULT_CLUSTERING_CONFIG['k_max']})",
+ help=(
+ "Cluster range to test "
+ f"(default: {DEFAULT_CLUSTERING_CONFIG['k_min']} {DEFAULT_CLUSTERING_CONFIG['k_max']})"
+ ),
)
cluster_group.add_argument(
"--day-strategy",
choices=["stratified", "random"],
default=DEFAULT_CLUSTERING_CONFIG["day_strategy"],
- help="Day sampling strategy (default: stratified = 70%% weekday, 30%% weekend)",
+ help="Day sampling strategy (default: stratified = 70% weekday, 30% weekend)",
)
-
- # Clustering options
cluster_group.add_argument(
"--n-init",
type=int,
@@ -901,6 +954,14 @@ def main():
action="store_true",
help="Fast mode: k=3-4 (for testing)",
)
+ cluster_group.add_argument(
+ "--chunk-size",
+ type=int,
+ default=DEFAULT_CLUSTERING_CONFIG["chunk_size"],
+ help=(
+ f"Households per chunk for streaming profile builder (default: {DEFAULT_CLUSTERING_CONFIG['chunk_size']})"
+ ),
+ )
# Output options
output_group = parser.add_argument_group("Output Options")
@@ -944,17 +1005,19 @@ def main():
num_files=args.num_files,
sample_days=args.sample_days,
sample_households=sample_households,
+ day_strategy=args.day_strategy,
k_min=args.k_range[0],
k_max=args.k_range[1],
n_init=args.n_init,
+ chunk_size=args.chunk_size,
skip_clustering=args.skip_clustering,
run_stage2=args.run_stage2,
)
+
if success:
pipeline.validate_all()
elif args.validate_only:
success = pipeline.validate_all() if args.stage == "all" else pipeline.validate_stage(args.stage)
-
else:
parser.print_help()
print("\n⚠️ Specify --from-s3 to run pipeline or --validate-only to check existing files")
diff --git a/smart_meter_analysis/manifests.py b/smart_meter_analysis/manifests.py
new file mode 100644
index 0000000..fd6da8d
--- /dev/null
+++ b/smart_meter_analysis/manifests.py
@@ -0,0 +1,308 @@
+"""
+Manifest utilities for memory-safe processing of large parquet files.
+
+Manifests are small parquet files of unique values extracted from large source
+files via Polars' streaming engine, which avoids OOM when calling
+unique().collect() on files with hundreds of millions of rows.
+
+Usage:
+ from smart_meter_analysis.manifests import (
+ ensure_account_manifest,
+ ensure_date_manifest,
+ load_account_sample,
+ load_date_sample,
+ )
+
+ # First call builds manifest via streaming (may take a few minutes)
+ # Subsequent calls return instantly from cache
+ account_manifest = ensure_account_manifest(
+ Path("data/processed/comed_202308.parquet")
+ )
+ accounts_df = pl.read_parquet(account_manifest)
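+
+    # Sampling helpers (illustrative sizes; 20k households / 31 days match the
+    # pipeline's high-volume defaults)
+    date_manifest = ensure_date_manifest(
+        Path("data/processed/comed_202308.parquet")
+    )
+    accounts = load_account_sample(account_manifest, sample_n=20_000)
+    dates = load_date_sample(date_manifest, sample_n=31, strategy="stratified")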
+"""
+
+from __future__ import annotations
+
+import logging
+from pathlib import Path
+from typing import Any, Literal
+
+import polars as pl
+
+logger = logging.getLogger(__name__)
+
+
+# =============================================================================
+# Helpers
+# =============================================================================
+
+
+def log_memory(label: str) -> None:
+ """Log current RSS memory usage (Linux only)."""
+ try:
+ with open("/proc/self/status") as f:
+ for line in f:
+ if line.startswith("VmRSS:"):
+ mem_mb = int(line.split()[1]) / 1024
+ logger.debug("[MEMORY] %s: %.0f MB", label, mem_mb)
+ break
+ except FileNotFoundError as exc:
+ # Likely not on Linux; /proc may not exist
+ logger.debug("Skipping memory logging (%s): %s", type(exc).__name__, exc)
+ except OSError as exc:
+ # Permission issues or other OS-level problems
+ logger.debug("Could not read /proc/self/status for memory logging: %s", exc)
+
+
+def _validate_input_has_columns(input_path: Path, required: list[str]) -> None:
+ """Ensure the input parquet exists and has the required columns."""
+ if not input_path.exists():
+ raise FileNotFoundError(f"Input parquet not found: {input_path}")
+
+ schema = pl.scan_parquet(input_path).collect_schema()
+ missing = [c for c in required if c not in schema.names()]
+ if missing:
+ raise ValueError(f"Input parquet {input_path} missing required columns: {missing}")
+
+
+# =============================================================================
+# Manifest creation
+# =============================================================================
+
+
+def ensure_account_manifest(input_path: Path) -> Path:
+ """
+ Create or load account manifest in a memory-safe way.
+
+ The manifest contains one row per account_identifier with an associated
+ zip_code. Memory cost scales with the number of accounts, not the number
+ of interval-level rows.
+ """
+ _validate_input_has_columns(input_path, ["account_identifier", "zip_code"])
+
+ manifest_path = input_path.parent / f"{input_path.stem}_accounts.parquet"
+
+ # Check for existing valid manifest
+ if manifest_path.exists():
+ try:
+ n = pl.scan_parquet(manifest_path).select(pl.len()).collect()[0, 0]
+ if n > 0:
+ logger.info(
+ "Using existing account manifest: %s (%s accounts)",
+ manifest_path,
+ f"{n:,}",
+ )
+ return manifest_path
+ except Exception:
+ logger.warning(
+ "Account manifest is corrupt or unreadable, rebuilding: %s",
+ manifest_path,
+ )
+
+ # Build manifest using streaming-friendly group_by
+ logger.info("Building account manifest from %s (streaming group_by)...", input_path)
+ log_memory("before account manifest")
+
+ lf = pl.scan_parquet(input_path)
+
+ # One row per account_identifier, keep a representative zip_code.
+    # group_by + agg with collect(engine="streaming") runs via the streaming
+    # engine; we only hold ~#accounts rows in memory.
+ accounts_df = (
+ lf.select(["account_identifier", "zip_code"])
+ .filter(pl.col("account_identifier").is_not_null())
+ .group_by("account_identifier")
+ .agg(pl.first("zip_code").alias("zip_code"))
+ .collect(engine="streaming")
+ .sort("account_identifier")
+ )
+
+ manifest_path.parent.mkdir(parents=True, exist_ok=True)
+ accounts_df.write_parquet(manifest_path)
+
+ log_memory("after account manifest")
+
+ n = accounts_df.height
+ logger.info(
+ "Account manifest complete (rebuilt): %s (%s accounts)",
+ manifest_path,
+ f"{n:,}",
+ )
+
+ return manifest_path
+
+
+def ensure_date_manifest(input_path: Path) -> Path:
+ """
+ Create or load date manifest in a memory-safe way.
+
+ The manifest contains one row per date with representative weekend/weekday
+ flags. This is small in practice (~31 rows for one month), but we still
+ build it via streaming group_by for consistency.
+ """
+ _validate_input_has_columns(input_path, ["date", "is_weekend", "weekday"])
+
+ manifest_path = input_path.parent / f"{input_path.stem}_dates.parquet"
+
+ # Check for existing valid manifest
+ if manifest_path.exists():
+ try:
+ n = pl.scan_parquet(manifest_path).select(pl.len()).collect()[0, 0]
+ if n > 0:
+ logger.info(
+ "Using existing date manifest: %s (%s dates)",
+ manifest_path,
+ f"{n:,}",
+ )
+ return manifest_path
+ except Exception:
+ logger.warning(
+ "Date manifest is corrupt or unreadable, rebuilding: %s",
+ manifest_path,
+ )
+
+ # Build manifest using streaming-friendly group_by
+ logger.info("Building date manifest from %s (streaming group_by)...", input_path)
+ log_memory("before date manifest")
+
+ lf = pl.scan_parquet(input_path)
+
+ dates_df = (
+ lf.select(["date", "is_weekend", "weekday"])
+ .filter(pl.col("date").is_not_null())
+ .group_by("date")
+ .agg(
+ pl.first("is_weekend").alias("is_weekend"),
+ pl.first("weekday").alias("weekday"),
+ )
+ .collect(engine="streaming")
+ .sort("date")
+ )
+
+ manifest_path.parent.mkdir(parents=True, exist_ok=True)
+ dates_df.write_parquet(manifest_path)
+
+ log_memory("after date manifest")
+
+ n = dates_df.height
+ logger.info(
+ "Date manifest complete (rebuilt): %s (%s dates)",
+ manifest_path,
+ f"{n:,}",
+ )
+
+ return manifest_path
+
+
+# =============================================================================
+# Sampling helpers
+# =============================================================================
+
+
+def load_account_sample(
+ manifest_path: Path,
+ sample_n: int | None = None,
+ seed: int = 42,
+) -> list[str]:
+ """
+ Load account identifiers from manifest, optionally sampling.
+
+ Args:
+ manifest_path:
+ Path to account manifest parquet.
+ sample_n:
+ Number of accounts to sample (None = all).
+ seed:
+ Random seed for reproducible sampling.
+
+ Returns:
+ List of account_identifier strings.
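+
+    Example (illustrative; sample_n=None keeps every account):
+        accounts = load_account_sample(manifest_path, sample_n=20_000)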
+ """
+ df = pl.read_parquet(manifest_path)
+
+ if df.is_empty():
+ logger.warning("Account manifest %s is empty; returning no accounts", manifest_path)
+ return []
+
+ if sample_n is not None and sample_n < len(df):
+ df = df.sample(n=sample_n, shuffle=True, seed=seed)
+ logger.info("Sampled %s accounts from manifest", f"{len(df):,}")
+ else:
+ logger.info("Using all %s accounts from manifest", f"{len(df):,}")
+
+ return df["account_identifier"].to_list()
+
+
+def load_date_sample(
+ manifest_path: Path,
+ sample_n: int,
+ strategy: Literal["stratified", "random"] = "stratified",
+ seed: int = 42,
+) -> list[Any]:
+ """
+ Load dates from manifest with stratified or random sampling.
+
+ Args:
+ manifest_path:
+ Path to date manifest parquet.
+ sample_n:
+ Number of dates to sample.
+ strategy:
+ "stratified" (70% weekday, 30% weekend) or "random".
+ seed:
+ Random seed for reproducible sampling.
+
+ Returns:
+ List of date values (type depends on input schema).
+ """
+ df = pl.read_parquet(manifest_path)
+
+ if df.is_empty():
+ logger.warning("Date manifest %s is empty; returning no dates", manifest_path)
+ return []
+
+ if sample_n <= 0:
+ logger.info("Requested sample_n <= 0; returning empty date list")
+ return []
+
+ # Stratified sampling if both weekday and weekend exist
+ if strategy == "stratified":
+ weekday_df = df.filter(~pl.col("is_weekend"))
+ weekend_df = df.filter(pl.col("is_weekend"))
+
+ if weekday_df.height == 0 or weekend_df.height == 0:
+ logger.warning("Missing weekdays or weekends in manifest; falling back to random sampling")
+ strategy = "random"
+
+ if strategy == "stratified":
+ n_weekdays = int(sample_n * 0.7)
+ n_weekends = sample_n - n_weekdays
+
+ n_weekdays = min(n_weekdays, len(weekday_df))
+ n_weekends = min(n_weekends, len(weekend_df))
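+            # Worked example: sample_n=31 -> int(31 * 0.7) = 21 weekdays + 10
+            # weekend days; if either pool is smaller, its share is capped and
+            # the total can fall below sample_n.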
+
+ sampled_weekdays = (
+ weekday_df.sample(n=n_weekdays, shuffle=True, seed=seed)["date"].to_list() if n_weekdays > 0 else []
+ )
+ sampled_weekends = (
+ weekend_df.sample(
+ n=n_weekends,
+ shuffle=True,
+ seed=seed + 1,
+ )["date"].to_list()
+ if n_weekends > 0
+ else []
+ )
+
+ dates = sampled_weekdays + sampled_weekends
+ logger.info(
+ "Sampled %d weekdays + %d weekend days (stratified)",
+ len(sampled_weekdays),
+ len(sampled_weekends),
+ )
+ else:
+ n_sample = min(sample_n, len(df))
+ dates = df.sample(n=n_sample, shuffle=True, seed=seed)["date"].to_list()
+ logger.info("Sampled %d dates (random)", len(dates))
+
+ return dates