Skip to content

Commit 4032364

Browse files
Merge branch 'main' into new_execute_result
2 parents c316830 + d410046 commit 4032364

File tree

9 files changed

+754
-223
lines changed

9 files changed

+754
-223
lines changed

bigframes/blob/_functions.py

Lines changed: 188 additions & 97 deletions
Large diffs are not rendered by default.

bigframes/core/compile/sqlglot/expressions/comparison_ops.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,11 @@ def _(left: TypedExpr, right: TypedExpr) -> sge.Expression:
109109
return sge.LTE(this=left_expr, expression=right_expr)
110110

111111

112+
@register_binary_op(ops.minimum_op)
113+
def _(left: TypedExpr, right: TypedExpr) -> sge.Expression:
114+
return sge.Least(this=left.expr, expressions=right.expr)
115+
116+
112117
@register_binary_op(ops.ne_op)
113118
def _(left: TypedExpr, right: TypedExpr) -> sge.Expression:
114119
left_expr = _coerce_bool_to_int(left)

bigframes/core/compile/sqlglot/expressions/numeric_ops.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,14 @@ def _(left: TypedExpr, right: TypedExpr) -> sge.Expression:
377377
return result
378378

379379

380+
@register_binary_op(ops.round_op)
381+
def _(expr: TypedExpr, n_digits: TypedExpr) -> sge.Expression:
382+
rounded = sge.Round(this=expr.expr, decimals=n_digits.expr)
383+
if expr.dtype == dtypes.INT_DTYPE:
384+
return sge.Cast(this=rounded, to="INT64")
385+
return rounded
386+
387+
380388
@register_binary_op(ops.sub_op)
381389
def _(left: TypedExpr, right: TypedExpr) -> sge.Expression:
382390
if dtypes.is_numeric(left.dtype) and dtypes.is_numeric(right.dtype):

bigframes/operations/blob.py

Lines changed: 107 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,20 @@ def _df_apply_udf(
193193

194194
return s
195195

196+
def _apply_udf_or_raise_error(
197+
self, df: bigframes.dataframe.DataFrame, udf, operation_name: str
198+
) -> bigframes.series.Series:
199+
"""Helper to apply UDF with consistent error handling."""
200+
try:
201+
res = self._df_apply_udf(df, udf)
202+
except Exception as e:
203+
raise RuntimeError(f"{operation_name} UDF execution failed: {e}") from e
204+
205+
if res is None:
206+
raise RuntimeError(f"{operation_name} returned None result")
207+
208+
return res
209+
196210
def read_url(self) -> bigframes.series.Series:
197211
"""Retrieve the read URL of the Blob.
198212
@@ -343,6 +357,10 @@ def exif(
343357
344358
Returns:
345359
bigframes.series.Series: JSON series of key-value pairs if verbose=False, or struct with status and content if verbose=True.
360+
361+
Raises:
362+
ValueError: If engine is not 'pillow'.
363+
RuntimeError: If EXIF extraction fails or returns invalid structure.
346364
"""
347365
if engine is None or engine.casefold() != "pillow":
348366
raise ValueError("Must specify the engine, supported value is 'pillow'.")
@@ -364,22 +382,28 @@ def exif(
364382
container_memory=container_memory,
365383
).udf()
366384

367-
res = self._df_apply_udf(df, exif_udf)
385+
res = self._apply_udf_or_raise_error(df, exif_udf, "EXIF extraction")
368386

369387
if verbose:
370-
exif_content_series = bbq.parse_json(
371-
res._apply_unary_op(ops.JSONValue(json_path="$.content"))
372-
).rename("exif_content")
373-
exif_status_series = res._apply_unary_op(
374-
ops.JSONValue(json_path="$.status")
375-
)
388+
try:
389+
exif_content_series = bbq.parse_json(
390+
res._apply_unary_op(ops.JSONValue(json_path="$.content"))
391+
).rename("exif_content")
392+
exif_status_series = res._apply_unary_op(
393+
ops.JSONValue(json_path="$.status")
394+
)
395+
except Exception as e:
396+
raise RuntimeError(f"Failed to parse EXIF JSON result: {e}") from e
376397
results_df = bpd.DataFrame(
377398
{"status": exif_status_series, "content": exif_content_series}
378399
)
379400
results_struct = bbq.struct(results_df).rename("exif_results")
380401
return results_struct
381402
else:
382-
return bbq.parse_json(res)
403+
try:
404+
return bbq.parse_json(res)
405+
except Exception as e:
406+
raise RuntimeError(f"Failed to parse EXIF JSON result: {e}") from e
383407

384408
def image_blur(
385409
self,
@@ -411,6 +435,10 @@ def image_blur(
411435
412436
Returns:
413437
bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. If verbose=True, returns struct with status and content.
438+
439+
Raises:
440+
ValueError: If engine is not 'opencv' or parameters are invalid.
441+
RuntimeError: If image blur operation fails.
414442
"""
415443
if engine is None or engine.casefold() != "opencv":
416444
raise ValueError("Must specify the engine, supported value is 'opencv'.")
@@ -437,7 +465,7 @@ def image_blur(
437465
df["ksize_x"], df["ksize_y"] = ksize
438466
df["ext"] = ext # type: ignore
439467
df["verbose"] = verbose
440-
res = self._df_apply_udf(df, image_blur_udf)
468+
res = self._apply_udf_or_raise_error(df, image_blur_udf, "Image blur")
441469

442470
if verbose:
443471
blurred_content_b64_series = res._apply_unary_op(
@@ -486,7 +514,7 @@ def image_blur(
486514
df["ext"] = ext # type: ignore
487515
df["verbose"] = verbose
488516

489-
res = self._df_apply_udf(df, image_blur_udf)
517+
res = self._apply_udf_or_raise_error(df, image_blur_udf, "Image blur")
490518
res.cache() # to execute the udf
491519

492520
if verbose:
@@ -540,6 +568,10 @@ def image_resize(
540568
541569
Returns:
542570
bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. If verbose=True, returns struct with status and content.
571+
572+
Raises:
573+
ValueError: If engine is not 'opencv' or parameters are invalid.
574+
RuntimeError: If image resize operation fails.
543575
"""
544576
if engine is None or engine.casefold() != "opencv":
545577
raise ValueError("Must specify the engine, supported value is 'opencv'.")
@@ -570,11 +602,11 @@ def image_resize(
570602
container_memory=container_memory,
571603
).udf()
572604

573-
df["dsize_x"], df["dsizye_y"] = dsize
605+
df["dsize_x"], df["dsize_y"] = dsize
574606
df["fx"], df["fy"] = fx, fy
575607
df["ext"] = ext # type: ignore
576608
df["verbose"] = verbose
577-
res = self._df_apply_udf(df, image_resize_udf)
609+
res = self._apply_udf_or_raise_error(df, image_resize_udf, "Image resize")
578610

579611
if verbose:
580612
resized_content_b64_series = res._apply_unary_op(
@@ -620,12 +652,12 @@ def image_resize(
620652
dst_rt = dst.blob.get_runtime_json_str(mode="RW")
621653

622654
df = df.join(dst_rt, how="outer")
623-
df["dsize_x"], df["dsizye_y"] = dsize
655+
df["dsize_x"], df["dsize_y"] = dsize
624656
df["fx"], df["fy"] = fx, fy
625657
df["ext"] = ext # type: ignore
626658
df["verbose"] = verbose
627659

628-
res = self._df_apply_udf(df, image_resize_udf)
660+
res = self._apply_udf_or_raise_error(df, image_resize_udf, "Image resize")
629661
res.cache() # to execute the udf
630662

631663
if verbose:
@@ -679,6 +711,10 @@ def image_normalize(
679711
680712
Returns:
681713
bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. If verbose=True, returns struct with status and content.
714+
715+
Raises:
716+
ValueError: If engine is not 'opencv' or parameters are invalid.
717+
RuntimeError: If image normalize operation fails.
682718
"""
683719
if engine is None or engine.casefold() != "opencv":
684720
raise ValueError("Must specify the engine, supported value is 'opencv'.")
@@ -707,7 +743,9 @@ def image_normalize(
707743
df["norm_type"] = norm_type
708744
df["ext"] = ext # type: ignore
709745
df["verbose"] = verbose
710-
res = self._df_apply_udf(df, image_normalize_udf)
746+
res = self._apply_udf_or_raise_error(
747+
df, image_normalize_udf, "Image normalize"
748+
)
711749

712750
if verbose:
713751
normalized_content_b64_series = res._apply_unary_op(
@@ -758,7 +796,7 @@ def image_normalize(
758796
df["ext"] = ext # type: ignore
759797
df["verbose"] = verbose
760798

761-
res = self._df_apply_udf(df, image_normalize_udf)
799+
res = self._apply_udf_or_raise_error(df, image_normalize_udf, "Image normalize")
762800
res.cache() # to execute the udf
763801

764802
if verbose:
@@ -809,6 +847,10 @@ def pdf_extract(
809847
depend on the "verbose" parameter.
810848
Contains the extracted text from the PDF file.
811849
Includes error messages if verbosity is enabled.
850+
851+
Raises:
852+
ValueError: If engine is not 'pypdf'.
853+
RuntimeError: If PDF extraction fails or returns invalid structure.
812854
"""
813855
if engine is None or engine.casefold() != "pypdf":
814856
raise ValueError("Must specify the engine, supported value is 'pypdf'.")
@@ -830,18 +872,29 @@ def pdf_extract(
830872

831873
df = self.get_runtime_json_str(mode="R").to_frame()
832874
df["verbose"] = verbose
833-
res = self._df_apply_udf(df, pdf_extract_udf)
875+
876+
res = self._apply_udf_or_raise_error(df, pdf_extract_udf, "PDF extraction")
834877

835878
if verbose:
836-
extracted_content_series = res._apply_unary_op(
837-
ops.JSONValue(json_path="$.content")
838-
)
839-
status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status"))
840-
results_df = bpd.DataFrame(
841-
{"status": status_series, "content": extracted_content_series}
842-
)
843-
results_struct = bbq.struct(results_df).rename("extracted_results")
844-
return results_struct
879+
# Extract content with error handling
880+
try:
881+
content_series = res._apply_unary_op(
882+
ops.JSONValue(json_path="$.content")
883+
)
884+
except Exception as e:
885+
raise RuntimeError(
886+
f"Failed to extract content field from PDF result: {e}"
887+
) from e
888+
try:
889+
status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status"))
890+
except Exception as e:
891+
raise RuntimeError(
892+
f"Failed to extract status field from PDF result: {e}"
893+
) from e
894+
895+
res_df = bpd.DataFrame({"status": status_series, "content": content_series})
896+
struct_series = bbq.struct(res_df).rename("extracted_results")
897+
return struct_series
845898
else:
846899
return res.rename("extracted_content")
847900

@@ -884,6 +937,10 @@ def pdf_chunk(
884937
depend on the "verbose" parameter.
885938
where each string is a chunk of text extracted from PDF.
886939
Includes error messages if verbosity is enabled.
940+
941+
Raises:
942+
ValueError: If engine is not 'pypdf'.
943+
RuntimeError: If PDF chunking fails or returns invalid structure.
887944
"""
888945
if engine is None or engine.casefold() != "pypdf":
889946
raise ValueError("Must specify the engine, supported value is 'pypdf'.")
@@ -915,13 +972,25 @@ def pdf_chunk(
915972
df["overlap_size"] = overlap_size
916973
df["verbose"] = verbose
917974

918-
res = self._df_apply_udf(df, pdf_chunk_udf)
975+
res = self._apply_udf_or_raise_error(df, pdf_chunk_udf, "PDF chunking")
976+
977+
try:
978+
content_series = bbq.json_extract_string_array(res, "$.content")
979+
except Exception as e:
980+
raise RuntimeError(
981+
f"Failed to extract content array from PDF chunk result: {e}"
982+
) from e
919983

920984
if verbose:
921-
chunked_content_series = bbq.json_extract_string_array(res, "$.content")
922-
status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status"))
985+
try:
986+
status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status"))
987+
except Exception as e:
988+
raise RuntimeError(
989+
f"Failed to extract status field from PDF chunk result: {e}"
990+
) from e
991+
923992
results_df = bpd.DataFrame(
924-
{"status": status_series, "content": chunked_content_series}
993+
{"status": status_series, "content": content_series}
925994
)
926995
resultes_struct = bbq.struct(results_df).rename("chunked_results")
927996
return resultes_struct
@@ -962,6 +1031,10 @@ def audio_transcribe(
9621031
depend on the "verbose" parameter.
9631032
Contains the transcribed text from the audio file.
9641033
Includes error messages if verbosity is enabled.
1034+
1035+
Raises:
1036+
ValueError: If engine is not 'bigquery'.
1037+
RuntimeError: If the transcription result structure is invalid.
9651038
"""
9661039
if engine.casefold() != "bigquery":
9671040
raise ValueError("Must specify the engine, supported value is 'bigquery'.")
@@ -984,6 +1057,10 @@ def audio_transcribe(
9841057
model_params={"generationConfig": {"temperature": 0.0}},
9851058
)
9861059

1060+
# Validate that the result is not None
1061+
if transcribed_results is None:
1062+
raise RuntimeError("Transcription returned None result")
1063+
9871064
transcribed_content_series = transcribed_results.struct.field("result").rename(
9881065
"transcribed_content"
9891066
)

0 commit comments

Comments
 (0)