Skip to content

Commit 6d001c8

Browse files
committed
add error handling for one function, test is not clean
1 parent 7600001 commit 6d001c8

File tree

2 files changed

+90
-28
lines changed

2 files changed

+90
-28
lines changed

bigframes/blob/_functions.py

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -395,40 +395,47 @@ def image_normalize_func(
395395

396396
def image_normalize_to_bytes_func(
397397
src_obj_ref_rt: str, alpha: float, beta: float, norm_type: str, ext: str
398-
) -> bytes:
399-
import json
398+
) -> str:
399+
try:
400+
import base64
401+
import json
400402

401-
import cv2 as cv # type: ignore
402-
import numpy as np
403-
import requests
404-
from requests import adapters
403+
import cv2 as cv # type: ignore
404+
import numpy as np
405+
import requests
406+
from requests import adapters
405407

406-
session = requests.Session()
407-
session.mount("https://", adapters.HTTPAdapter(max_retries=3))
408+
session = requests.Session()
409+
session.mount("https://", adapters.HTTPAdapter(max_retries=3))
408410

409-
ext = ext or ".jpeg"
411+
ext = ext or ".jpeg"
410412

411-
norm_type_mapping = {
412-
"inf": cv.NORM_INF,
413-
"l1": cv.NORM_L1,
414-
"l2": cv.NORM_L2,
415-
"minmax": cv.NORM_MINMAX,
416-
}
413+
norm_type_mapping = {
414+
"inf": cv.NORM_INF,
415+
"l1": cv.NORM_L1,
416+
"l2": cv.NORM_L2,
417+
"minmax": cv.NORM_MINMAX,
418+
}
417419

418-
src_obj_ref_rt_json = json.loads(src_obj_ref_rt)
419-
src_url = src_obj_ref_rt_json["access_urls"]["read_url"]
420+
src_obj_ref_rt_json = json.loads(src_obj_ref_rt)
421+
src_url = src_obj_ref_rt_json["access_urls"]["read_url"]
420422

421-
response = session.get(src_url, timeout=30)
422-
bts = response.content
423+
response = session.get(src_url, timeout=30)
424+
bts = response.content
423425

424-
nparr = np.frombuffer(bts, np.uint8)
425-
img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED)
426-
img_normalized = cv.normalize(
427-
img, None, alpha=alpha, beta=beta, norm_type=norm_type_mapping[norm_type]
428-
)
429-
bts = cv.imencode(".jpeg", img_normalized)[1].tobytes()
426+
nparr = np.frombuffer(bts, np.uint8)
427+
img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED)
428+
img_normalized = cv.normalize(
429+
img, None, alpha=alpha, beta=beta, norm_type=norm_type_mapping[norm_type]
430+
)
431+
bts = cv.imencode(".jpeg", img_normalized)[1].tobytes()
432+
result_dict = {"status": "", "content": base64.b64encode(bts).decode("utf-8")}
430433

431-
return bts
434+
except Exception as e:
435+
result_dict = {"status": str(e), "content": ""}
436+
437+
result_json = json.dumps(result_dict)
438+
return result_json
432439

433440

434441
image_normalize_to_bytes_def = FunctionDef(

bigframes/operations/blob.py

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,7 @@ def image_normalize(
539539
max_batching_rows: int = 8192,
540540
container_cpu: Union[float, int] = 0.33,
541541
container_memory: str = "512Mi",
542+
verbose: bool = False,
542543
) -> bigframes.series.Series:
543544
"""Normalize images.
544545
@@ -556,14 +557,28 @@ def image_normalize(
556557
max_batching_rows (int, default 8,192): Max number of rows per batch send to cloud run to execute the function.
557558
container_cpu (int or float, default 0.33): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to intergers.
558559
container_memory (str, default "512Mi"): container memory size. String of the format <number><unit>. Possible values are from 512Mi to 32Gi.
560+
verbose (bool, default "False"): controls the verbosity of the output.
561+
when set to True, both error messages and the normalized image
562+
content are displayed. Conversely, when set to False, only the
563+
normalized image content is presented, suppressing error messages.
559564
560565
Returns:
561566
bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ.
562567
"""
563568
if engine is None or engine.casefold() != "opencv":
564569
raise ValueError("Must specify the engine, supported value is 'opencv'.")
565570

571+
bigframes.series.Series: blob Series if destination is GCS. Or
572+
struct[str, bytes] or bytes Series if destination is BQ,
573+
depend on the "verbose" parameter. Contains the normalized image
574+
data. Includes error messages if verbosity is enbled.
575+
576+
"""
577+
import base64
578+
579+
import bigframes.bigquery as bbq
566580
import bigframes.blob._functions as blob_func
581+
import bigframes.pandas as bpd
567582
568583
connection = self._resolve_connection(connection)
569584
df = self.get_runtime_json_str(mode="R").to_frame()
@@ -586,7 +601,27 @@ def image_normalize(
586601
df["ext"] = ext # type: ignore
587602
res = self._df_apply_udf(df, image_normalize_udf)
588603
589-
return res
604+
bq_session = self._block.bq_session
605+
encoded_content_series = res._apply_unary_op(
606+
ops.JSONValue(json_path="$.content")
607+
)
608+
base64_decode_udf = bq_session.register_function(
609+
"base64_decode_bq",
610+
lambda x: bbq.query(f"SELECT TO_BASE64(FROM_BASE64('{x}'))")
611+
.to_dataframe()
612+
.iloc[0, 0],
613+
)
614+
decoded_content_series = encoded_content_series.apply(base64_decode_udf)
615+
616+
if verbose:
617+
status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status"))
618+
res_df = bpd.DataFrame(
619+
{"status": status_series, "content": decoded_content_series}
620+
)
621+
struct_series = bbq.struct(res_df)
622+
return struct_series
623+
else:
624+
return decoded_content_series
590625
591626
if isinstance(dst, str):
592627
dst = os.path.join(dst, "")
@@ -618,7 +653,27 @@ def image_normalize(
618653
res = self._df_apply_udf(df, image_normalize_udf)
619654
res.cache() # to execute the udf
620655
621-
return dst
656+
bq_session = self._block.bq_session
657+
encoded_content_series = res._apply_unary_op(
658+
ops.JSONValue(json_path="$.content")
659+
)
660+
base64_decode_udf = bq_session.register_function(
661+
"base64_decode_bq",
662+
lambda x: bbq.query(f"SELECT TO_BASE64(FROM_BASE64('{x}'))")
663+
.to_dataframe()
664+
.iloc[0, 0],
665+
)
666+
decoded_content_series = encoded_content_series.apply(base64_decode_udf)
667+
668+
if verbose:
669+
status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status"))
670+
res_df = bpd.DataFrame(
671+
{"status": status_series, "content": decoded_content_series}
672+
)
673+
struct_series = bbq.struct(res_df)
674+
return struct_series
675+
else:
676+
return decoded_content_series
622677
623678
def pdf_extract(
624679
self,

0 commit comments

Comments
 (0)