|
# Optional-dependency guard: polars (and pyarrow, which the polars compiler
# needs for Arrow interchange) may be absent; downstream code checks
# `polars_installed` before using either.
polars_installed = True
if TYPE_CHECKING:
    import polars as pl
    import pyarrow as pa
else:
    try:
        import bigframes._importing

        # Use import_polars() instead of importing directly so that we check
        # the version numbers.
        pl = bigframes._importing.import_polars()
        import pyarrow as pa
    except Exception:
        # Any failure (missing package, bad version) downgrades to
        # "not installed" rather than crashing at import time.
        polars_installed = False
|
@@ -409,11 +409,13 @@ def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr: |
409 | 409 |
|
410 | 410 | @compile_op.register(json_ops.ToJSONString) |
411 | 411 | def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr: |
412 | | - return input.str.json_decode(pl.String()) |
| 412 | + # Convert JSON to string representation |
| 413 | + return input.cast(pl.String()) |
413 | 414 |
|
414 | 415 | @compile_op.register(json_ops.ParseJSON) |
415 | 416 | def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr: |
416 | | - return input.str.json_decode(pl.String()) |
| 417 | + # Parse string as JSON - this should decode, not encode |
| 418 | + return input.str.json_decode() |
417 | 419 |
|
418 | 420 | @compile_op.register(json_ops.JSONExtract) |
419 | 421 | def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr: |
@@ -599,9 +601,35 @@ def compile_readlocal(self, node: nodes.ReadLocalNode): |
599 | 601 | scan_item.source_id: scan_item.id.sql |
600 | 602 | for scan_item in node.scan_list.items |
601 | 603 | } |
602 | | - lazy_frame = cast( |
603 | | - pl.DataFrame, pl.from_arrow(node.local_data_source.data) |
604 | | - ).lazy() |
| 604 | + |
| 605 | + # Workaround for PyArrow bug https://github.com/apache/arrow/issues/45262 |
| 606 | + # Convert JSON columns to strings before Polars processing |
| 607 | + arrow_data = node.local_data_source.data |
| 608 | + schema = arrow_data.schema |
| 609 | + |
| 610 | + # Check if any columns are JSON type |
| 611 | + json_field_indices = [ |
| 612 | + i |
| 613 | + for i, field in enumerate(schema) |
| 614 | + if pa.types.is_extension_type(field.type) |
| 615 | + and field.type.extension_name == "google:sqlType:json" |
| 616 | + ] |
| 617 | + |
| 618 | + if json_field_indices: |
| 619 | + # Convert JSON columns to string columns |
| 620 | + new_arrays = [] |
| 621 | + new_fields = [] |
| 622 | + for i, field in enumerate(schema): |
| 623 | + if i in json_field_indices: |
| 624 | + # Cast JSON to string |
| 625 | + new_arrays.append(arrow_data.column(i).cast(pa.string())) |
| 626 | + new_fields.append(pa.field(field.name, pa.string())) |
| 627 | + else: |
| 628 | + new_arrays.append(arrow_data.column(i)) |
| 629 | + new_fields.append(field) |
| 630 | + arrow_data = pa.table(new_arrays, schema=pa.schema(new_fields)) |
| 631 | + |
| 632 | + lazy_frame = cast(pl.DataFrame, pl.from_arrow(arrow_data)).lazy() |
605 | 633 | lazy_frame = lazy_frame.select(cols_to_read.keys()).rename(cols_to_read) |
606 | 634 | if node.offsets_col: |
607 | 635 | lazy_frame = lazy_frame.with_columns( |
|
0 commit comments