Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ authors = [{name = "Substrait contributors", email = "substrait@googlegroups.com
license = {text = "Apache-2.0"}
readme = "README.md"
requires-python = ">=3.10"
dependencies = ["protobuf >=3.19.1,<6"]
dependencies = ["protobuf >=3.19.1,<6", "typing_extensions"]
dynamic = ["version"]

[tool.setuptools_scm]
Expand Down
77 changes: 37 additions & 40 deletions src/substrait/builders/extended_expression.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
from datetime import date
import itertools
from datetime import date
from typing import Any, Callable, Iterable, Union

import substrait.gen.proto.algebra_pb2 as stalg
import substrait.gen.proto.type_pb2 as stp
import substrait.gen.proto.extended_expression_pb2 as stee
import substrait.gen.proto.extensions.extensions_pb2 as ste
import substrait.gen.proto.type_pb2 as stp
from substrait.extension_registry import ExtensionRegistry
from substrait.type_inference import infer_extended_expression_schema
from substrait.utils import (
type_num_names,
merge_extension_urns,
merge_extension_uris,
merge_extension_declarations,
merge_extension_uris,
merge_extension_urns,
type_num_names,
)
from substrait.type_inference import infer_extended_expression_schema
from typing import Callable, Any, Union, Iterable

UnboundExtendedExpression = Callable[
[stp.NamedStruct, ExtensionRegistry], stee.ExtendedExpression
Expand All @@ -21,7 +22,7 @@


def _alias_or_inferred(
alias: Union[Iterable[str], str],
alias: Union[Iterable[str], str, None],
op: str,
args: Iterable[str],
):
Expand All @@ -44,7 +45,7 @@ def resolve_expression(


def literal(
value: Any, type: stp.Type, alias: Union[Iterable[str], str] = None
value: Any, type: stp.Type, alias: Union[Iterable[str], str, None] = None
) -> UnboundExtendedExpression:
"""Builds a resolver for ExtendedExpression containing a literal expression"""

Expand Down Expand Up @@ -154,7 +155,7 @@ def resolve(
return resolve


def column(field: Union[str, int], alias: Union[Iterable[str], str] = None):
def column(field: Union[str, int], alias: Union[Iterable[str], str, None] = None):
"""Builds a resolver for ExtendedExpression containing a FieldReference expression

Accepts either an index or a field name of a desired field.
Expand Down Expand Up @@ -208,7 +209,7 @@ def scalar_function(
urn: str,
function: str,
expressions: Iterable[ExtendedExpressionOrUnbound],
alias: Union[Iterable[str], str] = None,
alias: Union[Iterable[str], str, None] = None,
):
"""Builds a resolver for ExtendedExpression containing a ScalarFunction expression"""

Expand Down Expand Up @@ -306,7 +307,7 @@ def aggregate_function(
urn: str,
function: str,
expressions: Iterable[ExtendedExpressionOrUnbound],
alias: Union[Iterable[str], str] = None,
alias: Union[Iterable[str], str, None] = None,
):
"""Builds a resolver for ExtendedExpression containing a AggregateFunction measure"""

Expand Down Expand Up @@ -402,7 +403,7 @@ def window_function(
function: str,
expressions: Iterable[ExtendedExpressionOrUnbound],
partitions: Iterable[ExtendedExpressionOrUnbound] = [],
alias: Union[Iterable[str], str] = None,
alias: Union[Iterable[str], str, None] = None,
):
"""Builds a resolver for ExtendedExpression containing a WindowFunction expression"""

Expand Down Expand Up @@ -512,7 +513,7 @@ def resolve(
def if_then(
ifs: Iterable[tuple[ExtendedExpressionOrUnbound, ExtendedExpressionOrUnbound]],
_else: ExtendedExpressionOrUnbound,
alias: Union[Iterable[str], str] = None,
alias: Union[Iterable[str], str, None] = None,
):
"""Builds a resolver for ExtendedExpression containing an IfThen expression"""

Expand Down Expand Up @@ -551,24 +552,16 @@ def resolve(
referred_expr=[
stee.ExpressionReference(
expression=stalg.Expression(
if_then=stalg.Expression.IfThen(
**{
"ifs": [
stalg.Expression.IfThen.IfClause(
**{
"if": if_clause[0]
.referred_expr[0]
.expression,
"then": if_clause[1]
.referred_expr[0]
.expression,
}
)
for if_clause in bound_ifs
],
"else": bound_else.referred_expr[0].expression,
}
)
if_then=stalg.Expression.IfThen(**{
"ifs": [
stalg.Expression.IfThen.IfClause(**{
"if": if_clause[0].referred_expr[0].expression,
"then": if_clause[1].referred_expr[0].expression,
})
for if_clause in bound_ifs
],
"else": bound_else.referred_expr[0].expression,
})
),
output_names=_alias_or_inferred(
alias,
Expand Down Expand Up @@ -639,12 +632,10 @@ def resolve(
switch_expression=stalg.Expression.SwitchExpression(
match=bound_match.referred_expr[0].expression,
ifs=[
stalg.Expression.SwitchExpression.IfValue(
**{
"if": i.referred_expr[0].expression.literal,
"then": t.referred_expr[0].expression,
}
)
stalg.Expression.SwitchExpression.IfValue(**{
"if": i.referred_expr[0].expression.literal,
"then": t.referred_expr[0].expression,
})
for i, t in bound_ifs
],
**{"else": bound_else.referred_expr[0].expression},
Expand Down Expand Up @@ -767,7 +758,11 @@ def resolve(
return resolve


def cast(input: ExtendedExpressionOrUnbound, type: stp.Type):
def cast(
input: ExtendedExpressionOrUnbound,
type: stp.Type,
alias: Union[Iterable[str], str, None] = None,
):
"""Builds a resolver for ExtendedExpression containing a cast expression"""

def resolve(
Expand All @@ -785,7 +780,9 @@ def resolve(
failure_behavior=stalg.Expression.Cast.FAILURE_BEHAVIOR_RETURN_NULL,
)
),
output_names=["cast"], # TODO construct name from inputs
output_names=_alias_or_inferred(
alias, "cast", [bound_input.referred_expr[0].output_names[0]]
),
)
],
base_schema=base_schema,
Expand Down
50 changes: 44 additions & 6 deletions src/substrait/builders/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,39 @@
See `examples/builder_example.py` for usage.
"""

from typing import Iterable, Optional, Union, Callable

from typing import Callable, Iterable, Optional, TypedDict, Union

import substrait.gen.proto.algebra_pb2 as stalg
from substrait.gen.proto.extensions.extensions_pb2 import AdvancedExtension
import substrait.gen.proto.extended_expression_pb2 as stee
import substrait.gen.proto.plan_pb2 as stp
import substrait.gen.proto.type_pb2 as stt
import substrait.gen.proto.extended_expression_pb2 as stee
from substrait.extension_registry import ExtensionRegistry
from substrait.builders.extended_expression import (
ExtendedExpressionOrUnbound,
resolve_expression,
)
from substrait.extension_registry import ExtensionRegistry
from substrait.gen.proto.extensions.extensions_pb2 import AdvancedExtension
from substrait.type_inference import infer_plan_schema
from substrait.utils import (
merge_extension_declarations,
merge_extension_urns,
merge_extension_uris,
merge_extension_urns,
)

UnboundPlan = Callable[[ExtensionRegistry], stp.Plan]

PlanOrUnbound = Union[stp.Plan, UnboundPlan]

_ExtensionDict = TypedDict(
"_ExtensionDict",
{"extension_uris": list, "extension_urns": list, "extensions": list},
)


def _merge_extensions(*objs):
def _merge_extensions(
*objs,
) -> _ExtensionDict:
"""Merge extension URIs, URNs, and declarations from multiple plan/expression objects.

During the URI -> URN migration period, we maintain both URI and URN references
Expand Down Expand Up @@ -379,3 +387,33 @@ def resolve(registry: ExtensionRegistry) -> stp.Plan:
)

return resolve


def write_table(
table_names: Union[str, Iterable[str]],
input: PlanOrUnbound,
create_mode: Union[stalg.WriteRel.CreateMode.ValueType, None] = None,
) -> UnboundPlan:
def resolve(registry: ExtensionRegistry) -> stp.Plan:
bound_input = input if isinstance(input, stp.Plan) else input(registry)
ns = infer_plan_schema(bound_input)
_table_names = [table_names] if isinstance(table_names, str) else table_names
_create_mode = create_mode or stalg.WriteRel.CREATE_MODE_ERROR_IF_EXISTS

write_rel = stalg.Rel(
write=stalg.WriteRel(
input=bound_input.relations[-1].root.input,
table_schema=ns,
op=stalg.WriteRel.WRITE_OP_CTAS,
create_mode=_create_mode,
named_table=stalg.NamedObjectWrite(names=_table_names),
)
)
return stp.Plan(
relations=[
stp.PlanRel(root=stalg.RelRoot(input=write_rel, names=ns.names))
],
**_merge_extensions(bound_input),
)

return resolve
8 changes: 8 additions & 0 deletions src/substrait/builders/type.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,14 @@ def precision_timestamp_tz(precision: int, nullable=True) -> stt.Type:
)
)

def timestamp(nullable=True) -> stt.Type:
return stt.Type(
timestamp=stt.Type.Timestamp(
nullability=stt.Type.NULLABILITY_NULLABLE
if nullable
else stt.Type.NULLABILITY_REQUIRED,
)
)

def struct(types: Iterable[stt.Type], nullable=True) -> stt.Type:
return stt.Type(
Expand Down
Loading