diff --git a/arbalister/arrow.py b/arbalister/arrow.py index b4b7c3e..377308f 100644 --- a/arbalister/arrow.py +++ b/arbalister/arrow.py @@ -1,5 +1,6 @@ import codecs import pathlib +from pathlib import Path from typing import Any, Callable import datafusion as dn @@ -153,3 +154,37 @@ def write_csv( out = adbc.write_sqlite return out + +def get_parquet_column_stats(path: str | Path) -> list[dict[str, Any]]: + """Get parquet column stats.""" + import pyarrow.parquet as pq + + if isinstance(path, Path): + path = str(path) + + file = pq.ParquetFile(path) + metadata = file.metadata + schema = file.schema + columns_stats = [] + for i, field in enumerate(schema): + min_val = None + max_val = None + null_count = 0 + + for row_group_index in range(metadata.num_row_groups): + row = metadata.row_group(row_group_index) + col_chunk = row.column(i) + stats = col_chunk.statistics + if stats: + if min_val is None or stats.min < min_val: + min_val = stats.min + if max_val is None or stats.max > max_val: + max_val = stats.max + null_count += stats.null_count if stats.null_count is not None else 0 + columns_stats.append({ + "name": field.name, + "min": min_val, + "max": max_val, + "null_count": null_count + }) + return columns_stats diff --git a/arbalister/routes.py b/arbalister/routes.py index 009df18..86a80b1 100644 --- a/arbalister/routes.py +++ b/arbalister/routes.py @@ -129,6 +129,16 @@ class SchemaInfo: encoding: str = "base64" +@dataclasses.dataclass(frozen=True, slots=True) +class ColumnStats: + """Column stats for a parquet file.""" + + name: str + min: object | None = None + max: object | None = None + null_count: int | None = None + + @dataclasses.dataclass(frozen=True, slots=True) class StatsResponse: """File statistics returned in the stats route.""" @@ -136,6 +146,7 @@ class StatsResponse: schema: SchemaInfo num_rows: int = 0 num_cols: int = 0 + columns_stats: list[ColumnStats] | None = None class StatsRouteHandler(BaseRouteHandler): @@ -144,6 +155,13 @@ class
StatsRouteHandler(BaseRouteHandler): @tornado.web.authenticated async def get(self, path: str) -> None: """HTTP GET return statistics.""" + file = self.data_file(path) + file_format = ff.FileFormat.from_filename(file) + + columns_stats = None + if file_format == ff.FileFormat.Parquet: + parquet_columns_stats = abw.get_parquet_column_stats(file) + columns_stats = [ColumnStats(**stats) for stats in parquet_columns_stats] df = self.dataframe(path) # FIXME this is not optimal for ORC/CSV where we can read_metadata, but it is not read @@ -175,6 +193,7 @@ async def get(self, path: str) -> None: num_cols=len(schema), num_rows=num_rows, schema=SchemaInfo(data=schema_64), + columns_stats=columns_stats, ) await self.finish(dataclasses.asdict(response)) @@ -184,6 +203,7 @@ class SqliteFileInfo: """Sqlite specific information about a file.""" table_names: list[str] + size_bytes: int | None = None @dataclasses.dataclass(frozen=True, slots=True) @@ -191,6 +211,7 @@ class CsvFileInfo: """Csv specific information about a file.""" delimiters: list[str] = dataclasses.field(default_factory=lambda: [",", ";", "\\t", "|", "#"]) + size_bytes: int | None = None FileInfo = SqliteFileInfo | CsvFileInfo @@ -202,6 +223,7 @@ class FileInfoResponse[I, P]: info: I default_options: P + size_bytes: int | None = None CsvFileInfoResponse = FileInfoResponse[CsvFileInfo, CsvReadOptions] @@ -219,12 +241,21 @@ async def get(self, path: str) -> None: file = self.data_file(path) file_format = ff.FileFormat.from_filename(file) + df = self.dataframe(path) + df.schema() + + try: + size_bytes = os.path.getsize(file) + except OSError: + size_bytes = None + match file_format: case ff.FileFormat.Csv: - info = CsvFileInfo() + info = CsvFileInfo(size_bytes=size_bytes) csv_response = CsvFileInfoResponse( info=info, default_options=CsvReadOptions(delimiter=info.delimiters[0]), + size_bytes=size_bytes, ) await self.finish(dataclasses.asdict(csv_response)) case ff.FileFormat.Sqlite: @@ -233,12 +264,13 @@ async def
get(self, path: str) -> None: table_names = adbc.SqliteDataFrame.get_table_names(file) sqlite_response = SqliteFileInfoResponse( - info=SqliteFileInfo(table_names=table_names), + info=SqliteFileInfo(table_names=table_names, size_bytes=size_bytes), default_options=SqliteReadOptions(table_name=table_names[0]), + size_bytes=size_bytes, ) await self.finish(dataclasses.asdict(sqlite_response)) case _: - no_response = NoFileInfoResponse(info=Empty(), default_options=Empty()) + no_response = NoFileInfoResponse(info=Empty(), default_options=Empty(), size_bytes=size_bytes) await self.finish(dataclasses.asdict(no_response)) diff --git a/src/model.ts b/src/model.ts index 8f02ba2..3a57753 100644 --- a/src/model.ts +++ b/src/model.ts @@ -24,9 +24,15 @@ export namespace ArrowModel { export class ArrowModel extends DataModel { static async fromRemoteFileInfo(loadingOptions: ArrowModel.LoadingOptions) { - const { info: fileInfo, default_options: fileOptions } = await fetchFileInfo({ + const { + info: fileInfo, + default_options: fileOptions, + size_bytes, + } = await fetchFileInfo({ path: loadingOptions.path, }); + + console.log("size_bytes", size_bytes); return new ArrowModel(loadingOptions, fileOptions, fileInfo); } @@ -116,7 +122,9 @@ export class ArrowModel extends DataModel { // This is to showcase that we can put additional information in the column header but it // does not look good. HuggingFace dataset has some good inspiration. const field = this.schema.fields[column]; - return `${field.name} (${field.type}${field.nullable ? " | null" : ""})`; + return `${field.name} + (${field.type}${field.nullable ? 
" | null" : ""}) + Rows: ${this._numRows}`; } case "row-header": return row.toString(); diff --git a/src/requests.ts b/src/requests.ts index d8e1061..c3d76a0 100644 --- a/src/requests.ts +++ b/src/requests.ts @@ -22,6 +22,7 @@ export interface FileInfoResponseFor { export interface FileInfoResponse { info: FileInfo; default_options: FileReadOptions; + size_bytes?: number | null; } export async function fetchFileInfo(params: Readonly): Promise { @@ -90,7 +91,6 @@ export async function fetchStats( throw new Error(`Error communicating with the Arbalister server: ${response.status}`); } const data: StatsResponseRaw = await response.json(); - // Validate encoding and content type if (data.schema.encoding !== "base64") { throw new Error(`Unexpected schema encoding: ${data.schema.encoding}, expected "base64"`);