Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion edg/abstract_parts/IoController.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def _make_pinning(self) -> Dict[str, CircuitPort]:
allocation_list.append((io_port.elt_type(), self.get(io_port.requested())))
elif isinstance(io_port, Port): # derive Port connections from is_connected
if self.get(io_port.is_connected()):
requested = [self._name_of_child(io_port)] # generate requested name from port name if connected
requested = [self._name_of_child(io_port, self)] # generate requested name from port name if connected
else:
requested = []
allocation_list.append((type(io_port), requested))
Expand Down
6 changes: 3 additions & 3 deletions edg/core/Array.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,12 @@ def items(self) -> ItemsView[str, VectorType]:
return self._elts.items()

# unlike most other LibraryElement types, the names are stored in _elts and _allocates
def _name_of_child(self, subelt: Any, allow_unknown: bool = False) -> str:
def _name_of_child(self, subelt: Any, context: Any, allow_unknown: bool = False) -> str:
from .HierarchyBlock import Block
block_parent = self._block_parent()
assert isinstance(block_parent, Block)

if builder.get_enclosing_block() is block_parent or builder.get_enclosing_block() is None:
if context is block_parent:
# in block defining this port (direct elt definition), or in test top
assert self._elts is not None, "can't get name on undefined vector"
for (name, elt) in self._elts.items():
Expand All @@ -145,7 +145,7 @@ def _name_of_child(self, subelt: Any, allow_unknown: bool = False) -> str:
return f"(unknown {subelt.__class__.__name__})"
else:
raise ValueError(f"no name for {subelt}")
elif builder.get_enclosing_block() is block_parent._parent:
elif context is block_parent._parent:
# in block enclosing the block defining this port (allocate required)
for (i, (suggested_name, allocate_elt)) in enumerate(self._requests):
if subelt is allocate_elt:
Expand Down
4 changes: 2 additions & 2 deletions edg/core/Binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ def expr_to_proto(self, expr: ConstraintExpr, ref_map: IdentityDict[Refable, edg

class InitParamBinding(ParamBinding):
"""Binding that indicates this is a parameter from an __init__ argument.
Can optionally take a value, which would have a binding in the parent's scope."""
def __init__(self, parent: ParamParentTypes, value: Optional[ConstraintExpr] = None):
Can optionally take a value, which is the raw value passed into __init__."""
def __init__(self, parent: ParamParentTypes, value: Optional[Any] = None):
super().__init__(parent)
self.value = value

Expand Down
14 changes: 10 additions & 4 deletions edg/core/Blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,16 @@ def _def_to_proto(self) -> BaseBlockEdgirType:
raise NotImplementedError

def _elaborated_def_to_proto(self) -> BaseBlockEdgirType:
assert self._elaboration_state == BlockElaborationState.post_init
self._elaboration_state = BlockElaborationState.contents
self.contents()
self._elaboration_state = BlockElaborationState.post_contents
prev_element = builder.push_element(self)
assert prev_element is None
try:
assert self._elaboration_state == BlockElaborationState.post_init
self._elaboration_state = BlockElaborationState.contents
self.contents()
self._elaboration_state = BlockElaborationState.post_contents
finally:
builder.pop_to(prev_element)

return self._def_to_proto()

def _populate_def_proto_block_base(self, pb: BaseBlockEdgirType) -> BaseBlockEdgirType:
Expand Down
15 changes: 8 additions & 7 deletions edg/core/Builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ def push_element(self, elt: BaseBlock) -> Optional[BaseBlock]:
self.stack.append(elt)
return prev_elt

def pop_to(self, elt: Optional[BaseBlock]) -> None:
while (elt is None and self.stack) or (elt is not None and self.stack[-1] is not elt):
self.stack.pop()
def pop_to(self, prev: Optional[BaseBlock]) -> None:
"""Pops at most one element from stack, expecting prev to be at the top of the stack.
The pattern should be one pop for one push, and allowing that duplicate pushes are ignored."""
if (prev is None and not self.stack) or (prev is not None and self.stack[-1] is prev):
return

self.stack.pop()
Copy link

Copilot AI Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pop_to method only pops a single element from the stack, but should continue popping until reaching the target element. The original implementation used a while loop to pop multiple elements. If there are nested contexts, this will fail to restore the stack to the correct state.

Consider restoring the while loop:

while (elt is None and self.stack) or (elt is not None and self.stack[-1] is not elt):
  self.stack.pop()
Suggested change
self.stack.pop()
while (elt is None and self.stack) or (elt is not None and self.stack[-1] is not elt):
self.stack.pop()

Copilot uses AI. Check for mistakes.
assert (prev is None and not self.stack) or (prev is not None and self.stack[-1] is prev)

def get_enclosing_block(self) -> Optional[BaseBlock]:
if not self.stack:
Expand All @@ -39,8 +44,6 @@ def get_curr_context(self) -> Optional[BaseBlock]:
def elaborate_toplevel(self, block: BaseBlock, *,
is_generator: bool = False,
generate_values: Iterable[Tuple[edgir.LocalPath, edgir.ValueLit]] = []) -> edgir.HierarchyBlock:
assert self.get_enclosing_block() is None
self.push_element(block)
try:
if is_generator: # TODO this is kind of nasty =(
elaborated = block._generated_def_to_proto(generate_values) # type: ignore
Expand All @@ -50,8 +53,6 @@ def elaborate_toplevel(self, block: BaseBlock, *,
return elaborated
except Exception as e:
raise Exception(f"While elaborating {block.__class__}") from e
finally:
self.pop_to(None)


builder = Builder()
4 changes: 2 additions & 2 deletions edg/core/Core.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def __setattr__(self, name: str, value: Any) -> None:
self.manager.add_element(name, value)
super().__setattr__(name, value)

def _name_of_child(self, subelt: Any, allow_unknown: bool = False) -> str:
def _name_of_child(self, subelt: Any, context: Any, allow_unknown: bool = False) -> str:
self_name = self.manager.name_of(subelt)
if self_name is not None:
return self_name
Expand All @@ -224,7 +224,7 @@ def _path_from(self, base: LibraryElement, allow_unknown: bool = False) -> List[
return []
else:
assert self._parent is not None, "can't get path / name for non-bound element"
return self._parent._path_from(base, allow_unknown) + [self._parent._name_of_child(self, allow_unknown)]
return self._parent._path_from(base, allow_unknown) + [self._parent._name_of_child(self, base, allow_unknown)]

def _name_from(self, base: LibraryElement, allow_unknown: bool = False) -> str:
"""Returns the path name to (inclusive) this element from some starting point.
Expand Down
32 changes: 19 additions & 13 deletions edg/core/DesignTop.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import TypeVar, Union, List, Tuple, Dict, Type

from .. import edgir
from .Builder import builder
from .Ports import Port
from .ConstraintExpr import ConstraintExpr
from .HdlUserExceptions import BlockDefinitionError
Expand Down Expand Up @@ -51,11 +52,16 @@ def multipack(self):

# TODO make this non-overriding? - this needs to call multipack after contents
def _elaborated_def_to_proto(self) -> edgir.HierarchyBlock:
assert self._elaboration_state == BlockElaborationState.post_init
self._elaboration_state = BlockElaborationState.contents
self.contents()
self.multipack()
self._elaboration_state = BlockElaborationState.post_contents
prev_element = builder.push_element(self)
assert prev_element is None
try:
assert self._elaboration_state == BlockElaborationState.post_init
self._elaboration_state = BlockElaborationState.contents
self.contents()
self.multipack()
self._elaboration_state = BlockElaborationState.post_contents
finally:
builder.pop_to(prev_element)
return self._def_to_proto()

def _populate_def_proto_block_contents(self, pb: edgir.HierarchyBlock) -> edgir.HierarchyBlock:
Expand All @@ -78,7 +84,7 @@ def _populate_def_proto_block_contents(self, pb: edgir.HierarchyBlock) -> edgir.
else:
raise TypeError
assert isinstance(multipack_block, MultipackBlock)
multipack_name = self._name_of_child(multipack_block)
multipack_name = self._name_of_child(multipack_block, self)
multipack_ref_base = edgir.LocalPath()
multipack_ref_base.steps.add().name = multipack_name
multipack_ref_map = multipack_block._get_ref_map(multipack_ref_base)
Expand All @@ -90,9 +96,9 @@ def _populate_def_proto_block_contents(self, pb: edgir.HierarchyBlock) -> edgir.
packed_ref_map = multipack_part_block._get_ref_map(packed_ref_base)

if isinstance(multipack_part, Block):
part_name = multipack_block._name_of_child(multipack_part)
part_name = multipack_block._name_of_child(multipack_part, self)
elif isinstance(multipack_part, PackedBlockAllocate):
part_name = multipack_block._name_of_child(multipack_part.parent)
part_name = multipack_block._name_of_child(multipack_part.parent, self)
assert multipack_part.suggested_name, "multipack parts must have suggested name, for consistency"
part_name += f"[{multipack_part.suggested_name}]"
else:
Expand All @@ -105,7 +111,7 @@ def _populate_def_proto_block_contents(self, pb: edgir.HierarchyBlock) -> edgir.
packed_port_port = packed_port.port
else:
raise TypeError
packed_port_name = multipack_part_block._name_of_child(packed_port_port)
packed_port_name = multipack_part_block._name_of_child(packed_port_port, self)
exported_tunnel = edgir.add_pair(pb.constraints,
f"(packed){multipack_name}.{part_name}.{packed_port_name}").exportedTunnel
exported_tunnel.internal_block_port.ref.CopyFrom(multipack_ref_map[exterior_port])
Expand All @@ -117,13 +123,13 @@ def _populate_def_proto_block_contents(self, pb: edgir.HierarchyBlock) -> edgir.

for multipack_param, packed_param in packing_rule.tunnel_assigns.items():
if isinstance(packed_param, ConstraintExpr):
packed_param_name = multipack_part_block._name_of_child(packed_param)
packed_param_name = multipack_part_block._name_of_child(packed_param, self)
assign_tunnel = edgir.add_pair(pb.constraints,
f"(packed){multipack_name}.{part_name}.{packed_param_name}").assignTunnel
assign_tunnel.dst.CopyFrom(multipack_ref_map[multipack_param])
assign_tunnel.src.ref.CopyFrom(packed_ref_map[packed_param])
elif isinstance(packed_param, PackedBlockParamArray):
multipack_param_name = multipack_block._name_of_child(multipack_param)
multipack_param_name = multipack_block._name_of_child(multipack_param, self)
constr_name = f"(packed){multipack_name}.{multipack_param_name}"
packed_params.setdefault(constr_name, (multipack_ref_map[multipack_param], []))[1].append(
packed_ref_map[packed_param.param])
Expand All @@ -132,14 +138,14 @@ def _populate_def_proto_block_contents(self, pb: edgir.HierarchyBlock) -> edgir.

for multipack_param, unpacked_param in packing_rule.tunnel_unpack_assigns.items():
if isinstance(unpacked_param, ConstraintExpr):
multipack_param_name = multipack_block._name_of_child(multipack_param)
multipack_param_name = multipack_block._name_of_child(multipack_param, self)
# TODO need better constraint naming scheme
assign_tunnel = edgir.add_pair(pb.constraints,
f"(unpacked){multipack_name}.{part_name}.{multipack_param_name}").assignTunnel
assign_tunnel.dst.CopyFrom(packed_ref_map[unpacked_param])
assign_tunnel.src.ref.CopyFrom(multipack_ref_map[multipack_param])
elif isinstance(unpacked_param, PackedBlockParam):
multipack_param_name = multipack_block._name_of_child(multipack_param)
multipack_param_name = multipack_block._name_of_child(multipack_param, self)
# TODO need better constraint naming scheme
assign_tunnel = edgir.add_pair(pb.constraints,
f"(unpacked){multipack_name}.{part_name}.{multipack_param_name}").assignTunnel
Expand Down
51 changes: 28 additions & 23 deletions edg/core/Generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from deprecated import deprecated

from .. import edgir
from .Builder import builder
from .Ports import BasePort, Port
from .PortTag import PortTag
from .IdentityDict import IdentityDict
Expand Down Expand Up @@ -106,30 +107,34 @@ def _def_to_proto(self) -> edgir.HierarchyBlock:

def _generated_def_to_proto(self, generate_values: Iterable[Tuple[edgir.LocalPath, edgir.ValueLit]]) -> \
edgir.HierarchyBlock:
assert self._elaboration_state == BlockElaborationState.post_init # TODO dedup w/ elaborated_def_to_proto
self._elaboration_state = BlockElaborationState.contents

self.contents()

self._elaboration_state = BlockElaborationState.generate

# Translate parameter values to function arguments
ref_map = self._get_ref_map(edgir.LocalPath())
generate_values_map = {path.SerializeToString(): value for (path, value) in generate_values}

assert (self.__class__, AbstractBlockProperty) not in self._elt_properties # abstract blocks can't generate
if type(self).generate is not GeneratorBlock.generate:
for param in self._generator_params_list:
self._generator_param_values[param] = param._from_lit(generate_values_map[ref_map[param].SerializeToString()])
self.generate()
elif self._generator is not None: # legacy generator style
fn_args = [arg_param._from_lit(generate_values_map[ref_map[arg_param].SerializeToString()])
for arg_param in self._generator.fn_args]
self._generator.fn(*fn_args)
else:
raise BlockDefinitionError(self, "Generator missing generate implementation", "define generate")
prev_element = builder.push_element(self)
assert prev_element is None

try:
assert self._elaboration_state == BlockElaborationState.post_init # TODO dedup w/ elaborated_def_to_proto
self._elaboration_state = BlockElaborationState.contents
self.contents()
self._elaboration_state = BlockElaborationState.generate

# Translate parameter values to function arguments
ref_map = self._get_ref_map(edgir.LocalPath())
generate_values_map = {path.SerializeToString(): value for (path, value) in generate_values}

assert (self.__class__, AbstractBlockProperty) not in self._elt_properties # abstract blocks can't generate
if type(self).generate is not GeneratorBlock.generate:
for param in self._generator_params_list:
self._generator_param_values[param] = param._from_lit(generate_values_map[ref_map[param].SerializeToString()])
self.generate()
elif self._generator is not None: # legacy generator style
fn_args = [arg_param._from_lit(generate_values_map[ref_map[arg_param].SerializeToString()])
for arg_param in self._generator.fn_args]
self._generator.fn(*fn_args)
else:
raise BlockDefinitionError(self, "Generator missing generate implementation", "define generate")

self._elaboration_state = BlockElaborationState.post_generate
self._elaboration_state = BlockElaborationState.post_generate
finally:
builder.pop_to(prev_element)

return self._def_to_proto()

Expand Down
10 changes: 6 additions & 4 deletions edg/core/HierarchyBlock.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def remap_arg(arg_name: str, arg_type: Type[ConstraintExpr], arg_value: Any) ->

if isinstance(arg_value, ConstraintExpr): # otherwise, create a new arg
if arg_value._is_bound():
typed_arg_value: Optional[ConstraintExpr] = arg_type._to_expr_type(arg_value)
typed_arg_value: Optional[ConstraintExpr] = arg_value
elif arg_value.initializer is None:
typed_arg_value = None
else:
Expand All @@ -175,7 +175,7 @@ def remap_arg(arg_name: str, arg_type: Type[ConstraintExpr], arg_value: Any) ->
"either leave default empty or pass in a value or uninitialized type " + \
"(eg, 2.0, FloatExpr(), but NOT FloatExpr(2.0))")
elif arg_value is not None:
typed_arg_value = arg_type._to_expr_type(arg_value)
typed_arg_value = arg_value
else:
typed_arg_value = None

Expand Down Expand Up @@ -275,7 +275,8 @@ def _populate_def_proto_block_base(self, pb: edgir.HierarchyBlock) -> edgir.Hier
for param_name, param in self._parameters.items():
if isinstance(param.binding, InitParamBinding) and param.binding.value is not None:
# default values can't depend on anything so the ref_map is empty
pb.param_defaults[param_name].CopyFrom(param.binding.value._expr_to_proto(IdentityDict()))
param_typed_value = param._to_expr_type(param.binding.value)
pb.param_defaults[param_name].CopyFrom(param_typed_value._expr_to_proto(IdentityDict()))

return pb

Expand Down Expand Up @@ -381,8 +382,9 @@ def _populate_def_proto_hierarchy(self, pb: edgir.HierarchyBlock) -> edgir.Hiera
all_block_params.update(mixin._parameters.items())
for (block_param_name, block_param) in all_block_params.items():
if isinstance(block_param.binding, InitParamBinding) and block_param.binding.value is not None:
param_typed_value = block_param._to_expr_type(block_param.binding.value)
edgir.add_pair(pb.constraints, f'(init){block_name}.{block_param_name}').CopyFrom( # TODO better name
AssignBinding.make_assign(block_param, block_param.binding.value, ref_map)
AssignBinding.make_assign(block_param, param_typed_value, ref_map)
)

return pb
Expand Down
2 changes: 1 addition & 1 deletion edg/core/Ports.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def _convert(self, adapter: PortAdapter[ConvertTargetType]) -> ConvertTargetType
adapter_inst = enclosing_block.Block(adapter)
adapter_name_suffix = f"_{self._adapter_count}" if self._adapter_count > 0 else ""
enclosing_block.manager.add_element(
f"(adapter){block_parent._name_from(enclosing_block)}.{self._name_from(block_parent)}{adapter_name_suffix}",
f"(adapter){self._name_from(enclosing_block)}{adapter_name_suffix}",
adapter_inst)
enclosing_block.connect(self, adapter_inst.src) # we don't name it to avoid explicit name conflicts
self._adapter_count += 1
Expand Down