35 changes: 35 additions & 0 deletions arbalister/arrow.py
@@ -1,5 +1,6 @@
import codecs
import pathlib
from pathlib import Path
from typing import Any, Callable

import datafusion as dn
@@ -153,3 +154,37 @@ def write_csv(

    out = adbc.write_sqlite
    return out

def get_parquet_column_stats(path: str | Path) -> list[dict[str, Any]]:
    """Aggregate per-column min/max/null_count across all row groups of a parquet file."""
    import pyarrow.parquet as pq

    if isinstance(path, Path):
        path = str(path)

    file = pq.ParquetFile(path)
    metadata = file.metadata
    schema = file.schema
    columns_stats = []
    for i, field in enumerate(schema):
        min_val = None
        max_val = None
        null_count = 0

        for row_group_index in range(metadata.num_row_groups):
            row_group = metadata.row_group(row_group_index)
            col_chunk = row_group.column(i)
            stats = col_chunk.statistics
            if stats is None:
                continue
            if stats.has_min_max:
                if min_val is None or stats.min < min_val:
                    min_val = stats.min
                # Track the largest maximum, not the smallest.
                if max_val is None or stats.max > max_val:
                    max_val = stats.max
            if stats.null_count is not None:
                null_count += stats.null_count
        columns_stats.append(
            {
                "name": field.name,
                "min": min_val,
                "max": max_val,
                "null_count": null_count,
            }
        )
    return columns_stats
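
A quick usage sketch for the new helper (the demo file and data are hypothetical, and the package is assumed importable as arbalister):

import pyarrow as pa
import pyarrow.parquet as pq

from arbalister.arrow import get_parquet_column_stats

# Hypothetical demo file: one null in each column.
table = pa.table({"x": [1, None, 3], "y": ["a", "b", None]})
pq.write_table(table, "demo.parquet")

for col in get_parquet_column_stats("demo.parquet"):
    print(col)  # e.g. {'name': 'x', 'min': 1, 'max': 3, 'null_count': 1}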
38 changes: 35 additions & 3 deletions arbalister/routes.py
@@ -129,13 +129,24 @@ class SchemaInfo:
    encoding: str = "base64"


@dataclasses.dataclass(frozen=True, slots=True)
class ColumnStats:
    """Column stats for a parquet file."""

    name: str
    min: object | None = None
    max: object | None = None
    null_count: int | None = None


@dataclasses.dataclass(frozen=True, slots=True)
class StatsResponse:
    """File statistics returned in the stats route."""

    schema: SchemaInfo
    num_rows: int = 0
    num_cols: int = 0
    columns_stats: list[ColumnStats] | None = None


class StatsRouteHandler(BaseRouteHandler):
@@ -144,6 +155,13 @@ class StatsRouteHandler(BaseRouteHandler):
    @tornado.web.authenticated
    async def get(self, path: str) -> None:
        """HTTP GET return statistics."""
        file = self.data_file(path)
        file_format = ff.FileFormat.from_filename(file)

        columns_stats = None
        if file_format == ff.FileFormat.Parquet:
            parquet_columns_stats = abw.get_parquet_column_stats(file)
            columns_stats = [ColumnStats(**stats) for stats in parquet_columns_stats]
        df = self.dataframe(path)

        # FIXME this is not optimal for ORC/CSV where we can read_metadata, but it is not read
@@ -175,6 +193,7 @@ async def get(self, path: str) -> None:
            num_cols=len(schema),
            num_rows=num_rows,
            schema=SchemaInfo(data=schema_64),
            columns_stats=columns_stats,
        )
        await self.finish(dataclasses.asdict(response))
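
For a parquet file, the body produced by dataclasses.asdict(response) now looks roughly like this (a sketch with made-up values, not captured server output):

{
    "schema": {"data": "<base64-encoded Arrow schema>", "encoding": "base64"},
    "num_rows": 3,
    "num_cols": 2,
    "columns_stats": [
        {"name": "x", "min": 1, "max": 3, "null_count": 1},
        {"name": "y", "min": "a", "max": "b", "null_count": 1},
    ],
}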

@@ -184,13 +203,15 @@ class SqliteFileInfo:
"""Sqlite specific information about a file."""

table_names: list[str]
size_bytes: int | None = None


@dataclasses.dataclass(frozen=True, slots=True)
class CsvFileInfo:
    """Csv specific information about a file."""

    delimiters: list[str] = dataclasses.field(default_factory=lambda: [",", ";", "\\t", "|", "#"])
    size_bytes: int | None = None


FileInfo = SqliteFileInfo | CsvFileInfo
@@ -202,6 +223,7 @@ class FileInfoResponse[I, P]:

    info: I
    default_options: P
    size_bytes: int | None = None


CsvFileInfoResponse = FileInfoResponse[CsvFileInfo, CsvReadOptions]
@@ -219,12 +241,21 @@ async def get(self, path: str) -> None:
        file = self.data_file(path)
        file_format = ff.FileFormat.from_filename(file)

        df = self.dataframe(path)
        df.schema()

        try:
            size_bytes = os.path.getsize(file)
        except OSError:
            size_bytes = None

        match file_format:
            case ff.FileFormat.Csv:
                info = CsvFileInfo()
                info = CsvFileInfo(size_bytes=size_bytes)
                csv_response = CsvFileInfoResponse(
                    info=info,
                    default_options=CsvReadOptions(delimiter=info.delimiters[0]),
                    size_bytes=size_bytes,
                )
                await self.finish(dataclasses.asdict(csv_response))
            case ff.FileFormat.Sqlite:
@@ -233,12 +264,13 @@ async def get(self, path: str) -> None:
                table_names = adbc.SqliteDataFrame.get_table_names(file)

                sqlite_response = SqliteFileInfoResponse(
                    info=SqliteFileInfo(table_names=table_names),
                    info=SqliteFileInfo(table_names=table_names, size_bytes=size_bytes),
                    default_options=SqliteReadOptions(table_name=table_names[0]),
                    size_bytes=size_bytes,
                )
                await self.finish(dataclasses.asdict(sqlite_response))
            case _:
                no_response = NoFileInfoResponse(info=Empty(), default_options=Empty())
                no_response = NoFileInfoResponse(info=Empty(), default_options=Empty(), size_bytes=size_bytes)
                await self.finish(dataclasses.asdict(no_response))
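
With size_bytes threaded through both the format-specific info and the top-level response, a CSV file-info payload serializes to roughly the following (made-up size; CsvReadOptions may carry more fields than the delimiter shown):

{
    "info": {"delimiters": [",", ";", "\\t", "|", "#"], "size_bytes": 1024},
    "default_options": {"delimiter": ","},
    "size_bytes": 1024,
}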


12 changes: 10 additions & 2 deletions src/model.ts
@@ -24,9 +24,15 @@ export namespace ArrowModel {

export class ArrowModel extends DataModel {
  static async fromRemoteFileInfo(loadingOptions: ArrowModel.LoadingOptions) {
    const { info: fileInfo, default_options: fileOptions } = await fetchFileInfo({
    const {
      info: fileInfo,
      default_options: fileOptions,
      size_bytes,
    } = await fetchFileInfo({
      path: loadingOptions.path,
    });

    console.log("size_bytes", size_bytes);
    return new ArrowModel(loadingOptions, fileOptions, fileInfo);
  }

@@ -116,7 +122,9 @@ export class ArrowModel extends DataModel {
        // This is to showcase that we can put additional information in the column header but it
        // does not look good. HuggingFace dataset has some good inspiration.
        const field = this.schema.fields[column];
        return `${field.name} (${field.type}${field.nullable ? " | null" : ""})`;
        return `${field.name}
(${field.type}${field.nullable ? " | null" : ""})
Rows: ${this._numRows}`;
      }
      case "row-header":
        return row.toString();
2 changes: 1 addition & 1 deletion src/requests.ts
@@ -22,6 +22,7 @@ export interface FileInfoResponseFor<T extends FileType> {
export interface FileInfoResponse {
  info: FileInfo;
  default_options: FileReadOptions;
  size_bytes?: number | null;
}

export async function fetchFileInfo(params: Readonly<FileInfoOptions>): Promise<FileInfoResponse> {
@@ -90,7 +91,6 @@ export async function fetchStats(
    throw new Error(`Error communicating with the Arbalister server: ${response.status}`);
  }
  const data: StatsResponseRaw = await response.json();

  // Validate encoding and content type
  if (data.schema.encoding !== "base64") {
    throw new Error(`Unexpected schema encoding: ${data.schema.encoding}, expected "base64"`);