Skip to content

Commit e23f95e

Browse files
committed
add error handling for one function, test is not clean
1 parent 090ce8e commit e23f95e

File tree

2 files changed

+90
-28
lines changed

2 files changed

+90
-28
lines changed

bigframes/blob/_functions.py

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -394,40 +394,47 @@ def image_normalize_func(
394394

395395
def image_normalize_to_bytes_func(
396396
src_obj_ref_rt: str, alpha: float, beta: float, norm_type: str, ext: str
397-
) -> bytes:
398-
import json
397+
) -> str:
398+
try:
399+
import base64
400+
import json
399401

400-
import cv2 as cv # type: ignore
401-
import numpy as np
402-
import requests
403-
from requests import adapters
402+
import cv2 as cv # type: ignore
403+
import numpy as np
404+
import requests
405+
from requests import adapters
404406

405-
session = requests.Session()
406-
session.mount("https://", adapters.HTTPAdapter(max_retries=3))
407+
session = requests.Session()
408+
session.mount("https://", adapters.HTTPAdapter(max_retries=3))
407409

408-
ext = ext or ".jpeg"
410+
ext = ext or ".jpeg"
409411

410-
norm_type_mapping = {
411-
"inf": cv.NORM_INF,
412-
"l1": cv.NORM_L1,
413-
"l2": cv.NORM_L2,
414-
"minmax": cv.NORM_MINMAX,
415-
}
412+
norm_type_mapping = {
413+
"inf": cv.NORM_INF,
414+
"l1": cv.NORM_L1,
415+
"l2": cv.NORM_L2,
416+
"minmax": cv.NORM_MINMAX,
417+
}
416418

417-
src_obj_ref_rt_json = json.loads(src_obj_ref_rt)
418-
src_url = src_obj_ref_rt_json["access_urls"]["read_url"]
419+
src_obj_ref_rt_json = json.loads(src_obj_ref_rt)
420+
src_url = src_obj_ref_rt_json["access_urls"]["read_url"]
419421

420-
response = session.get(src_url, timeout=30)
421-
bts = response.content
422+
response = session.get(src_url, timeout=30)
423+
bts = response.content
422424

423-
nparr = np.frombuffer(bts, np.uint8)
424-
img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED)
425-
img_normalized = cv.normalize(
426-
img, None, alpha=alpha, beta=beta, norm_type=norm_type_mapping[norm_type]
427-
)
428-
bts = cv.imencode(".jpeg", img_normalized)[1].tobytes()
425+
nparr = np.frombuffer(bts, np.uint8)
426+
img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED)
427+
img_normalized = cv.normalize(
428+
img, None, alpha=alpha, beta=beta, norm_type=norm_type_mapping[norm_type]
429+
)
430+
bts = cv.imencode(".jpeg", img_normalized)[1].tobytes()
431+
result_dict = {"status": "", "content": base64.b64encode(bts).decode("utf-8")}
429432

430-
return bts
433+
except Exception as e:
434+
result_dict = {"status": str(e), "content": ""}
435+
436+
result_json = json.dumps(result_dict)
437+
return result_json
431438

432439

433440
image_normalize_to_bytes_def = FunctionDef(

bigframes/operations/blob.py

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,7 @@ def image_normalize(
539539
max_batching_rows: int = 8192,
540540
container_cpu: Union[float, int] = 0.33,
541541
container_memory: str = "512Mi",
542+
verbose: bool = False,
542543
) -> bigframes.series.Series:
543544
"""Normalize images.
544545
@@ -556,14 +557,28 @@ def image_normalize(
556557
max_batching_rows (int, default 8,192): Max number of rows per batch send to cloud run to execute the function.
557558
container_cpu (int or float, default 0.33): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to intergers.
558559
container_memory (str, default "512Mi"): container memory size. String of the format <number><unit>. Possible values are from 512Mi to 32Gi.
560+
verbose (bool, default "False"): controls the verbosity of the output.
561+
when set to True, both error messages and the normalized image
562+
content are displayed. Conversely, when set to False, only the
563+
normalized image content is presented, suppressing error messages.
559564
560565
Returns:
561566
bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ.
562567
"""
563568
if engine is None or engine.casefold() != "opencv":
564569
raise ValueError("Must specify the engine, supported value is 'opencv'.")
565570

571+
bigframes.series.Series: blob Series if destination is GCS. Or
572+
struct[str, bytes] or bytes Series if destination is BQ,
573+
depend on the "verbose" parameter. Contains the normalized image
574+
data. Includes error messages if verbosity is enbled.
575+
576+
"""
577+
import base64
578+
579+
import bigframes.bigquery as bbq
566580
import bigframes.blob._functions as blob_func
581+
import bigframes.pandas as bpd
567582
568583
connection = self._resolve_connection(connection)
569584
df = self.get_runtime_json_str(mode="R").to_frame()
@@ -586,7 +601,27 @@ def image_normalize(
586601
df["ext"] = ext # type: ignore
587602
res = self._df_apply_udf(df, image_normalize_udf)
588603
589-
return res
604+
bq_session = self._block.bq_session
605+
encoded_content_series = res._apply_unary_op(
606+
ops.JSONValue(json_path="$.content")
607+
)
608+
base64_decode_udf = bq_session.register_function(
609+
"base64_decode_bq",
610+
lambda x: bbq.query(f"SELECT TO_BASE64(FROM_BASE64('{x}'))")
611+
.to_dataframe()
612+
.iloc[0, 0],
613+
)
614+
decoded_content_series = encoded_content_series.apply(base64_decode_udf)
615+
616+
if verbose:
617+
status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status"))
618+
res_df = bpd.DataFrame(
619+
{"status": status_series, "content": decoded_content_series}
620+
)
621+
struct_series = bbq.struct(res_df)
622+
return struct_series
623+
else:
624+
return decoded_content_series
590625
591626
if isinstance(dst, str):
592627
dst = os.path.join(dst, "")
@@ -618,7 +653,27 @@ def image_normalize(
618653
res = self._df_apply_udf(df, image_normalize_udf)
619654
res.cache() # to execute the udf
620655
621-
return dst
656+
bq_session = self._block.bq_session
657+
encoded_content_series = res._apply_unary_op(
658+
ops.JSONValue(json_path="$.content")
659+
)
660+
base64_decode_udf = bq_session.register_function(
661+
"base64_decode_bq",
662+
lambda x: bbq.query(f"SELECT TO_BASE64(FROM_BASE64('{x}'))")
663+
.to_dataframe()
664+
.iloc[0, 0],
665+
)
666+
decoded_content_series = encoded_content_series.apply(base64_decode_udf)
667+
668+
if verbose:
669+
status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status"))
670+
res_df = bpd.DataFrame(
671+
{"status": status_series, "content": decoded_content_series}
672+
)
673+
struct_series = bbq.struct(res_df)
674+
return struct_series
675+
else:
676+
return decoded_content_series
622677
623678
def pdf_extract(
624679
self,

0 commit comments

Comments
 (0)