"""
Tree-walking interpreter for DSL execution.
Evaluates AST nodes to produce geometry and other values.
"""
import os
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Callable, Union
# Default recursion limit for command-to-command calls
DEFAULT_RECURSION_LIMIT = 100
# Default limits for list comprehensions (prevent combinatorial explosion)
DEFAULT_COMPREHENSION_MAX_SIZE = 100000 # Max elements in result list
DEFAULT_COMPREHENSION_MAX_DEPTH = 4 # Max nesting depth of for clauses
from .values import (
Value, EmitResult, RequireFailure,
int_val, float_val, bool_val, string_val, list_val, dict_val, none_val,
wrap_value, unwrap_value, coerce_numeric,
)
from .context import ExecutionContext, create_context
from .builtins import call_builtin, call_method, get_builtin_registry
from .provenance import Provenance, create_provenance
from ..ast import (
AstNode, Module, Command, FunctionDef, Parameter,
Statement, LetStatement, VarDecl, AssignmentStatement,
RequireStatement, AssertStatement,
EmitStatement, ForStatement, WhileStatement, IfStatement,
ExpressionStatement, ReturnStatement, PassStatement,
Block, PythonBlock, NativeBlock, NativeFunction, ElifBranch,
Expression, Literal, Identifier, BinaryOp, UnaryOp,
FunctionCall, MethodCall, MemberAccess, IndexAccess,
ListLiteral, ListComprehension, RangeExpr, DictLiteral,
ConditionalExpr, IfExpr, MatchExpr, MatchArm, LambdaExpr, PythonExpr,
Pattern, LiteralPattern, IdentifierPattern, WildcardPattern,
)
from ..types import (
Type, ListType, DictType, resolve_type_name,
INT, FLOAT, BOOL, STRING,
)
from ..tokens import SourceSpan, TokenType
[docs]
@dataclass
class ExecutionResult:
"""Result of executing a command."""
success: bool
emit_result: Optional[EmitResult] = None
require_failures: List[RequireFailure] = field(default_factory=list)
provenance: Optional[Provenance] = None
error_message: Optional[str] = None
@property
def geometry(self) -> Any:
"""Get the emitted geometry data."""
if self.emit_result:
return self.emit_result.data
return None
@property
def metadata(self) -> Dict[str, Any]:
"""Get the emit metadata."""
if self.emit_result:
return self.emit_result.metadata
return {}
[docs]
class Interpreter:
"""
Tree-walking interpreter for DSL execution.
Evaluates AST nodes by dispatching to type-specific methods.
"""
def __init__(
self,
transforms: List[Callable[[Module], Module]] = None,
recursion_limit: Optional[int] = None,
):
"""
Initialize the interpreter.
Args:
transforms: Optional list of AST transformations to apply before execution
recursion_limit: Maximum depth for command-to-command calls (default 100).
Can also be set via YAPCAD_DSL_RECURSION_LIMIT env var.
"""
self.transforms = transforms or []
self.native_functions: Dict[str, Callable] = {} # Functions from native blocks
self.current_module: Optional[Module] = None # Current module being executed
# Set recursion limit: CLI arg > env var > default
if recursion_limit is not None:
self.recursion_limit = recursion_limit
else:
env_limit = os.environ.get("YAPCAD_DSL_RECURSION_LIMIT")
if env_limit is not None:
try:
self.recursion_limit = int(env_limit)
except ValueError:
self.recursion_limit = DEFAULT_RECURSION_LIMIT
else:
self.recursion_limit = DEFAULT_RECURSION_LIMIT
self.call_depth = 0 # Current command call depth
[docs]
def execute(
self,
module: Module,
command_name: str,
parameters: Dict[str, Any],
source: str = "",
) -> ExecutionResult:
"""
Execute a command from a module.
Args:
module: The parsed and type-checked module
command_name: Name of the command to execute
parameters: Parameter values (raw Python values, not wrapped)
source: Original source code for error messages
Returns:
ExecutionResult with geometry and provenance
"""
# Apply optional AST transforms
transformed_module = module
for transform in self.transforms:
transformed_module = transform(transformed_module)
# Store the module for command lookups during execution
self.current_module = transformed_module
# Execute native blocks to register their exported functions
for native_block in transformed_module.native_blocks:
try:
self._execute_native_block(native_block)
except Exception as e:
return ExecutionResult(
success=False,
error_message=f"Error in native block: {e}"
)
# Find the command
command = None
for cmd in transformed_module.commands:
if cmd.name == command_name:
command = cmd
break
if command is None:
return ExecutionResult(
success=False,
error_message=f"Command '{command_name}' not found in module '{module.name}'"
)
# Wrap parameters as Values
wrapped_params = self._wrap_parameters(command, parameters)
# Create execution context
ctx = create_context(
module_name=module.name,
command_name=command_name,
parameters=wrapped_params,
source=source,
)
# Execute the command body
try:
self._execute_command(command, ctx)
except RuntimeError as e:
return ExecutionResult(
success=False,
error_message=str(e),
require_failures=ctx.require_failures,
)
# Build provenance
provenance = create_provenance(
module_name=module.name,
command_name=command_name,
parameters=parameters,
source=source,
)
# Check for require failures
if ctx.require_failures:
return ExecutionResult(
success=False,
emit_result=ctx.emit_result,
require_failures=ctx.require_failures,
provenance=provenance,
)
# Check for emit
if ctx.emit_result is None:
return ExecutionResult(
success=False,
error_message="Command did not emit any geometry",
provenance=provenance,
)
return ExecutionResult(
success=True,
emit_result=ctx.emit_result,
provenance=provenance,
)
def _wrap_parameters(self, command: Command, parameters: Dict[str, Any]) -> Dict[str, Value]:
"""Wrap raw parameter values as DSL Values."""
wrapped = {}
# Create a temporary context for evaluating default values
from .context import ExecutionContext
temp_ctx = ExecutionContext()
for param in command.parameters:
if param.name in parameters:
raw_value = parameters[param.name]
# Handle optional type annotations (new Pythonic syntax)
if param.type_annotation is not None and hasattr(param.type_annotation, 'name'):
param_type = resolve_type_name(param.type_annotation.name)
else:
# Infer type from the raw value
if isinstance(raw_value, bool):
param_type = BOOL
elif isinstance(raw_value, int):
param_type = INT
elif isinstance(raw_value, float):
param_type = FLOAT
elif isinstance(raw_value, str):
param_type = STRING
else:
param_type = FLOAT # Default fallback
wrapped[param.name] = wrap_value(raw_value, param_type)
elif param.default_value is not None:
# Evaluate default value expression using temporary context
default_val = self._evaluate(param.default_value, temp_ctx)
wrapped[param.name] = default_val
return wrapped
def _execute_command(self, command: Command, ctx: ExecutionContext) -> None:
"""Execute a command's body statements."""
for stmt in command.body.statements:
self._execute_statement(stmt, ctx)
if ctx.should_return:
break
def _execute_statement(self, stmt: Statement, ctx: ExecutionContext) -> None:
"""Execute a statement."""
if isinstance(stmt, LetStatement): # Also handles VarDecl (alias)
self._execute_let(stmt, ctx)
elif isinstance(stmt, AssignmentStatement):
self._execute_assignment(stmt, ctx)
elif isinstance(stmt, RequireStatement): # Also handles AssertStatement (alias)
self._execute_require(stmt, ctx)
elif isinstance(stmt, EmitStatement):
self._execute_emit(stmt, ctx)
elif isinstance(stmt, ForStatement):
self._execute_for(stmt, ctx)
# NOTE: WhileStatement removed - while loops not supported for static verifiability
elif isinstance(stmt, IfStatement):
self._execute_if_statement(stmt, ctx)
elif isinstance(stmt, PassStatement):
pass # PassStatement does nothing
elif isinstance(stmt, ExpressionStatement):
self._evaluate(stmt.expression, ctx)
elif isinstance(stmt, ReturnStatement):
self._execute_return(stmt, ctx)
elif isinstance(stmt, Block):
self._execute_block(stmt, ctx)
elif isinstance(stmt, PythonBlock):
self._execute_python_block(stmt, ctx)
else:
raise RuntimeError(f"Unknown statement type: {type(stmt).__name__}")
def _execute_let(self, stmt: LetStatement, ctx: ExecutionContext) -> None:
"""Execute a let statement."""
value = self._evaluate(stmt.initializer, ctx)
ctx.set_variable(stmt.name, value)
def _execute_assignment(self, stmt: AssignmentStatement, ctx: ExecutionContext) -> None:
"""Execute an assignment statement."""
value = self._evaluate(stmt.value, ctx)
if not ctx.update_variable(stmt.target.name, value):
ctx.add_error(f"Cannot assign to undefined variable '{stmt.target.name}'", stmt.span)
def _execute_require(self, stmt: RequireStatement, ctx: ExecutionContext) -> None:
"""Execute a require statement."""
condition = self._evaluate(stmt.condition, ctx)
if not condition.is_truthy():
message = stmt.message if stmt.message else "Constraint violated"
ctx.add_require_failure(message)
def _execute_emit(self, stmt: EmitStatement, ctx: ExecutionContext) -> None:
"""Execute an emit statement."""
value = self._evaluate(stmt.value, ctx)
metadata = {}
if stmt.metadata:
# Handle both old DictLiteral syntax and new dict metadata
if isinstance(stmt.metadata, DictLiteral):
for key, expr in stmt.metadata.entries.items():
metadata[key] = self._evaluate(expr, ctx).data
elif isinstance(stmt.metadata, dict):
# New syntax: emit value, name="x", material="y"
for key, expr in stmt.metadata.items():
metadata[key] = self._evaluate(expr, ctx).data
ctx.set_emit(value, metadata)
def _execute_for(self, stmt: ForStatement, ctx: ExecutionContext) -> None:
"""Execute a for loop."""
iterable = self._evaluate(stmt.iterable, ctx)
# Handle different iterable types
if isinstance(iterable.type, ListType):
items = iterable.data
elif hasattr(iterable.data, '__iter__'):
items = list(iterable.data)
else:
raise RuntimeError(f"Cannot iterate over {iterable.type}")
# Determine element type
if isinstance(iterable.type, ListType):
elem_type = iterable.type.element_type
else:
elem_type = INT # Default for ranges
with ctx.new_scope("for-loop"):
for item in items:
ctx.set_variable(stmt.variable, wrap_value(item, elem_type))
for body_stmt in stmt.body.statements:
self._execute_statement(body_stmt, ctx)
if ctx.should_return:
return
# NOTE: _execute_while removed - while loops not supported for static verifiability
def _execute_if_statement(self, stmt: IfStatement, ctx: ExecutionContext) -> None:
"""Execute a block-level if statement."""
condition = self._evaluate(stmt.condition, ctx)
if condition.is_truthy():
# Execute then branch
with ctx.new_scope("if-then"):
for body_stmt in stmt.then_branch.statements:
self._execute_statement(body_stmt, ctx)
if ctx.should_return:
return
else:
# Check elif branches
executed = False
for elif_branch in stmt.elif_branches:
elif_cond = self._evaluate(elif_branch.condition, ctx)
if elif_cond.is_truthy():
with ctx.new_scope("elif"):
for body_stmt in elif_branch.body.statements:
self._execute_statement(body_stmt, ctx)
if ctx.should_return:
return
executed = True
break
# Execute else branch if no elif matched
if not executed and stmt.else_branch is not None:
with ctx.new_scope("else"):
for body_stmt in stmt.else_branch.statements:
self._execute_statement(body_stmt, ctx)
if ctx.should_return:
return
def _execute_return(self, stmt: ReturnStatement, ctx: ExecutionContext) -> None:
"""Execute a return statement."""
if stmt.value:
value = self._evaluate(stmt.value, ctx)
ctx.signal_return(value)
else:
ctx.signal_return(none_val(INT)) # Return unit/none
def _execute_block(self, block: Block, ctx: ExecutionContext) -> None:
"""Execute a block of statements."""
with ctx.new_scope("block"):
for stmt in block.statements:
self._execute_statement(stmt, ctx)
if ctx.should_return:
break
def _execute_python_block(self, block: PythonBlock, ctx: ExecutionContext) -> None:
"""Execute an embedded Python block."""
# Build the execution namespace with current variables
namespace = {}
scope = ctx.current_scope
while scope:
for name, value in scope.variables.items():
namespace[name] = value.data
scope = scope.parent
# Add yapCAD imports
try:
exec("from yapcad.geom import *", namespace)
exec("from yapcad.geom3d import *", namespace)
except ImportError:
pass
# Execute the Python code
try:
exec(block.code, namespace)
except Exception as e:
raise RuntimeError(f"Python block error: {e}")
# Check for return value (indicated by 'return' in code)
# The DSL requires explicit `return <value> as <type>` syntax
# For now, we extract any variable named '_return_value' and '_return_type'
if '_return_value' in namespace and '_return_type' in namespace:
return_type = resolve_type_name(namespace['_return_type'])
ctx.set_variable(block.result_var or '_python_result',
wrap_value(namespace['_return_value'], return_type))
def _execute_native_block(self, block: NativeBlock) -> None:
"""Execute a native Python block and register exported functions.
The native block's Python code is executed in a namespace with yapCAD
imports available. Functions declared in the exports section are then
extracted from the namespace and stored for later use by DSL commands.
"""
# Create namespace with yapCAD imports
namespace = {}
try:
exec("from yapcad.geom import *", namespace)
exec("from yapcad.geom3d import *", namespace)
except ImportError:
pass
# Also import commonly used modules
try:
exec("import math", namespace)
exec("from math import pi, sin, cos, tan, sqrt, atan2", namespace)
except ImportError:
pass
# Execute the Python code
try:
exec(block.code, namespace)
except Exception as e:
raise RuntimeError(f"Native block execution error: {e}")
# Extract exported functions from namespace
for func_decl in block.exports:
func_name = func_decl.name
if func_name not in namespace:
raise RuntimeError(
f"Native block exports '{func_name}' but it was not defined in the Python code"
)
func = namespace[func_name]
if not callable(func):
raise RuntimeError(
f"Native block exports '{func_name}' but it is not callable"
)
# Store the function with its declared return type
self.native_functions[func_name] = func
def _evaluate(self, expr: Expression, ctx: ExecutionContext) -> Value:
"""Evaluate an expression to produce a Value."""
if isinstance(expr, Literal):
return self._eval_literal(expr)
elif isinstance(expr, Identifier):
return self._eval_identifier(expr, ctx)
elif isinstance(expr, BinaryOp):
return self._eval_binary_op(expr, ctx)
elif isinstance(expr, UnaryOp):
return self._eval_unary_op(expr, ctx)
elif isinstance(expr, FunctionCall):
return self._eval_function_call(expr, ctx)
elif isinstance(expr, MethodCall):
return self._eval_method_call(expr, ctx)
elif isinstance(expr, MemberAccess):
return self._eval_member_access(expr, ctx)
elif isinstance(expr, IndexAccess):
return self._eval_index_access(expr, ctx)
elif isinstance(expr, ListLiteral):
return self._eval_list_literal(expr, ctx)
elif isinstance(expr, ListComprehension):
return self._eval_list_comprehension(expr, ctx)
elif isinstance(expr, RangeExpr):
return self._eval_range(expr, ctx)
elif isinstance(expr, DictLiteral):
return self._eval_dict_literal(expr, ctx)
elif isinstance(expr, ConditionalExpr):
return self._eval_conditional_expr(expr, ctx)
elif isinstance(expr, IfExpr):
return self._eval_if_expr(expr, ctx)
elif isinstance(expr, MatchExpr):
return self._eval_match_expr(expr, ctx)
elif isinstance(expr, LambdaExpr):
return self._eval_lambda(expr, ctx)
elif isinstance(expr, PythonExpr):
return self._eval_python_expr(expr, ctx)
else:
raise RuntimeError(f"Unknown expression type: {type(expr).__name__}")
def _eval_literal(self, lit: Literal) -> Value:
"""Evaluate a literal value."""
if lit.literal_type == TokenType.INT_LITERAL:
return int_val(int(lit.value))
elif lit.literal_type == TokenType.FLOAT_LITERAL:
return float_val(float(lit.value))
elif lit.literal_type == TokenType.BOOL_LITERAL:
# Value is already a bool from the parser
if isinstance(lit.value, bool):
return bool_val(lit.value)
return bool_val(str(lit.value).lower() == "true")
elif lit.literal_type == TokenType.STRING_LITERAL:
return string_val(lit.value)
else:
raise RuntimeError(f"Unknown literal type: {lit.literal_type}")
def _eval_identifier(self, ident: Identifier, ctx: ExecutionContext) -> Value:
"""Evaluate an identifier (variable lookup)."""
value = ctx.get_variable(ident.name)
if value is None:
raise RuntimeError(f"Undefined variable: {ident.name}")
return value
def _eval_binary_op(self, op: BinaryOp, ctx: ExecutionContext) -> Value:
"""Evaluate a binary operation."""
left = self._evaluate(op.left, ctx)
right = self._evaluate(op.right, ctx)
# Short-circuit for logical operators
if op.operator == TokenType.AND:
if not left.is_truthy():
return bool_val(False)
return bool_val(right.is_truthy())
elif op.operator == TokenType.OR:
if left.is_truthy():
return bool_val(True)
return bool_val(right.is_truthy())
# Arithmetic operators
if op.operator == TokenType.PLUS:
if left.type == STRING or right.type == STRING:
return string_val(str(left.data) + str(right.data))
# List concatenation
if isinstance(left.type, ListType) and isinstance(right.type, ListType):
combined = left.data + right.data
# Use the element type from the ListType, not from the data
# Create Value directly since combined contains raw data (not Value objects)
elem_type = left.type.element_type
return Value(combined, ListType(elem_type))
result = left.data + right.data
if left.type == INT and right.type == INT:
return int_val(result)
return float_val(result)
elif op.operator == TokenType.MINUS:
result = left.data - right.data
if left.type == INT and right.type == INT:
return int_val(result)
return float_val(result)
elif op.operator == TokenType.STAR:
result = left.data * right.data
if left.type == INT and right.type == INT:
return int_val(result)
return float_val(result)
elif op.operator == TokenType.SLASH:
result = left.data / right.data
return float_val(result)
elif op.operator == TokenType.PERCENT:
result = left.data % right.data
if left.type == INT and right.type == INT:
return int_val(result)
return float_val(result)
elif op.operator == TokenType.DOUBLE_SLASH:
# Integer division
result = left.data // right.data
return int_val(int(result))
elif op.operator == TokenType.DOUBLE_STAR:
# Power operator
result = left.data ** right.data
if left.type == INT and right.type == INT and right.data >= 0:
return int_val(int(result))
return float_val(result)
# Comparison operators
elif op.operator == TokenType.EQ:
return bool_val(left.data == right.data)
elif op.operator == TokenType.NE:
return bool_val(left.data != right.data)
elif op.operator == TokenType.LT:
return bool_val(left.data < right.data)
elif op.operator == TokenType.GT:
return bool_val(left.data > right.data)
elif op.operator == TokenType.LE:
return bool_val(left.data <= right.data)
elif op.operator == TokenType.GE:
return bool_val(left.data >= right.data)
else:
raise RuntimeError(f"Unknown binary operator: {op.operator}")
def _eval_unary_op(self, op: UnaryOp, ctx: ExecutionContext) -> Value:
"""Evaluate a unary operation."""
operand = self._evaluate(op.operand, ctx)
if op.operator == TokenType.MINUS:
if operand.type == INT:
return int_val(-operand.data)
return float_val(-operand.data)
elif op.operator == TokenType.NOT:
return bool_val(not operand.is_truthy())
else:
raise RuntimeError(f"Unknown unary operator: {op.operator}")
def _eval_function_call(self, call: FunctionCall, ctx: ExecutionContext) -> Value:
"""Evaluate a function call."""
# Evaluate arguments
args = [self._evaluate(arg, ctx) for arg in call.arguments]
# Extract function name from callee (which is an Identifier)
if isinstance(call.callee, Identifier):
func_name = call.callee.name
else:
raise RuntimeError(f"Unsupported callee type: {type(call.callee).__name__}")
# Check for native function first
if func_name in self.native_functions:
return self._call_native_function(func_name, args)
# Check for commands in the current module
if self.current_module is not None:
for cmd in self.current_module.commands:
if cmd.name == func_name:
return self._call_command(cmd, args, ctx)
# Call the built-in function
return call_builtin(func_name, args)
def _call_command(self, command: Command, args: List[Value], parent_ctx: ExecutionContext) -> Value:
"""Call a command as a function from within another command."""
# Check recursion depth limit
self.call_depth += 1
if self.call_depth > self.recursion_limit:
self.call_depth -= 1
raise RuntimeError(
f"Maximum recursion depth ({self.recursion_limit}) exceeded calling '{command.name}'. "
f"Increase limit via --recursion-limit or YAPCAD_DSL_RECURSION_LIMIT env var."
)
try:
# Build parameters dict from args
param_dict = {}
# Count required parameters (those without defaults)
required_count = sum(1 for p in command.parameters if p.default_value is None)
if len(args) < required_count or len(args) > len(command.parameters):
if required_count == len(command.parameters):
raise RuntimeError(
f"Command '{command.name}' expects {len(command.parameters)} arguments, got {len(args)}"
)
else:
raise RuntimeError(
f"Command '{command.name}' expects {required_count}-{len(command.parameters)} arguments, got {len(args)}"
)
# Assign provided arguments
for param, arg in zip(command.parameters, args):
param_dict[param.name] = arg
# Fill in default values for missing parameters
for i in range(len(args), len(command.parameters)):
param = command.parameters[i]
if param.default_value is not None:
default_val = self._evaluate(param.default_value, parent_ctx)
param_dict[param.name] = default_val
# Create a new context for the command execution
module_name = self.current_module.name if self.current_module else "unknown"
cmd_ctx = create_context(module_name, command.name, param_dict, "")
# Execute the command body
self._execute_command(command, cmd_ctx)
# Return the emitted value
if cmd_ctx.emit_result is not None:
return cmd_ctx.emit_result
else:
raise RuntimeError(f"Command '{command.name}' did not emit a value")
finally:
self.call_depth -= 1
def _call_native_function(self, func_name: str, args: List[Value]) -> Value:
"""Call a native function with the given arguments."""
func = self.native_functions[func_name]
# Unwrap Value objects to raw Python values
raw_args = [arg.data for arg in args]
try:
result = func(*raw_args)
except Exception as e:
raise RuntimeError(f"Error calling native function '{func_name}': {e}")
# Wrap the result - infer type from the result
# For solids, we use SOLID type
from ..types import SOLID
try:
from yapcad.geom3d import issolid
if issolid(result):
return wrap_value(result, SOLID)
except ImportError:
pass
# Default to FLOAT for numeric, or wrap as-is
if isinstance(result, (int, float)):
return float_val(float(result))
elif isinstance(result, bool):
return bool_val(result)
elif isinstance(result, str):
return string_val(result)
elif isinstance(result, list):
return list_val([wrap_value(r, FLOAT) for r in result], FLOAT)
else:
# Default: wrap as SOLID since most native functions return geometry
return wrap_value(result, SOLID)
def _eval_method_call(self, call: MethodCall, ctx: ExecutionContext) -> Value:
"""Evaluate a method call."""
receiver = self._evaluate(call.object, ctx)
args = [self._evaluate(arg, ctx) for arg in call.arguments]
# Get the type name for method lookup
type_name = receiver.type.name if hasattr(receiver.type, 'name') else str(receiver.type)
return call_method(type_name, call.method, receiver, args)
def _eval_member_access(self, access: MemberAccess, ctx: ExecutionContext) -> Value:
"""Evaluate member access (e.g., point.x)."""
obj = self._evaluate(access.object, ctx)
# Handle dict access
if isinstance(obj.type, DictType):
return wrap_value(obj.data.get(access.member), STRING)
# Handle named tuple fields (common in yapCAD)
if hasattr(obj.data, access.member):
return wrap_value(getattr(obj.data, access.member), FLOAT)
# Handle list-based points (index by x=0, y=1, z=2)
if access.member == 'x' and hasattr(obj.data, '__getitem__'):
return float_val(obj.data[0])
elif access.member == 'y' and hasattr(obj.data, '__getitem__'):
return float_val(obj.data[1])
elif access.member == 'z' and hasattr(obj.data, '__getitem__'):
return float_val(obj.data[2])
raise RuntimeError(f"Unknown member: {access.member}")
def _eval_index_access(self, access: IndexAccess, ctx: ExecutionContext) -> Value:
"""Evaluate index access (e.g., list[0])."""
obj = self._evaluate(access.object, ctx)
index = self._evaluate(access.index, ctx)
if isinstance(obj.type, ListType):
elem_type = obj.type.element_type
return wrap_value(obj.data[int(index.data)], elem_type)
else:
return wrap_value(obj.data[int(index.data)], FLOAT)
def _eval_list_literal(self, lst: ListLiteral, ctx: ExecutionContext) -> Value:
"""Evaluate a list literal."""
if not lst.elements:
return list_val([], INT) # Empty list, default to int
values = [self._evaluate(elem, ctx) for elem in lst.elements]
elem_type = values[0].type if values else INT
return list_val(values, elem_type)
def _eval_list_comprehension(self, comp: ListComprehension, ctx: ExecutionContext) -> Value:
"""Evaluate a list comprehension with one or more for clauses.
Supports:
[expr for x in xs]
[expr for x in xs if cond]
[expr for x in xs for y in ys]
[expr for x in xs if c1 for y in ys if c2]
Multiple for clauses are evaluated as nested loops (left = outer).
Resource Limits:
- Max nesting depth: DEFAULT_COMPREHENSION_MAX_DEPTH (4 levels)
- Max result size: DEFAULT_COMPREHENSION_MAX_SIZE (100,000 elements)
"""
# Check nesting depth limit
num_clauses = len(comp.clauses)
if num_clauses > DEFAULT_COMPREHENSION_MAX_DEPTH:
raise RuntimeError(
f"List comprehension nesting depth ({num_clauses}) exceeds maximum "
f"({DEFAULT_COMPREHENSION_MAX_DEPTH}). Simplify the comprehension or "
f"use nested for loops."
)
results: list = []
with ctx.new_scope("comprehension"):
self._eval_comprehension_clauses(comp.clauses, comp.element_expr, ctx, results)
if results:
return list_val(results, results[0].type)
return list_val([], INT)
def _eval_comprehension_clauses(
self,
clauses: list,
element_expr,
ctx: ExecutionContext,
results: list
) -> None:
"""Recursively evaluate comprehension clauses.
Each clause defines a loop. Multiple clauses become nested loops.
Enforces a maximum result size to prevent combinatorial explosion.
"""
if not clauses:
# Check size limit before adding
if len(results) >= DEFAULT_COMPREHENSION_MAX_SIZE:
raise RuntimeError(
f"List comprehension result size exceeds maximum "
f"({DEFAULT_COMPREHENSION_MAX_SIZE} elements). "
f"Consider using a filter or processing in batches."
)
# Base case: all clauses processed, evaluate and collect element
value = self._evaluate(element_expr, ctx)
results.append(value)
return
# Process first clause
clause = clauses[0]
remaining_clauses = clauses[1:]
# Evaluate the iterable
iterable = self._evaluate(clause.iterable, ctx)
# Iterate
for item in iterable.data:
# Early exit if we've hit the size limit
if len(results) >= DEFAULT_COMPREHENSION_MAX_SIZE:
raise RuntimeError(
f"List comprehension result size exceeds maximum "
f"({DEFAULT_COMPREHENSION_MAX_SIZE} elements). "
f"Consider using a filter or processing in batches."
)
# Determine element type
if isinstance(iterable.type, ListType):
elem_type = iterable.type.element_type
else:
elem_type = INT
# Bind loop variable
ctx.set_variable(clause.variable, wrap_value(item, elem_type))
# Check all conditions for this clause
skip = False
for condition in clause.conditions:
cond = self._evaluate(condition, ctx)
if not cond.is_truthy():
skip = True
break
if skip:
continue
# Recurse to inner clauses (or evaluate element if no more clauses)
self._eval_comprehension_clauses(remaining_clauses, element_expr, ctx, results)
def _eval_range(self, range_expr: RangeExpr, ctx: ExecutionContext) -> Value:
"""Evaluate a range expression."""
start = self._evaluate(range_expr.start, ctx)
end = self._evaluate(range_expr.end, ctx)
# Create a list of integers
values = [int_val(i) for i in range(int(start.data), int(end.data))]
return list_val(values, INT)
def _eval_dict_literal(self, dct: DictLiteral, ctx: ExecutionContext) -> Value:
"""Evaluate a dict literal."""
result = {}
for key, value_expr in dct.entries.items():
result[key] = self._evaluate(value_expr, ctx)
return dict_val(result)
def _eval_conditional_expr(self, expr: ConditionalExpr, ctx: ExecutionContext) -> Value:
"""Evaluate a ternary conditional expression (e.g., x if cond else y).
Short-circuits: only evaluates the selected branch.
"""
condition = self._evaluate(expr.condition, ctx)
if condition.is_truthy():
return self._evaluate(expr.true_branch, ctx)
else:
return self._evaluate(expr.false_branch, ctx)
def _eval_if_expr(self, if_expr: IfExpr, ctx: ExecutionContext) -> Value:
"""Evaluate an if expression."""
condition = self._evaluate(if_expr.condition, ctx)
if condition.is_truthy():
return self._evaluate(if_expr.then_branch, ctx)
elif if_expr.else_branch:
return self._evaluate(if_expr.else_branch, ctx)
else:
return none_val(INT)
def _eval_match_expr(self, match: MatchExpr, ctx: ExecutionContext) -> Value:
"""Evaluate a match expression."""
subject = self._evaluate(match.subject, ctx)
for arm in match.arms:
if self._pattern_matches(arm.pattern, subject, ctx):
return self._evaluate(arm.body, ctx)
raise RuntimeError("No match arm matched")
def _pattern_matches(self, pattern: Pattern, value: Value, ctx: ExecutionContext) -> bool:
"""Check if a pattern matches a value, binding variables if needed."""
if isinstance(pattern, WildcardPattern):
return True
elif isinstance(pattern, LiteralPattern):
# pattern.value is a Literal node, so we need pattern.value.value
return value.data == pattern.value.value
elif isinstance(pattern, IdentifierPattern):
ctx.set_variable(pattern.name, value)
return True
else:
return False
def _eval_lambda(self, lam: LambdaExpr, ctx: ExecutionContext) -> Value:
"""Evaluate a lambda expression (returns a callable)."""
# Capture the current scope for closure
captured_scope = ctx.current_scope
def lambda_impl(*args: Value) -> Value:
with ctx.new_scope("lambda"):
# Bind parameters
for i, param in enumerate(lam.parameters):
if i < len(args):
ctx.set_variable(param, args[i])
# Evaluate body
return self._evaluate(lam.body, ctx)
# Return as a special callable value
from ..types import FunctionType
return wrap_value(lambda_impl, FunctionType([], FLOAT)) # Simplified type
def _eval_python_expr(self, expr: PythonExpr, ctx: ExecutionContext) -> Value:
"""Evaluate an inline Python expression."""
# Build namespace from context
namespace = {}
scope = ctx.current_scope
while scope:
for name, value in scope.variables.items():
namespace[name] = value.data
scope = scope.parent
# Evaluate the expression
result = eval(expr.code, namespace)
# Wrap result - need type annotation
# expr.return_type is a TypeNode, get its name
if expr.return_type and hasattr(expr.return_type, 'name'):
result_type = resolve_type_name(expr.return_type.name)
else:
result_type = FLOAT
return wrap_value(result, result_type)
# Convenience function for simple execution
[docs]
def execute(
module: Module,
command_name: str,
parameters: Dict[str, Any],
source: str = "",
) -> ExecutionResult:
"""
Execute a command from a module.
This is a convenience wrapper around Interpreter.execute().
"""
interpreter = Interpreter()
return interpreter.execute(module, command_name, parameters, source)
[docs]
def compile_and_run(
source: str,
command_name: str,
parameters: Dict[str, Any],
recursion_limit: Optional[int] = None,
) -> ExecutionResult:
"""
High-level API to compile and run DSL source code in one call.
This is the simplest way to execute DSL code:
from yapcad.dsl import compile_and_run
result = compile_and_run('''
module my_design;
command MAKE_BOX(w: float, h: float, d: float) -> solid {
emit box(w, h, d);
}
''', "MAKE_BOX", {"w": 10.0, "h": 20.0, "d": 5.0})
if result.success:
geometry = result.geometry
else:
print(f"Error: {result.error_message}")
Args:
source: DSL source code as a string
command_name: Name of the command to execute
parameters: Parameter values (raw Python values)
recursion_limit: Maximum depth for command-to-command calls (default 100,
can also be set via YAPCAD_DSL_RECURSION_LIMIT env var)
Returns:
ExecutionResult with geometry, provenance, and any errors
"""
# Import lexer, parser, checker
from ..lexer import tokenize
from ..parser import parse
from ..checker import check
# Tokenize
try:
tokens = tokenize(source)
except Exception as e:
return ExecutionResult(
success=False,
error_message=f"Lexer error: {e}",
)
# Parse (pass source for native block extraction)
try:
module = parse(tokens, source=source)
except Exception as e:
return ExecutionResult(
success=False,
error_message=f"Parser error: {e}",
)
# Type check
check_result = check(module)
if check_result.has_errors:
error_msgs = [str(d) for d in check_result.diagnostics if d.severity.name == 'ERROR']
return ExecutionResult(
success=False,
error_message=f"Type errors: {'; '.join(error_msgs)}",
)
# Execute
interpreter = Interpreter(recursion_limit=recursion_limit)
return interpreter.execute(module, command_name, parameters, source)