da84425d25
壓測 401 根因:{{credential.X}} 系統沒實裝,三條 template 展開路徑都不認
credential. namespace → 注入空值 → 目標 API 401(test_arcrun/5 Haiku 實證)。
修法(design §8,richblack 確認方向 B「讓 {{credential.X}} 真的能用」):
- auth_static_key 加 resolve_credentials action:給 names → WASM 內 kv_get +
crypto_decrypt → 回明文 map(不查 recipe、缺則誠實報錯)
- auth-dispatcher 加 resolveCredentialRefs:遞迴偵測 {{credential.X}} → 交 WASM
解密 → 回填(無 ref 則零開銷不打 WASM)
- graph-executor 在 node.data interpolate 後呼叫,不碰 ENCRYPTION_KEY(rule 02 §2.2)
解密全程在 WASM,TS 只偵測+回填。tinygo build OK + tsc 0 + §2.2 自檢綠。
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
710 lines
28 KiB
TypeScript
// arcrun 圖遍歷引擎 — 支援完整 Cypher 語意關係
|
||
import type { ExecutionGraph, GraphNode, TraceStep, ComponentRunner, KVContextStore, EdgeType, Bindings } from './types';
|
||
import { kvSetNodeOutput, kvGetNodeOutput, ExecutionError, WorkflowPaused } from './types';
|
||
import { injectCredentials } from './actions/credential-injector';
|
||
import { tryAuthDispatch, resolveCredentialRefs } from './actions/auth-dispatcher';
|
||
import { expandPromptRecipe } from './lib/recipe-expander';
|
||
import { resolveRecipe } from './routes/recipes';
|
||
import { persistPausedRun, isResumablePending, parseRecipeOutput } from './lib/paused-runs';
|
||
import { buildMagicVars } from './lib/magic-vars';
|
||
import { recordTelemetry } from './lib/telemetry';
|
||
|
||
/** Resolves a component id to the runner function that executes that component. */
export type ComponentLoader = (componentId: string) => Promise<ComponentRunner>;

/** Resolves a workflow id to its execution graph (used for CALLS_SUBFLOW edges). */
export type WorkflowLoader = (workflowId: string) => Promise<ExecutionGraph>;

/**
 * Fan-in bookkeeping: a node whose in-degree is greater than 1 must wait for all of its
 * upstream nodes before it runs. Keyed by node id; `ctx` accumulates the contexts delivered
 * by upstream nodes and `remaining` counts the upstream deliveries still outstanding.
 */
type FanInState = Map<string, { ctx: Record<string, unknown>; remaining: number }>;
|
||
|
||
export class GraphExecutor {
  /** Resolves a component id to its runner. */
  private loader: ComponentLoader;
  /** Optional resolver for sub-workflows reached through CALLS_SUBFLOW edges. */
  private workflowLoader?: WorkflowLoader;
  /** Worker bindings; when absent, credential / recipe / telemetry / pause handling is skipped. */
  private env?: Bindings;
  /** Caller API key forwarded to the credential, recipe and telemetry helpers. */
  private apiKey?: string;
  /** Optional hook invoked fire-and-forget after each Component node has run. */
  public recordComponentReference?: (componentId: string, workflowId: string) => Promise<void>;

  /**
   * kbdb-base §7.1 + §7.5.h: recipe keys used by this execution (uuid preferred; legacy data
   * falls back to canonical_id). The unit of judgement is one workflow execution: per the
   * original note, executeWebhookGraph records success/failure for this set of keys once the
   * execution has finished. While executing, keys are only collected here.
   */
  public readonly usedRecipeKeys = new Set<string>();

  // Resumable workflow (SDD: resumable-workflow/design.md).
  // Needed to persist state when pausing; assigned on entry to execute / resumeFromPaused.
  private currentGraph?: ExecutionGraph;
  private currentRunId?: string;

  constructor(loader: ComponentLoader, workflowLoader?: WorkflowLoader, env?: Bindings, apiKey?: string) {
    this.loader = loader;
    this.workflowLoader = workflowLoader;
    this.env = env;
    this.apiKey = apiKey;
  }

  /**
   * Returns `value` when it is a non-null object, otherwise an empty record.
   * Used before object-spreading node outputs: spreading a string (e.g. an HTML result)
   * would otherwise produce an object keyed by character index.
   */
  private static asRecord(value: unknown): Record<string, unknown> {
    return typeof value === 'object' && value !== null ? (value as Record<string, unknown>) : {};
  }

  /**
   * Builds the visited-set key for a node: the node id plus the first 50 characters of the
   * JSON-serialised context. `JSON.stringify` returns `undefined` for an `undefined` input,
   * so that case is mapped to an empty suffix instead of throwing.
   */
  private static visitKey(nodeId: string, context: unknown): string {
    const serialised: string | undefined = JSON.stringify(context);
    return `${nodeId}:${(serialised ?? '').slice(0, 50)}`;
  }

  /** Extracts a printable message from anything that was thrown (including null / undefined). */
  private static errorMessage(thrown: unknown): string {
    if (typeof thrown === 'object' && thrown !== null) {
      const message = (thrown as { message?: unknown }).message;
      if (typeof message === 'string' && message) return message;
    }
    return String(thrown);
  }

  /**
   * Creates the fan-in table: every node with in-degree > 1 gets its own shallow copy of
   * `seedCtx` and a counter of how many incoming edges it has. In-degrees are counted in a
   * single pass over the edges.
   */
  private static buildFanInState(graph: ExecutionGraph, seedCtx: Record<string, unknown>): FanInState {
    const inDegree = new Map<string, number>();
    for (const edge of graph.edges) {
      inDegree.set(edge.to, (inDegree.get(edge.to) ?? 0) + 1);
    }
    const fanIn: FanInState = new Map();
    for (const node of graph.nodes) {
      const degree = inDegree.get(node.id) ?? 0;
      if (degree > 1) {
        fanIn.set(node.id, { ctx: { ...seedCtx }, remaining: degree });
      }
    }
    return fanIn;
  }

  /**
   * Combines the results of branches that were executed in parallel.
   * A single result is returned untouched (it may be a string such as HTML, which must not
   * be spread). Multiple results are shallow-merged in order; non-object results are ignored.
   */
  private static mergeBranchResults(results: unknown[]): unknown {
    if (results.length === 1) return results[0];
    return results.reduce(
      (acc: Record<string, unknown>, r: unknown) => ({ ...acc, ...GraphExecutor.asRecord(r) }),
      {} as Record<string, unknown>,
    );
  }

  /**
   * Executes a graph from its start nodes (nodes with no incoming edge).
   *
   * @param graph          graph to run
   * @param initialContext caller-supplied context; magic variables are layered on top of it
   * @param kvNamespace    optional KV namespace used to store per-node outputs
   * @returns the merged output of all start-node branches plus the execution trace
   */
  async execute(
    graph: ExecutionGraph,
    initialContext: Record<string, unknown>,
    kvNamespace?: KVNamespace | undefined,
  ): Promise<{
    data: unknown;
    trace: TraceStep[];
  }> {
    const trace: TraceStep[] = [];

    // KV context store (BUILD-006). run_id = graph id + timestamp so every run is independent.
    const kvStore: KVContextStore | undefined = kvNamespace
      ? { runId: `${graph.id}-${Date.now()}`, kv: kvNamespace }
      : undefined;

    // Resumable workflow: remember the current graph + run id for a possible pause.
    this.currentGraph = graph;
    this.currentRunId = kvStore?.runId ?? `${graph.id}-${Date.now()}`;

    // Magic vars (LI SDD M2.x): spread after initialContext so the system values always win
    // over a user key with the same name, while every other user key is preserved.
    const ctxWithMagic: Record<string, unknown> = {
      ...initialContext,
      ...buildMagicVars(),
    };

    // Start nodes are the nodes that no edge points to.
    const hasIncoming = new Set(graph.edges.map(e => e.to));
    const startNodes = graph.nodes.filter(n => !hasIncoming.has(n.id));
    if (startNodes.length === 0) {
      return { data: ctxWithMagic, trace };
    }

    const fanIn = GraphExecutor.buildFanInState(graph, ctxWithMagic);

    // Run all start nodes in parallel; each branch gets its own visited set.
    const results = await Promise.all(
      startNodes.map(node =>
        this.executeNode(node, graph, ctxWithMagic, new Set(), trace, fanIn, kvStore)
      )
    );

    return { data: GraphExecutor.mergeBranchResults(results), trace };
  }

  /**
   * Continues a workflow from a paused state.
   * SDD: resumable-workflow/design.md §3.2
   *
   * Flow:
   *  1. Treat the paused node as executed (its result is the callback result, injected into
   *     the context).
   *  2. Use every downstream node of the paused node as a new starting point.
   *  3. Execute those nodes until the graph ends (or pauses again).
   *
   * @param args.paused_context  context as it was when the workflow paused
   * @param args.callback_result result delivered by the callback; replaces the pending result
   * @param args.prior_trace     trace recorded before the pause; new steps are appended to a copy
   * @returns merged output of the downstream branches plus the combined trace
   */
  async resumeFromPaused(args: {
    graph: ExecutionGraph;
    paused_node_id: string;
    paused_context: Record<string, unknown>;
    callback_result: Record<string, unknown>;
    prior_trace: TraceStep[];
    kvNamespace?: KVNamespace;
    recipe_output_format?: 'text' | 'json';
    recipe_output_required_fields?: string[];
  }): Promise<{ data: unknown; trace: TraceStep[] }> {
    const { graph, paused_node_id, paused_context, prior_trace, kvNamespace } = args;
    let { callback_result } = args;

    // Recipe output parsing: same handling as the immediate-return path.
    // SDD: recipe-system + resumable-workflow
    callback_result = parseRecipeOutput(
      callback_result,
      args.recipe_output_format,
      args.recipe_output_required_fields,
    ) as Record<string, unknown>;

    this.currentGraph = graph;
    this.currentRunId = `${graph.id}-resume-${Date.now()}`;

    const trace: TraceStep[] = [...prior_trace];
    const kvStore: KVContextStore | undefined = kvNamespace
      ? { runId: this.currentRunId, kv: kvNamespace }
      : undefined;

    // Store the callback result as the paused node's KV output so downstream nodes can read it.
    if (kvStore) {
      await kvSetNodeOutput(kvStore, paused_node_id, callback_result);
    }

    // Spread the callback result into the context AND expose it under the node id, so that
    // downstream `{{paused_node_id.data.text}}` templates resolve exactly as they do on the
    // synchronous path (propagateCtx).
    const mergedContext: Record<string, unknown> = {
      ...paused_context,
      ...(callback_result && typeof callback_result === 'object' ? callback_result : {}),
      [paused_node_id]: callback_result,
    };
    if (kvStore) {
      if (!mergedContext._kv_outputs) mergedContext._kv_outputs = {};
      (mergedContext._kv_outputs as Record<string, unknown>)[paused_node_id] = callback_result;
    }

    const downstreamEdges = graph.edges.filter(e => e.from === paused_node_id);
    if (downstreamEdges.length === 0) {
      // The paused node was the last node: nothing left to run.
      return { data: callback_result, trace };
    }

    // Rebuild fan-in state for downstream nodes that may join several branches.
    const fanIn = GraphExecutor.buildFanInState(graph, mergedContext);

    // Pre-seed the visited set with the paused node's key so it is not executed again.
    const visited = new Set<string>([GraphExecutor.visitKey(paused_node_id, paused_context)]);

    const downstreamNodes = downstreamEdges
      .map(e => graph.nodes.find(n => n.id === e.to))
      .filter((n): n is GraphNode => !!n);

    const results = await Promise.all(
      downstreamNodes.map(node =>
        this.executeNode(node, graph, mergedContext, visited, trace, fanIn, kvStore)
      )
    );

    return { data: GraphExecutor.mergeBranchResults(results), trace };
  }

  /**
   * Executes one node, records it in `trace`, then follows its outgoing edges.
   *
   * @param context incoming context for this node
   * @param visited keys of (node, context-prefix) pairs already executed on this branch
   * @param trace   shared trace; one step is appended per executed node
   * @param fanIn   shared fan-in table (see FanInState)
   * @returns the node's result, or the result of the last downstream node that was executed
   * @throws WorkflowPaused when a component returns a resumable pending result
   * @throws ExecutionError when the node fails for any other reason
   */
  private async executeNode(
    node: GraphNode,
    graph: ExecutionGraph,
    context: unknown,
    visited: Set<string>,
    trace: TraceStep[],
    fanIn: FanInState,
    kvStore?: KVContextStore,
  ): Promise<unknown> {
    const nodeKey = GraphExecutor.visitKey(node.id, context);
    if (visited.has(nodeKey)) return context;
    visited.add(nodeKey);

    const start = Date.now();
    let result: unknown = context;
    let nodeInput: unknown = context;

    try {
      switch (node.type) {
        case 'Input':
          result = node.data ?? context;
          nodeInput = result;
          break;

        case 'Component': {
          if (!node.componentId) throw new Error(`節點 ${node.id} 缺少 componentId`);
          const runner = await this.loader(node.componentId);
          const ctx = context as Record<string, unknown>;

          // String values in node.data support {{variable}} substitution from the context.
          const resolvedData = interpolateData(node.data, ctx);

          // Precedence: node.data (static parameters) overrides context (global parameters).
          let mergedContext: Record<string, unknown> = {
            ...ctx,
            ...resolvedData,
          };

          // User-facing {{credential.NAME}} expansion (design §8): resolveCredentialRefs
          // detects the references and back-fills the values. Per the original note the
          // decryption happens in the auth_static_key WASM (rule 02 §2.2); this code never
          // touches ENCRYPTION_KEY.
          if (this.env && this.apiKey) {
            mergedContext = await resolveCredentialRefs(mergedContext, this.env, this.apiKey);
          }

          // Resumable workflow callback_url injection (SDD: resumable-workflow/design.md §2.2).
          // Base URL comes from PUBLIC_BASE_URL when set, otherwise the default below;
          // a single trailing slash is stripped before appending the path.
          if (node.componentId === 'claude_api') {
            const baseUrl = (this.env as { PUBLIC_BASE_URL?: string } | undefined)?.PUBLIC_BASE_URL
              ?? 'https://cypher.arcrun.dev';
            mergedContext.callback_url = `${baseUrl.replace(/\/$/, '')}/workflows/resume`;
          }

          // Recipe expansion: when node.data.recipe is a string, expand it into the actual
          // prompt and merge prompt / model / output settings into mergedContext.
          // SDD: matrix/arcrun/.agents/specs/recipe-system/design.md §2.2
          if (typeof resolvedData.recipe === 'string' && this.env?.RECIPES) {
            try {
              const expanded = await expandPromptRecipe(
                resolvedData.recipe,
                ctx,
                this.env as { RECIPES: { get: (k: string) => Promise<string | null> }; KBDB_BASE_URL?: string },
                this.apiKey ?? '',
              );
              mergedContext = {
                ...mergedContext,
                prompt: expanded.prompt,
                model: expanded.model,
                _recipe_output_format: expanded.output_format,
                _recipe_output_required_fields: expanded.output_required_fields,
              };
            } catch (e) {
              throw new Error(`recipe 展開失敗 (${resolvedData.recipe}): ${e instanceof Error ? e.message : String(e)}`);
            }
          }

          // Credential injection before the component runs.
          if (this.env) {
            // Try the auth dispatcher first (new path); fall back to the legacy
            // injectCredentials when it does not handle this component
            // (the original note says the legacy path is slated for removal in Phase 1.9).
            if (this.apiKey) {
              const dispatched = await tryAuthDispatch(node.componentId, mergedContext, this.env, this.apiKey);
              if (dispatched) {
                mergedContext = dispatched;
              } else {
                mergedContext = await injectCredentials(node.componentId, mergedContext, this.env, this.apiKey);
              }
            } else {
              mergedContext = await injectCredentials(node.componentId, mergedContext, this.env, this.apiKey);
            }
          }

          // kbdb-base §7.5.h: collect the recipe key used by this node (uuid when present,
          // canonical_id for legacy data without a uuid).
          if (this.env?.RECIPES) {
            try {
              const apiRecipe = await resolveRecipe(node.componentId, this.env.RECIPES);
              if (apiRecipe) this.usedRecipeKeys.add(apiRecipe.uuid ?? apiRecipe.canonical_id);
            } catch {
              // Deliberate best effort: this is auxiliary data, a lookup failure must not
              // affect the execution itself.
            }
          }

          // NOTE(review): mergedContext may now hold resolved credential values and it is
          // recorded as this step's trace input (and persisted in trace_so_far on pause).
          // Confirm whether the trace should be redacted.
          nodeInput = mergedContext;
          result = await runner(mergedContext);

          // Resumable workflow: on a pending result, persist the paused state and throw
          // WorkflowPaused (SDD: resumable-workflow/design.md §3.2.1).
          // This must stay before recipe output parsing: a pending result is not recipe output.
          const pending = isResumablePending(result);
          if (pending && this.env?.EXEC_CONTEXT && this.currentGraph && this.currentRunId) {
            // Record this node in the trace before pausing.
            trace.push({
              nodeId: node.id,
              type: node.type,
              input: nodeInput,
              output: result,
              duration_ms: Date.now() - start,
            });
            await persistPausedRun(this.env.EXEC_CONTEXT, pending.task_id, {
              run_id: this.currentRunId,
              graph: this.currentGraph,
              paused_node_id: node.id,
              // The incoming context is stored, not mergedContext.
              paused_context: context as Record<string, unknown>,
              paused_pending_result: result as Record<string, unknown>,
              trace_so_far: trace,
              api_key: this.apiKey,
              expires_at: Date.now() + 24 * 60 * 60 * 1000, // 24 hours from now
              recipe_output_format: mergedContext._recipe_output_format as 'text' | 'json' | undefined,
              recipe_output_required_fields: mergedContext._recipe_output_required_fields as string[] | undefined,
            });
            throw new WorkflowPaused(pending.task_id, this.currentRunId, node.id, trace);
          }

          // Recipe output parsing, shared by the immediate path and the resume path.
          // Per the original note, parsed top-level fields are spread onto the result so
          // downstream FOREACH edges and {{var}} templates can read them directly.
          result = parseRecipeOutput(
            result,
            mergedContext._recipe_output_format as 'text' | 'json' | undefined,
            mergedContext._recipe_output_required_fields as string[] | undefined,
          );

          // BUILD-006: store the node output in KV so downstream nodes can read the output
          // of a specific upstream node by name.
          if (kvStore && result !== null && result !== undefined) {
            await kvSetNodeOutput(kvStore, node.id, result);
          }

          // Phase 2: record that the component was referenced. Runs in the background and
          // never blocks or fails the main flow.
          void this.recordComponentReference?.(node.componentId, graph.id).catch(() => {
            // A failed usage record must not abort the execution.
          });

          break;
        }

        case 'Output':
          result = context;
          break;
      }
    } catch (e: unknown) {
      // WorkflowPaused is a control signal, not an error: pass it straight up.
      if (e instanceof WorkflowPaused) throw e;

      const errMsg = GraphExecutor.errorMessage(e);
      const duration_ms = Date.now() - start;
      trace.push({
        nodeId: node.id,
        type: node.type,
        input: nodeInput,
        output: null,
        error: errMsg,
        duration_ms,
      });
      // Step-level telemetry: node failure event (Component nodes only).
      if (this.env && node.type === 'Component') {
        recordTelemetry(this.env, this.apiKey, {
          event_type: 'node_failure',
          workflow_name: graph.name,
          component_id: node.componentId,
          error_code: 'node_error',
          duration_ms,
        });
      }
      // An ExecutionError is rethrown as-is so its original details are kept.
      if (e instanceof ExecutionError) throw e;
      throw new ExecutionError(
        `Node ${node.id} failed: ${errMsg}`,
        node.id,
        nodeInput,
        trace,
      );
    }

    const duration_ms = Date.now() - start;
    trace.push({
      nodeId: node.id,
      type: node.type,
      input: nodeInput,
      output: result,
      duration_ms,
    });

    // Step-level telemetry: node success event (Component nodes only; Input/Output skipped).
    if (this.env && node.type === 'Component') {
      recordTelemetry(this.env, this.apiKey, {
        event_type: 'node_success',
        workflow_name: graph.name,
        component_id: node.componentId,
        duration_ms,
      });
    }

    // Follow outgoing edges. Failures below this point are not caught by the try block above.
    const outEdges = graph.edges.filter(e => e.from === node.id);

    for (const edge of outEdges) {
      const nextNode = graph.nodes.find(n => n.id === edge.to);
      if (!nextNode) continue;

      switch (edge.type as EdgeType) {
        case 'PIPE': {
          const pipeContext: Record<string, unknown> = propagateCtx(context, result, node.id);

          if (kvStore) {
            const kvOutput = await kvGetNodeOutput(kvStore, node.id);
            if (kvOutput !== undefined) {
              // _kv_outputs is intentionally mutated in place: the object is shared by
              // reference with the contexts it was shallow-copied from.
              if (!pipeContext._kv_outputs) pipeContext._kv_outputs = {};
              (pipeContext._kv_outputs as Record<string, unknown>)[node.id] = kvOutput;
            }
          }

          const fanInState = fanIn.get(nextNode.id);
          if (fanInState) {
            // Join point: accumulate this branch's context and run the target only when
            // the last upstream delivery arrives.
            Object.assign(fanInState.ctx, pipeContext);
            fanInState.remaining--;
            if (fanInState.remaining === 0) {
              result = await this.executeNode(nextNode, graph, fanInState.ctx, visited, trace, fanIn, kvStore);
            }
          } else {
            result = await this.executeNode(nextNode, graph, pipeContext, visited, trace, fanIn, kvStore);
          }
          break;
        }

        case 'ON_SUCCESS': {
          if (!isFailure(result)) {
            const mergedCtx = propagateCtx(context, result, node.id);
            result = await this.executeNode(nextNode, graph, mergedCtx, visited, trace, fanIn, kvStore);
          }
          break;
        }

        case 'ON_FAIL': {
          if (isFailure(result)) {
            const mergedCtx = propagateCtx(context, result, node.id);
            result = await this.executeNode(nextNode, graph, mergedCtx, visited, trace, fanIn, kvStore);
          }
          break;
        }

        case 'IF': {
          const passes = evaluateCondition(edge.condition ?? 'true', result);
          if (passes) {
            const mergedCtx = propagateCtx(context, result, node.id);
            result = await this.executeNode(nextNode, graph, mergedCtx, visited, trace, fanIn, kvStore);
          }
          break;
        }

        case 'FOREACH': {
          const iteratorKey = edge.iterator ?? 'item';
          // Look for the iterable in this node's output first, then in the full context:
          // the output of the current node may not carry a list that an earlier node
          // produced, while propagateCtx keeps earlier fields in the context.
          let items = getIterableFromContext(result, iteratorKey);
          if (items.length === 0) {
            items = getIterableFromContext(context, iteratorKey);
          }
          const iterResults: unknown[] = [];

          // Per-item context = propagated context + the current item under the iterator key.
          const baseForeachCtx = propagateCtx(context, result, node.id);
          for (const item of items) {
            const itemContext = {
              ...baseForeachCtx,
              [iteratorKey]: item,
            };
            // Fresh visited set per item so the same target node can run once per item.
            const itemResult = await this.executeNode(nextNode, graph, itemContext, new Set(), trace, fanIn, kvStore);
            iterResults.push(itemResult);
          }

          // asRecord: a string result must not be spread into character-index keys.
          result = { ...GraphExecutor.asRecord(result), results: iterResults };
          break;
        }

        case 'CALLS_SUBFLOW': {
          // Load the sub-workflow, run it with the current result as its initial context
          // and merge its output back into the main flow.
          const subWorkflowId = nextNode.componentId?.replace('workflow://', '') ?? nextNode.id;
          if (this.workflowLoader) {
            const subGraph = await this.workflowLoader(subWorkflowId);
            // NOTE(review): the sub-executor is built without env / apiKey, so components
            // inside the sub-workflow run without credential resolution, recipe expansion,
            // telemetry or pause handling, and their recipe keys are not collected.
            // Confirm whether this is intended before forwarding them.
            const subExecutor = new GraphExecutor(this.loader, this.workflowLoader);
            const subResult = await subExecutor.execute(
              subGraph,
              GraphExecutor.asRecord(result),
              kvStore?.kv,
            );
            result = {
              ...GraphExecutor.asRecord(result),
              ...GraphExecutor.asRecord(subResult.data),
            };
          }
          break;
        }

        case 'ON_CLICK': {
          const mergedCtx = propagateCtx(context, result, node.id);
          result = await this.executeNode(nextNode, graph, mergedCtx, visited, trace, fanIn, kvStore);
          break;
        }

        case 'IS_A': {
          // Node type declaration: nothing to execute. Per the original note the target of
          // an IS_A edge is a component URI that is handled at graph-build time.
          break;
        }

        case 'CONTAINS':
        case 'HAS_STYLE':
        case 'HAS_BEHAVIOR': {
          // Structural edges: they describe the graph and are not executed.
          break;
        }

        case 'CONTINUE':
          break;
      }
    }

    return result;
  }
}
|
||
|
||
/**
 * Builds the context handed to a downstream node: the original context, then the upstream
 * output spread on top, then the upstream output again under the upstream node's id.
 *
 * This lets a downstream template use:
 *   {{api_key}}            - a global value from the base context
 *   {{data.text}}          - the previous node's output (overwritten by later nodes' outputs)
 *   {{classify.data.text}} - one specific node's output, addressed by node id
 *
 * Precedence (lowest to highest): base context (including earlier node namespaces)
 * < upstream output spread < current node namespace.
 *
 * Non-object contexts / results are not spread; the raw result is still stored under the
 * node id.
 */
function propagateCtx(
  context: unknown,
  upstreamResult: unknown,
  upstreamNodeId: string,
): Record<string, unknown> {
  const baseCtx = (typeof context === 'object' && context !== null)
    ? (context as Record<string, unknown>)
    : {};
  const baseResult = (typeof upstreamResult === 'object' && upstreamResult !== null)
    ? (upstreamResult as Record<string, unknown>)
    : {};
  return {
    ...baseCtx,
    ...baseResult,
    [upstreamNodeId]: upstreamResult,
  };
}
|
||
|
||
/**
 * Replaces {{variable}} references inside one string with values from `ctx`.
 *
 * - Nested paths are supported: {{item.content}} -> ctx.item.content
 * - Array indexes are supported: {{paragraphs.0.entity}} -> ctx.paragraphs[0].entity
 * - Single-reference pass-through: when the whole string is exactly one `{{x}}`
 *   (surrounding whitespace allowed), the raw value is returned without stringifying, so an
 *   array or object keeps its type (e.g. `items: "{{list.blocks}}"` yields a real array).
 * - With several references, or references mixed with text, the result is always a string:
 *   string values are inserted as-is, everything else goes through JSON.stringify.
 * - A reference that resolves to `undefined` is left in the text unchanged.
 *
 * @returns the raw referenced value for a single-reference string, otherwise a string
 */
function interpolateString(s: string, ctx: Record<string, unknown>): unknown {
  // The whole value is a single {{x}} reference: return the raw value (type preserved).
  const single = s.match(/^\s*\{\{([\w.]+)\}\}\s*$/);
  if (single) {
    const val = getNestedValue(ctx, single[1]);
    return val === undefined ? s : val;
  }

  // Several references / mixed text: always build a string.
  return s.replace(/\{\{([\w.]+)\}\}/g, (_, key: string) => {
    const val = getNestedValue(ctx, key);
    if (val === undefined) return `{{${key}}}`;
    if (typeof val === 'string') return val;
    return JSON.stringify(val);
  });
}
|
||
|
||
/**
 * Recursively applies {{variable}} interpolation to a value.
 * Strings are interpolated, arrays and objects are rebuilt with every element / property
 * interpolated, and any other value (null, undefined, number, boolean, ...) is returned
 * unchanged. The input is not mutated.
 */
function interpolateValue(v: unknown, ctx: Record<string, unknown>): unknown {
  if (typeof v === 'string') return interpolateString(v, ctx);
  if (Array.isArray(v)) return v.map(item => interpolateValue(item, ctx));
  if (v !== null && typeof v === 'object') {
    const result: Record<string, unknown> = {};
    for (const [k, val] of Object.entries(v as Record<string, unknown>)) {
      result[k] = interpolateValue(val, ctx);
    }
    return result;
  }
  return v;
}
|
||
|
||
/**
 * Interpolates every string inside a node's `data` object against `ctx`.
 *
 * @param data node data; `undefined` yields an empty object
 * @param ctx  values available to {{variable}} references
 * @returns a new object with all nested strings interpolated
 */
function interpolateData(
  data: Record<string, unknown> | undefined,
  ctx: Record<string, unknown>,
): Record<string, unknown> {
  if (!data) return {};
  return interpolateValue(data, ctx) as Record<string, unknown>;
}
|
||
|
||
/**
 * Reads a nested value by dot path: 'a.b.0.c' -> ctx.a.b[0].c
 *
 * Only own properties are followed. Paths come from user-written templates, so inherited
 * members such as `constructor`, `toString` or `__proto__` must not be reachable; they
 * resolve to `undefined` like any other missing key.
 *
 * @returns the value at `path`, or `undefined` when any segment is missing or a
 *          non-object is reached before the path ends
 */
function getNestedValue(ctx: unknown, path: string): unknown {
  let cur: unknown = ctx;
  for (const segment of path.split('.')) {
    if (cur === null || typeof cur !== 'object') return undefined;
    if (!Object.prototype.hasOwnProperty.call(cur, segment)) return undefined;
    cur = (cur as Record<string, unknown>)[segment];
  }
  return cur;
}
|
||
|
||
/**
 * Tells whether a node result counts as a failure: an object whose `success` is exactly
 * `false`, or that has an `error` key (whatever its value). Non-objects and falsy values
 * are never failures.
 */
function isFailure(result: unknown): boolean {
  if (!result || typeof result !== 'object') return false;
  const r = result as Record<string, unknown>;
  return r['success'] === false || 'error' in r;
}
|
||
|
||
/**
 * Safe condition evaluation (no `new Function` / eval).
 *
 * `result.` and `ctx.` prefixes are stripped, then one of these forms is recognised:
 *   key === value / key == value   equality against true, false, a quoted or bare string,
 *                                  or a number
 *   key !== value / key != value   negated equality
 *   key > n, key >= n, key < n, key <= n   numeric comparison (n may be negative / decimal)
 *   key                            truthiness of ctx[key] when the key exists
 *
 * Numeric literals match both a numeric context value and its legacy string form
 * (`count === 5` is true for 5 and for '5').
 *
 * @returns false when `context` is not an object; true for any expression that is not
 *          recognised (including a bare key that is absent from the context)
 */
function evaluateCondition(condition: string, context: unknown): boolean {
  if (!context || typeof context !== 'object') return false;
  const ctx = context as Record<string, unknown>;

  // Normalise: drop the `result.` / `ctx.` prefixes so keys are looked up directly.
  const expr = condition.replace(/result\./g, '').replace(/ctx\./g, '');

  // Compares a context value with the literal written in the condition.
  const matchesLiteral = (actual: unknown, rawVal: string): boolean => {
    const expected = rawVal === 'true' ? true : rawVal === 'false' ? false : rawVal.replace(/['"]/g, '');
    if (actual === expected) return true;
    // Unquoted numeric literal: also accept the numeric value itself.
    return /^-?\d+(?:\.\d+)?$/.test(rawVal) && actual === Number(rawVal);
  };

  // Equality: key === value / key == value
  const eqMatch = expr.match(/^(\w+)\s*===?\s*(.+)$/);
  if (eqMatch) {
    return matchesLiteral(ctx[eqMatch[1]], eqMatch[2].trim());
  }

  // Inequality: key !== value / key != value
  const neqMatch = expr.match(/^(\w+)\s*!==?\s*(.+)$/);
  if (neqMatch) {
    return !matchesLiteral(ctx[neqMatch[1]], neqMatch[2].trim());
  }

  // Numeric comparison: key > n, key >= n, key < n, key <= n
  const relMatch = expr.match(/^(\w+)\s*(>=|<=|>|<)\s*(-?\d+(?:\.\d+)?)$/);
  if (relMatch) {
    const left = Number(ctx[relMatch[1]]);
    const right = Number(relMatch[3]);
    switch (relMatch[2]) {
      case '>': return left > right;
      case '>=': return left >= right;
      case '<': return left < right;
      default: return left <= right;
    }
  }

  // Truthy check on a bare key.
  const key = expr.trim();
  if (key && key in ctx) return !!ctx[key];

  // Unrecognised expressions pass.
  return true;
}
|
||
|
||
/**
 * Finds the array a FOREACH edge should iterate over, given the iterator key (singular).
 *
 * Candidate property names are tried in this order: `key + 's'`, trailing `y` -> `ies`,
 * trailing s/x/z/ch/sh + `es`, and finally the key itself. The top level of `context` is
 * searched first; if nothing matches, each direct child object (arrays excluded) is
 * searched one level deep with the same candidates (nested FOREACH: the outer loop injects
 * `paragraph`, the inner loop looks for `paragraph.triplets`).
 *
 * @returns the first matching array, or an empty array when none is found or `context`
 *          is not an object
 */
function getIterableFromContext(context: unknown, key: string): unknown[] {
  if (!context || typeof context !== 'object') return [];

  // Several plural forms; duplicates (when a rule does not apply the key comes back
  // unchanged) are removed while keeping the first-occurrence order.
  const variants = [...new Set([
    key + 's',                              // paragraph -> paragraphs
    key.replace(/y$/, 'ies'),               // entity -> entities
    key.replace(/(s|x|z|ch|sh)$/, '$1es'),  // box -> boxes
    key,                                    // singular fallback
  ])];
  const obj = context as Record<string, unknown>;

  // Top level first (the most common case).
  for (const v of variants) {
    if (Array.isArray(obj[v])) return obj[v] as unknown[];
  }

  // Otherwise scan one level of nested objects.
  for (const val of Object.values(obj)) {
    if (val !== null && typeof val === 'object' && !Array.isArray(val)) {
      for (const v of variants) {
        const nested = (val as Record<string, unknown>)[v];
        if (Array.isArray(nested)) return nested;
      }
    }
  }
  return [];
}
|