Source code for yapcad.package.analysis.calculix

"""CalculiX backend for yapCAD analysis plans."""

from __future__ import annotations

import math
import shutil
import subprocess
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from ..core import PackageManifest
from .base import AnalysisAdapter, AnalysisPlan, AnalysisResult, register_backend


def _now() -> str:
    return datetime.now(timezone.utc).isoformat()


@dataclass
class _MeshConfig:
    inner_radius: float
    outer_radius: float
    thickness: float
    radial_divisions: int
    thickness_divisions: int
    youngs_modulus: float
    poisson_ratio: float
    density: float
    thrust_force: float


[docs] class CalculixAdapter(AnalysisAdapter): """Create a simplified axisymmetric disk model and execute CalculiX when available.""" name = "calculix" _DEFAULT_E = 68.9e3 # MPa for 6061-T6 _DEFAULT_NU = 0.33 _DEFAULT_DENSITY = 2.70e-6 # tonne/mm^3 (~2700 kg/m^3)
[docs] def run( self, manifest: PackageManifest, plan: AnalysisPlan, workspace: Path, **_: Any, ) -> AnalysisResult: workspace.mkdir(parents=True, exist_ok=True) mesh = self._mesh_config(plan) inp_basename = plan.plan_id or "calculix_job" inp_path = workspace / f"{inp_basename}.inp" self._write_input_file(inp_path, mesh) command = plan.execution.command or "ccx" executable = shutil.which(command) summary: Dict[str, Any] = { "plan": plan.plan_id, "backend": plan.backend, "timestamp": _now(), "execution": { "mode": plan.execution.mode, "command": command, }, "mesh": { "inner_radius_mm": mesh.inner_radius, "outer_radius_mm": mesh.outer_radius, "thickness_mm": mesh.thickness, "radial_divisions": mesh.radial_divisions, "thickness_divisions": mesh.thickness_divisions, }, } artifacts = [ {"kind": "solver-input", "path": inp_path.name}, ] if executable is None: summary["statusDetail"] = f"Executable '{command}' not found on PATH" return AnalysisResult( plan_id=plan.plan_id, status="skipped", backend=plan.backend, summary=summary, artifacts=artifacts, ) run = subprocess.run( [executable, inp_basename], cwd=workspace, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) summary.setdefault("solver", {}) summary["solver"].update( { "returncode": run.returncode, "stdout_tail": run.stdout[-2000:], "stderr_tail": run.stderr[-2000:], } ) if run.returncode != 0: summary["statusDetail"] = "CalculiX execution failed" return AnalysisResult( plan_id=plan.plan_id, status="error", backend=plan.backend, summary=summary, artifacts=artifacts, ) dat_path = workspace / f"{inp_basename}.dat" max_disp = self._parse_max_displacement(dat_path) metrics: Dict[str, float] = {} if max_disp is not None: metrics["displacement.max"] = max_disp metrics["displacement.max_mm"] = max_disp status = self._evaluate_acceptance(metrics, plan.acceptance) summary["metrics"] = metrics if status == "passed": summary["statusDetail"] = "Acceptance criteria satisfied" elif status == "failed": summary["statusDetail"] = "Acceptance criteria violated" else: summary.setdefault("statusDetail", "Acceptance evaluation incomplete") for suffix in (".dat", ".frd", ".sta", ".log"): candidate = workspace / f"{inp_basename}{suffix}" if candidate.exists(): artifacts.append({"kind": f"ccx{suffix}", "path": candidate.name}) return AnalysisResult( plan_id=plan.plan_id, status=status, backend=plan.backend, metrics=metrics, summary=summary, artifacts=artifacts, )
# ------------------------------------------------------------------ # Mesh helpers def _mesh_config(self, plan: AnalysisPlan) -> _MeshConfig: opts = dict(plan.backend_options) geom = dict(plan.geometry) inner = float(opts.get("inner_radius_mm") or geom.get("inner_radius_mm") or 0.0) outer = float(opts.get("outer_radius_mm") or geom.get("outer_radius_mm") or (inner + 1.0)) thickness = float(opts.get("thickness_mm") or geom.get("thickness_mm") or 10.0) thrust = float(opts.get("thrust_n") or opts.get("thrust_force_n") or 2224.0) radial = int(opts.get("radial_divisions", 24)) axial = int(opts.get("thickness_divisions", 2)) youngs = float(opts.get("youngs_modulus_mpa", self._DEFAULT_E)) poisson = float(opts.get("poisson_ratio", self._DEFAULT_NU)) density = float(opts.get("density_tonemm3", self._DEFAULT_DENSITY)) return _MeshConfig( inner_radius=inner, outer_radius=outer, thickness=thickness, radial_divisions=radial, thickness_divisions=axial, youngs_modulus=youngs, poisson_ratio=poisson, density=density, thrust_force=thrust, ) def _write_input_file(self, path: Path, mesh: _MeshConfig) -> None: nodes, elements, inner_nodes = self._generate_mesh(mesh) def fmt(num: float) -> str: return f"{num:.6g}" with path.open("w", encoding="utf-8") as fp: fp.write("*HEADING\n") fp.write("yapCAD bulkhead axisymmetric approximation\n") fp.write("*NODE\n") for nid, (r, z) in enumerate(nodes, start=1): fp.write(f"{nid}, {fmt(r)}, {fmt(z)}\n") fp.write("*ELEMENT, TYPE=CAX4\n") for eid, conn in enumerate(elements, start=1): fp.write(f"{eid}, {', '.join(str(nid) for nid in conn)}\n") fp.write("*ELSET, ELSET=EALL, GENERATE\n1, {0}\n".format(len(elements))) fp.write("*SOLID SECTION, ELSET=EALL, MATERIAL=AL6061\n1.0\n") fp.write("*MATERIAL, NAME=AL6061\n") fp.write("*ELASTIC\n") fp.write(f"{mesh.youngs_modulus}, {mesh.poisson_ratio}\n") fp.write("*DENSITY\n") fp.write(f"{mesh.density}\n") outer_nodes = [nid for nid, (r, _) in enumerate(nodes, start=1) if math.isclose(r, mesh.outer_radius, rel_tol=1e-6)] fp.write("*NSET, NSET=N_OUTER\n") fp.write(", ".join(str(n) for n in outer_nodes) + "\n") fp.write("*NSET, NSET=N_INNER\n") fp.write(", ".join(str(n) for n in inner_nodes) + "\n") fp.write("*BOUNDARY\n") fp.write("N_OUTER, 1, 2\n") fp.write("*STEP\n") fp.write("*STATIC\n") if inner_nodes: load_per_node = -(mesh.thrust_force / len(inner_nodes)) fp.write("*CLOAD\n") for nid in inner_nodes: fp.write(f"{nid}, 2, {fmt(load_per_node)}\n") fp.write("*NODE PRINT, NSET=N_INNER\n") fp.write("U\n") fp.write("*NODE PRINT, NSET=N_OUTER\n") fp.write("U\n") fp.write("*END STEP\n") def _generate_mesh( self, mesh: _MeshConfig ) -> Tuple[List[Tuple[float, float]], List[Tuple[int, int, int, int]], List[int]]: nodes: List[Tuple[float, float]] = [] elements: List[Tuple[int, int, int, int]] = [] r_vals = [mesh.inner_radius + (mesh.outer_radius - mesh.inner_radius) * i / mesh.radial_divisions for i in range(mesh.radial_divisions + 1)] z_vals = [mesh.thickness * j / mesh.thickness_divisions for j in range(mesh.thickness_divisions + 1)] node_index: Dict[Tuple[int, int], int] = {} nid = 1 for j, z in enumerate(z_vals): for i, r in enumerate(r_vals): nodes.append((r, z)) node_index[(i, j)] = nid nid += 1 for j in range(mesh.thickness_divisions): for i in range(mesh.radial_divisions): n1 = node_index[(i, j)] n2 = node_index[(i + 1, j)] n3 = node_index[(i + 1, j + 1)] n4 = node_index[(i, j + 1)] elements.append((n1, n2, n3, n4)) inner_nodes = [node_index[(0, j)] for j in range(mesh.thickness_divisions + 1)] return nodes, elements, inner_nodes def _parse_max_displacement(self, dat_path: Path) -> Optional[float]: if not dat_path.exists(): return None max_disp: Optional[float] = None with dat_path.open("r", encoding="utf-8", errors="ignore") as fp: for line in fp: parts = line.strip().replace("=", " ").split() if not parts: continue try: int(parts[0]) except ValueError: continue numeric_parts: List[float] = [] for token in parts[1:]: try: numeric_parts.append(float(token)) except ValueError: continue if not numeric_parts: continue uz = numeric_parts[1] if len(numeric_parts) > 1 else numeric_parts[0] uz_abs = abs(uz) if max_disp is None or uz_abs > max_disp: max_disp = uz_abs return max_disp def _evaluate_acceptance(self, metrics: Dict[str, float], acceptance: Dict[str, Any]) -> str: if not acceptance: return "pending" if not metrics else "passed" status = "passed" for key, rule in acceptance.items(): metric_value = metrics.get(key) if metric_value is None: metric_value = metrics.get(f"{key}_mm") if metric_value is None: status = "pending" continue limit = rule.get("limit") if limit is None and "limit_mm" in rule: limit = rule["limit_mm"] if limit is None: continue limit = float(limit) comparison = rule.get("comparison", "<=") if comparison == "<=" and not (metric_value <= limit): return "failed" if comparison == "<" and not (metric_value < limit): return "failed" if comparison == ">=" and not (metric_value >= limit): return "failed" if comparison == ">" and not (metric_value > limit): return "failed" return status
register_backend("calculix", CalculixAdapter) __all__ = ["CalculixAdapter"]