Skip to content

Commit b4ef718

Browse files
committed
Add image normalize error handling
1 parent 70e9bc2 commit b4ef718

File tree

3 files changed

+77
-67
lines changed

3 files changed

+77
-67
lines changed

bigframes/blob/_functions.py

Lines changed: 14 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,6 @@
2626
float: "FLOAT64",
2727
str: "STRING",
2828
bytes: "BYTES",
29-
bool: "BOOL",
3029
}
3130

3231

@@ -340,11 +339,12 @@ def image_normalize_func(
340339
beta: float,
341340
norm_type: str,
342341
ext: str,
343-
verbose: bool,
344342
) -> str:
345-
try:
346-
import json
343+
import json
344+
345+
result_dict = {"status": "", "content": dst_obj_ref_rt}
347346

347+
try:
348348
import cv2 as cv # type: ignore
349349
import numpy as np
350350
import requests
@@ -392,18 +392,11 @@ def image_normalize_func(
392392
},
393393
timeout=30,
394394
)
395-
if verbose:
396-
result_dict = {"status": "", "content": dst_obj_ref_rt}
397-
return json.dumps(result_dict)
398-
else:
399-
return dst_obj_ref_rt
400395

401396
except Exception as e:
402-
if verbose:
403-
result_dict = {"status": str(e), "content": None}
404-
return json.dumps(result_dict)
405-
else:
406-
return None
397+
result_dict["status"] = str(e)
398+
399+
return json.dumps(result_dict)
407400

408401

409402
image_normalize_def = FunctionDef(
@@ -417,7 +410,6 @@ def image_normalize_to_bytes_func(
417410
beta: float,
418411
norm_type: str,
419412
ext: str,
420-
verbose: bool,
421413
) -> str:
422414
try:
423415
import base64
@@ -453,21 +445,15 @@ def image_normalize_to_bytes_func(
453445
)
454446
bts = cv.imencode(".jpeg", img_normalized)[1].tobytes()
455447

456-
if verbose:
457-
content_b64 = base64.b64encode(bts).decode("utf-8")
458-
result_dict = {"status": "", "content": content_b64}
459-
result_json = json.dumps(result_dict)
460-
return result_json
461-
else:
462-
return bts
448+
content_b64 = base64.b64encode(bts).decode("utf-8")
449+
result_dict = {"status": "", "content": content_b64}
450+
result_json = json.dumps(result_dict)
463451

464452
except Exception as e:
465-
if verbose:
466-
result_dict = {"status": str(e), "content": b""}
467-
result_json = json.dumps(result_dict)
468-
return result_json
469-
else:
470-
return b""
453+
result_dict = {"status": str(e), "content": b""}
454+
result_json = json.dumps(result_dict)
455+
456+
return result_json
471457

472458

473459
image_normalize_to_bytes_def = FunctionDef(

bigframes/operations/blob.py

Lines changed: 26 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -637,8 +637,6 @@ def image_normalize(
637637
if engine is None or engine.casefold() != "opencv":
638638
raise ValueError("Must specify the engine, supported value is 'opencv'.")
639639

640-
import base64
641-
642640
import bigframes.bigquery as bbq
643641
import bigframes.blob._functions as blob_func
644642
import bigframes.pandas as bpd
@@ -662,25 +660,25 @@ def image_normalize(
662660
df["beta"] = beta
663661
df["norm_type"] = norm_type
664662
df["ext"] = ext # type: ignore
665-
df["verbose"] = verbose
666663
res = self._df_apply_udf(df, image_normalize_udf)
667664

665+
normalized_content_b64_series = res._apply_unary_op(
666+
ops.JSONValue(json_path="$.content")
667+
)
668+
normalized_bytes = bbq.sql_scalar(
669+
"FROM_BASE64({0})", columns=[normalized_content_b64_series]
670+
)
668671
if verbose:
669672
normalized_status_series = res._apply_unary_op(
670673
ops.JSONValue(json_path="$.status")
671674
)
672-
normalized_content_b64_series = res._apply_unary_op(
673-
ops.JSONValue(json_path="$.content")
674-
)
675-
# TODO this is not allowed, I need to find another way
676-
normalized_bytes = base64.b64decode(normalized_content_b64_series)
677675
results_df = bpd.DataFrame(
678676
{"status": normalized_status_series, "content": normalized_bytes}
679677
)
680678
results_struct = bbq.struct(results_df).rename("normalized_results")
681679
return results_struct
682680
else:
683-
return res
681+
return normalized_bytes.rename("normalized_bytes")
684682

685683
if isinstance(dst, str):
686684
dst = os.path.join(dst, "")
@@ -708,22 +706,31 @@ def image_normalize(
708706
df["beta"] = beta
709707
df["norm_type"] = norm_type
710708
df["ext"] = ext # type: ignore
711-
# df["verbose"] = verbose
712709

713710
res = self._df_apply_udf(df, image_normalize_udf)
714711
res.cache() # to execute the udf
715712

713+
normalized_content_series = res._apply_unary_op(
714+
ops.JSONValue(json_path="$.content")
715+
)
716+
normalized_content_blobs = normalized_content_series.str.to_blob(
717+
connection=connection
718+
)
719+
716720
if verbose:
717721
normalized_status_series = res._apply_unary_op(
718722
ops.JSONValue(json_path="$.status")
719723
)
720724
results_df = bpd.DataFrame(
721-
{"status": normalized_status_series, "content": dst}
725+
{
726+
"status": normalized_status_series,
727+
"content": normalized_content_blobs,
728+
}
722729
)
723730
results_struct = bbq.struct(results_df).rename("normalized_results")
724731
return results_struct
725732
else:
726-
return dst
733+
return normalized_content_blobs.rename("normalized_content")
727734

728735
def pdf_extract(
729736
self,
@@ -781,7 +788,7 @@ def pdf_extract(
781788

782789
extracted_content_series = res._apply_unary_op(
783790
ops.JSONValue(json_path="$.content")
784-
).rename("extracted_content")
791+
)
785792

786793
if verbose:
787794
status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status"))
@@ -791,7 +798,7 @@ def pdf_extract(
791798
results_struct = bbq.struct(results_df).rename("extracted_results")
792799
return results_struct
793800
else:
794-
return extracted_content_series
801+
return extracted_content_series.rename("extracted_content")
795802

796803
def pdf_chunk(
797804
self,
@@ -865,9 +872,8 @@ def pdf_chunk(
865872

866873
res = self._df_apply_udf(df, pdf_chunk_udf)
867874

868-
chunked_content_series = bbq.json_extract_string_array(res, "$.content").rename(
869-
"chunked_content"
870-
)
875+
chunked_content_series = bbq.json_extract_string_array(res, "$.content")
876+
871877
if verbose:
872878
status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status"))
873879
results_df = bpd.DataFrame(
@@ -876,7 +882,7 @@ def pdf_chunk(
876882
resultes_struct = bbq.struct(results_df).rename("chunked_results")
877883
return resultes_struct
878884
else:
879-
return chunked_content_series
885+
return chunked_content_series.rename("chunked_content")
880886

881887
def audio_transcribe(
882888
self,
@@ -934,6 +940,7 @@ def audio_transcribe(
934940
model_params={"generationConfig": {"temperature": 0.0}},
935941
)
936942

943+
937944
transcribed_content_series = transcribed_results.struct.field("result").rename(
938945
"transcribed_content"
939946
)
@@ -949,4 +956,4 @@ def audio_transcribe(
949956
results_struct = bbq.struct(results_df).rename("transcription_results")
950957
return results_struct
951958
else:
952-
return transcribed_content_series
959+
return transcribed_content_series.rename("transcribed_content")

tests/system/large/blob/test_function.py

Lines changed: 37 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -228,11 +228,13 @@ def test_blob_image_resize_to_bq(images_mm_df: bpd.DataFrame, bq_connection: str
228228
assert actual.dtype == dtypes.BYTES_DTYPE
229229

230230

231+
@pytest.mark.parametrize("verbose", [True, False])
231232
def test_blob_image_normalize_to_series(
232233
images_mm_df: bpd.DataFrame,
233234
bq_connection: str,
234235
images_output_uris: list[str],
235236
session: bigframes.Session,
237+
verbose: bool,
236238
):
237239
series = bpd.Series(images_output_uris, session=session).str.to_blob(
238240
connection=bq_connection
@@ -246,30 +248,48 @@ def test_blob_image_normalize_to_series(
246248
connection=bq_connection,
247249
engine="opencv",
248250
)
249-
expected_df = pd.DataFrame(
250-
{
251-
"uri": images_output_uris,
252-
"version": [None, None],
253-
"authorizer": [bq_connection.casefold(), bq_connection.casefold()],
254-
"details": [None, None],
255-
}
256-
)
257-
pd.testing.assert_frame_equal(
258-
actual.struct.explode().to_pandas(),
259-
expected_df,
260-
check_dtype=False,
261-
check_index_type=False,
262-
)
263251

264-
# verify the files exist
265-
assert not actual.blob.size().isna().any()
252+
if verbose:
253+
254+
assert hasattr(actual, "struct")
255+
actual_exploded = actual.struct.explode()
256+
assert "status" in actual_exploded.columns
257+
assert "content" in actual_exploded.columns
258+
259+
status_series = actual_exploded["status"]
260+
assert status_series.dtype == dtypes.STRING_DTYPE
261+
262+
content_series = actual_exploded["content"]
263+
# Content should be blob objects for GCS destination
264+
assert hasattr(content_series, "blob")
265+
266+
else:
267+
expected_df = pd.DataFrame(
268+
{
269+
"uri": images_output_uris,
270+
"version": [None, None],
271+
"authorizer": [bq_connection.casefold(), bq_connection.casefold()],
272+
"details": [None, None],
273+
}
274+
)
275+
pd.testing.assert_frame_equal(
276+
actual.struct.explode().to_pandas(),
277+
expected_df,
278+
check_dtype=False,
279+
check_index_type=False,
280+
)
281+
282+
# verify the files exist
283+
assert not actual.blob.size().isna().any()
266284

267285

286+
@pytest.mark.parametrize("verbose", [True, False])
268287
def test_blob_image_normalize_to_folder(
269288
images_mm_df: bpd.DataFrame,
270289
bq_connection: str,
271290
images_output_folder: str,
272291
images_output_uris: list[str],
292+
verbose: bool,
273293
):
274294
actual = images_mm_df["blob_col"].blob.image_normalize(
275295
alpha=50.0,
@@ -298,10 +318,7 @@ def test_blob_image_normalize_to_folder(
298318
assert not actual.blob.size().isna().any()
299319

300320

301-
@pytest.mark.parametrize(
302-
"verbose",
303-
[True, False],
304-
)
321+
@pytest.mark.parametrize("verbose", [True, False])
305322
def test_blob_image_normalize_to_bq(
306323
images_mm_df: bpd.DataFrame, bq_connection: str, verbose: bool
307324
):

0 commit comments

Comments
 (0)