Skip to content

Commit 1f14191

Browse files
committed
Add image normalize error handling
1 parent aecd695 commit 1f14191

File tree

3 files changed

+77
-68
lines changed

3 files changed

+77
-68
lines changed

bigframes/blob/_functions.py

Lines changed: 14 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,6 @@
2626
float: "FLOAT64",
2727
str: "STRING",
2828
bytes: "BYTES",
29-
bool: "BOOL",
3029
}
3130

3231

@@ -339,11 +338,12 @@ def image_normalize_func(
339338
beta: float,
340339
norm_type: str,
341340
ext: str,
342-
verbose: bool,
343341
) -> str:
344-
try:
345-
import json
342+
import json
343+
344+
result_dict = {"status": "", "content": dst_obj_ref_rt}
346345

346+
try:
347347
import cv2 as cv # type: ignore
348348
import numpy as np
349349
import requests
@@ -391,18 +391,11 @@ def image_normalize_func(
391391
},
392392
timeout=30,
393393
)
394-
if verbose:
395-
result_dict = {"status": "", "content": dst_obj_ref_rt}
396-
return json.dumps(result_dict)
397-
else:
398-
return dst_obj_ref_rt
399394

400395
except Exception as e:
401-
if verbose:
402-
result_dict = {"status": str(e), "content": None}
403-
return json.dumps(result_dict)
404-
else:
405-
return None
396+
result_dict["status"] = str(e)
397+
398+
return json.dumps(result_dict)
406399

407400

408401
image_normalize_def = FunctionDef(
@@ -416,7 +409,6 @@ def image_normalize_to_bytes_func(
416409
beta: float,
417410
norm_type: str,
418411
ext: str,
419-
verbose: bool,
420412
) -> str:
421413
try:
422414
import base64
@@ -452,21 +444,15 @@ def image_normalize_to_bytes_func(
452444
)
453445
bts = cv.imencode(".jpeg", img_normalized)[1].tobytes()
454446

455-
if verbose:
456-
content_b64 = base64.b64encode(bts).decode("utf-8")
457-
result_dict = {"status": "", "content": content_b64}
458-
result_json = json.dumps(result_dict)
459-
return result_json
460-
else:
461-
return bts
447+
content_b64 = base64.b64encode(bts).decode("utf-8")
448+
result_dict = {"status": "", "content": content_b64}
449+
result_json = json.dumps(result_dict)
462450

463451
except Exception as e:
464-
if verbose:
465-
result_dict = {"status": str(e), "content": b""}
466-
result_json = json.dumps(result_dict)
467-
return result_json
468-
else:
469-
return b""
452+
result_dict = {"status": str(e), "content": b""}
453+
result_json = json.dumps(result_dict)
454+
455+
return result_json
470456

471457

472458
image_normalize_to_bytes_def = FunctionDef(

bigframes/operations/blob.py

Lines changed: 26 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -637,8 +637,6 @@ def image_normalize(
637637
if engine is None or engine.casefold() != "opencv":
638638
raise ValueError("Must specify the engine, supported value is 'opencv'.")
639639

640-
import base64
641-
642640
import bigframes.bigquery as bbq
643641
import bigframes.blob._functions as blob_func
644642
import bigframes.pandas as bpd
@@ -662,25 +660,25 @@ def image_normalize(
662660
df["beta"] = beta
663661
df["norm_type"] = norm_type
664662
df["ext"] = ext # type: ignore
665-
df["verbose"] = verbose
666663
res = self._df_apply_udf(df, image_normalize_udf)
667664

665+
normalized_content_b64_series = res._apply_unary_op(
666+
ops.JSONValue(json_path="$.content")
667+
)
668+
normalized_bytes = bbq.sql_scalar(
669+
"FROM_BASE64({0})", columns=[normalized_content_b64_series]
670+
)
668671
if verbose:
669672
normalized_status_series = res._apply_unary_op(
670673
ops.JSONValue(json_path="$.status")
671674
)
672-
normalized_content_b64_series = res._apply_unary_op(
673-
ops.JSONValue(json_path="$.content")
674-
)
675-
# TODO this is not allowed, I need to find another way
676-
normalized_bytes = base64.b64decode(normalized_content_b64_series)
677675
results_df = bpd.DataFrame(
678676
{"status": normalized_status_series, "content": normalized_bytes}
679677
)
680678
results_struct = bbq.struct(results_df).rename("normalized_results")
681679
return results_struct
682680
else:
683-
return res
681+
return normalized_bytes.rename("normalized_bytes")
684682

685683
if isinstance(dst, str):
686684
dst = os.path.join(dst, "")
@@ -708,22 +706,31 @@ def image_normalize(
708706
df["beta"] = beta
709707
df["norm_type"] = norm_type
710708
df["ext"] = ext # type: ignore
711-
# df["verbose"] = verbose
712709

713710
res = self._df_apply_udf(df, image_normalize_udf)
714711
res.cache() # to execute the udf
715712

713+
normalized_content_series = res._apply_unary_op(
714+
ops.JSONValue(json_path="$.content")
715+
)
716+
normalized_content_blobs = normalized_content_series.str.to_blob(
717+
connection=connection
718+
)
719+
716720
if verbose:
717721
normalized_status_series = res._apply_unary_op(
718722
ops.JSONValue(json_path="$.status")
719723
)
720724
results_df = bpd.DataFrame(
721-
{"status": normalized_status_series, "content": dst}
725+
{
726+
"status": normalized_status_series,
727+
"content": normalized_content_blobs,
728+
}
722729
)
723730
results_struct = bbq.struct(results_df).rename("normalized_results")
724731
return results_struct
725732
else:
726-
return dst
733+
return normalized_content_blobs.rename("normalized_content")
727734

728735
def pdf_extract(
729736
self,
@@ -781,7 +788,7 @@ def pdf_extract(
781788

782789
extracted_content_series = res._apply_unary_op(
783790
ops.JSONValue(json_path="$.content")
784-
).rename("extracted_content")
791+
)
785792

786793
if verbose:
787794
status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status"))
@@ -791,7 +798,7 @@ def pdf_extract(
791798
results_struct = bbq.struct(results_df).rename("extracted_results")
792799
return results_struct
793800
else:
794-
return extracted_content_series
801+
return extracted_content_series.rename("extracted_content")
795802

796803
def pdf_chunk(
797804
self,
@@ -865,9 +872,8 @@ def pdf_chunk(
865872

866873
res = self._df_apply_udf(df, pdf_chunk_udf)
867874

868-
chunked_content_series = bbq.json_extract_string_array(res, "$.content").rename(
869-
"chunked_content"
870-
)
875+
chunked_content_series = bbq.json_extract_string_array(res, "$.content")
876+
871877
if verbose:
872878
status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status"))
873879
results_df = bpd.DataFrame(
@@ -876,7 +882,7 @@ def pdf_chunk(
876882
resultes_struct = bbq.struct(results_df).rename("chunked_results")
877883
return resultes_struct
878884
else:
879-
return chunked_content_series
885+
return chunked_content_series.rename("chunked_content")
880886

881887
def audio_transcribe(
882888
self,
@@ -940,7 +946,7 @@ def audio_transcribe(
940946

941947
transcribed_content_series = cast(
942948
bpd.Series, transcribed_results["ml_generate_text_llm_result"]
943-
).rename("transcribed_content")
949+
)
944950

945951
if verbose:
946952
transcribed_status_series = cast(
@@ -955,4 +961,4 @@ def audio_transcribe(
955961
results_struct = bbq.struct(results_df).rename("transcription_results")
956962
return results_struct
957963
else:
958-
return transcribed_content_series
964+
return transcribed_content_series.rename("transcribed_content")

tests/system/large/blob/test_function.py

Lines changed: 37 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -227,11 +227,13 @@ def test_blob_image_resize_to_bq(images_mm_df: bpd.DataFrame, bq_connection: str
227227
assert actual.dtype == dtypes.BYTES_DTYPE
228228

229229

230+
@pytest.mark.parametrize("verbose", [True, False])
230231
def test_blob_image_normalize_to_series(
231232
images_mm_df: bpd.DataFrame,
232233
bq_connection: str,
233234
images_output_uris: list[str],
234235
session: bigframes.Session,
236+
verbose: bool,
235237
):
236238
series = bpd.Series(images_output_uris, session=session).str.to_blob(
237239
connection=bq_connection
@@ -245,30 +247,48 @@ def test_blob_image_normalize_to_series(
245247
connection=bq_connection,
246248
engine="opencv",
247249
)
248-
expected_df = pd.DataFrame(
249-
{
250-
"uri": images_output_uris,
251-
"version": [None, None],
252-
"authorizer": [bq_connection.casefold(), bq_connection.casefold()],
253-
"details": [None, None],
254-
}
255-
)
256-
pd.testing.assert_frame_equal(
257-
actual.struct.explode().to_pandas(),
258-
expected_df,
259-
check_dtype=False,
260-
check_index_type=False,
261-
)
262250

263-
# verify the files exist
264-
assert not actual.blob.size().isna().any()
251+
if verbose:
252+
253+
assert hasattr(actual, "struct")
254+
actual_exploded = actual.struct.explode()
255+
assert "status" in actual_exploded.columns
256+
assert "content" in actual_exploded.columns
257+
258+
status_series = actual_exploded["status"]
259+
assert status_series.dtype == dtypes.STRING_DTYPE
260+
261+
content_series = actual_exploded["content"]
262+
# Content should be blob objects for GCS destination
263+
assert hasattr(content_series, "blob")
264+
265+
else:
266+
expected_df = pd.DataFrame(
267+
{
268+
"uri": images_output_uris,
269+
"version": [None, None],
270+
"authorizer": [bq_connection.casefold(), bq_connection.casefold()],
271+
"details": [None, None],
272+
}
273+
)
274+
pd.testing.assert_frame_equal(
275+
actual.struct.explode().to_pandas(),
276+
expected_df,
277+
check_dtype=False,
278+
check_index_type=False,
279+
)
280+
281+
# verify the files exist
282+
assert not actual.blob.size().isna().any()
265283

266284

285+
@pytest.mark.parametrize("verbose", [True, False])
267286
def test_blob_image_normalize_to_folder(
268287
images_mm_df: bpd.DataFrame,
269288
bq_connection: str,
270289
images_output_folder: str,
271290
images_output_uris: list[str],
291+
verbose: bool,
272292
):
273293
actual = images_mm_df["blob_col"].blob.image_normalize(
274294
alpha=50.0,
@@ -297,10 +317,7 @@ def test_blob_image_normalize_to_folder(
297317
assert not actual.blob.size().isna().any()
298318

299319

300-
@pytest.mark.parametrize(
301-
"verbose",
302-
[True, False],
303-
)
320+
@pytest.mark.parametrize("verbose", [True, False])
304321
def test_blob_image_normalize_to_bq(
305322
images_mm_df: bpd.DataFrame, bq_connection: str, verbose: bool
306323
):

0 commit comments

Comments
 (0)