arcrun — AI workflow execution engine (clean history)
Self-hosted 開源:WASM 零件 + recipe + cypher-executor,跑在你自己的 Cloudflare。 此為重建的乾淨歷史起點(移除曾誤 commit 的 GCP SA 金鑰,舊歷史保留在 richblack/arcrun 與本地 backup 分支)。含: - acr init --self-hosted installer(建 KV/R2 + codeload 拉預編譯 wasm + wrangler deploy + seed recipe) - recipe push 把關(資料外流提醒 + 打通檢查) - 19 個正當零件預編譯 wasm(claude_api/km_writer/kbdb_upsert_block 排除:違反 DECISIONS §1) - CLI / cypher-executor / registry / 完整 SDD Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,683 @@
|
||||
// arcrun 圖遍歷引擎 — 支援完整 Cypher 語意關係
|
||||
import type { ExecutionGraph, GraphNode, TraceStep, ComponentRunner, KVContextStore, EdgeType, Bindings } from './types';
|
||||
import { kvSetNodeOutput, kvGetNodeOutput, ExecutionError, WorkflowPaused } from './types';
|
||||
import { injectCredentials } from './actions/credential-injector';
|
||||
import { tryAuthDispatch } from './actions/auth-dispatcher';
|
||||
import { expandPromptRecipe } from './lib/recipe-expander';
|
||||
import { persistPausedRun, isResumablePending, parseRecipeOutput } from './lib/paused-runs';
|
||||
import { buildMagicVars } from './lib/magic-vars';
|
||||
import { recordTelemetry } from './lib/telemetry';
|
||||
|
||||
export type ComponentLoader = (componentId: string) => Promise<ComponentRunner>;
|
||||
export type WorkflowLoader = (workflowId: string) => Promise<ExecutionGraph>;
|
||||
|
||||
// Fan-in 狀態:入度 > 1 的節點需要等所有上游完成後才執行
|
||||
type FanInState = Map<string, { ctx: Record<string, unknown>; remaining: number }>;
|
||||
|
||||
export class GraphExecutor {
|
||||
private loader: ComponentLoader;
|
||||
private workflowLoader?: WorkflowLoader;
|
||||
private env?: Bindings;
|
||||
private apiKey?: string;
|
||||
public recordComponentReference?: (componentId: string, workflowId: string) => Promise<void>;
|
||||
|
||||
// resumable workflow(SDD: resumable-workflow/design.md)
|
||||
// 暫停時持久化 state 用,需在 execute 進入時設定
|
||||
private currentGraph?: ExecutionGraph;
|
||||
private currentRunId?: string;
|
||||
|
||||
constructor(loader: ComponentLoader, workflowLoader?: WorkflowLoader, env?: Bindings, apiKey?: string) {
|
||||
this.loader = loader;
|
||||
this.workflowLoader = workflowLoader;
|
||||
this.env = env;
|
||||
this.apiKey = apiKey;
|
||||
}
|
||||
|
||||
async execute(
|
||||
graph: ExecutionGraph,
|
||||
initialContext: Record<string, unknown>,
|
||||
kvNamespace?: KVNamespace | undefined,
|
||||
): Promise<{
|
||||
data: unknown;
|
||||
trace: TraceStep[];
|
||||
}> {
|
||||
const trace: TraceStep[] = [];
|
||||
|
||||
// 建立 KV Context Store(BUILD-006)
|
||||
// run_id = graphId + timestamp,確保每次執行獨立
|
||||
const kvStore: KVContextStore | undefined = kvNamespace
|
||||
? { runId: `${graph.id}-${Date.now()}`, kv: kvNamespace }
|
||||
: undefined;
|
||||
|
||||
// resumable workflow:記住當前 graph + run_id 給 pending 暫停用
|
||||
this.currentGraph = graph;
|
||||
this.currentRunId = kvStore?.runId ?? `${graph.id}-${Date.now()}`;
|
||||
|
||||
// Magic vars:注入 _today / _now / _iso_week 等系統變數(LI SDD M2.x)
|
||||
// initialContext 寫前,magic vars 寫後 → magic vars 永遠 win(防 user accidentally 用 _ prefix)
|
||||
// 同時保留 user 既有 ctx,magic vars 不破壞既有 workflow(_ prefix reserved)
|
||||
const ctxWithMagic: Record<string, unknown> = {
|
||||
...initialContext,
|
||||
...buildMagicVars(),
|
||||
};
|
||||
|
||||
// 找出所有起點(沒有任何邊指向的節點)
|
||||
const hasIncoming = new Set(graph.edges.map(e => e.to));
|
||||
const startNodes = graph.nodes.filter(n => !hasIncoming.has(n.id));
|
||||
|
||||
if (startNodes.length === 0) {
|
||||
return { data: ctxWithMagic, trace };
|
||||
}
|
||||
|
||||
// 建立 fan-in 狀態(入度 > 1 的節點需要等所有上游)
|
||||
const fanIn: FanInState = new Map();
|
||||
for (const node of graph.nodes) {
|
||||
const inDeg = graph.edges.filter(e => e.to === node.id).length;
|
||||
if (inDeg > 1) {
|
||||
fanIn.set(node.id, { ctx: { ...ctxWithMagic }, remaining: inDeg });
|
||||
}
|
||||
}
|
||||
|
||||
// 並行執行所有起點
|
||||
const results = await Promise.all(
|
||||
startNodes.map(node =>
|
||||
this.executeNode(node, graph, ctxWithMagic, new Set(), trace, fanIn, kvStore)
|
||||
)
|
||||
);
|
||||
|
||||
// 合併所有起點的輸出
|
||||
// 注意:若結果是 string(如 HTML),不可直接展開 — 展開 string 會產生字元索引物件
|
||||
let mergedResult: unknown;
|
||||
if (results.length === 1) {
|
||||
mergedResult = results[0];
|
||||
} else {
|
||||
mergedResult = results.reduce(
|
||||
(acc: Record<string, unknown>, r: unknown) => ({
|
||||
...acc,
|
||||
...(typeof r === 'object' && r !== null ? (r as Record<string, unknown>) : {}),
|
||||
}),
|
||||
{} as Record<string, unknown>
|
||||
);
|
||||
}
|
||||
|
||||
return { data: mergedResult, trace };
|
||||
}
|
||||
|
||||
/**
|
||||
* 從 paused state 繼續執行 workflow
|
||||
* SDD: resumable-workflow/design.md §3.2
|
||||
*
|
||||
* 流程:
|
||||
* 1. 把 paused_node 當已執行(result = callbackResult,注入進 context)
|
||||
* 2. 找出 paused_node 的所有下游節點當新起點
|
||||
* 3. 執行下游節點直到結束(或再次 paused)
|
||||
*/
|
||||
async resumeFromPaused(args: {
|
||||
graph: ExecutionGraph;
|
||||
paused_node_id: string;
|
||||
paused_context: Record<string, unknown>; // paused 當下的 context
|
||||
callback_result: Record<string, unknown>; // daemon callback 給的 result(取代 paused result)
|
||||
prior_trace: TraceStep[];
|
||||
kvNamespace?: KVNamespace;
|
||||
recipe_output_format?: 'text' | 'json';
|
||||
recipe_output_required_fields?: string[];
|
||||
}): Promise<{ data: unknown; trace: TraceStep[] }> {
|
||||
const { graph, paused_node_id, paused_context, prior_trace, kvNamespace } = args;
|
||||
let { callback_result } = args;
|
||||
|
||||
// Recipe output parsing:跟立刻回路徑同樣解析(spread parsed 欄位到 top-level)
|
||||
// SDD: recipe-system + resumable-workflow
|
||||
callback_result = parseRecipeOutput(
|
||||
callback_result,
|
||||
args.recipe_output_format,
|
||||
args.recipe_output_required_fields,
|
||||
) as Record<string, unknown>;
|
||||
|
||||
this.currentGraph = graph;
|
||||
this.currentRunId = `${graph.id}-resume-${Date.now()}`;
|
||||
|
||||
const trace: TraceStep[] = [...prior_trace];
|
||||
const kvStore: KVContextStore | undefined = kvNamespace
|
||||
? { runId: this.currentRunId, kv: kvNamespace }
|
||||
: undefined;
|
||||
|
||||
// 把 callback_result 寫進 paused_node 的 KV output(讓下游讀得到)
|
||||
if (kvStore) {
|
||||
await kvSetNodeOutput(kvStore, paused_node_id, callback_result);
|
||||
}
|
||||
|
||||
// 把 callback_result spread 進 context(替代 paused 結果)+ node-id namespace
|
||||
// 2026-05-14 補:以前漏 namespace,導致下游 `{{paused_node_id.data.text}}` 模板抓不到,
|
||||
// 必須跟同步路徑(propagateCtx)行為一致。
|
||||
const mergedContext: Record<string, unknown> = {
|
||||
...paused_context,
|
||||
...(callback_result && typeof callback_result === 'object' ? callback_result : {}),
|
||||
[paused_node_id]: callback_result,
|
||||
};
|
||||
if (kvStore) {
|
||||
if (!mergedContext._kv_outputs) mergedContext._kv_outputs = {};
|
||||
(mergedContext._kv_outputs as Record<string, unknown>)[paused_node_id] = callback_result;
|
||||
}
|
||||
|
||||
// 找下游節點
|
||||
const downstreamEdges = graph.edges.filter(e => e.from === paused_node_id);
|
||||
if (downstreamEdges.length === 0) {
|
||||
// paused_node 是最後一個節點 → 直接結束
|
||||
return { data: callback_result, trace };
|
||||
}
|
||||
|
||||
// 重建 fanIn(針對下游可能 fan-in 的節點)
|
||||
const fanIn: FanInState = new Map();
|
||||
for (const node of graph.nodes) {
|
||||
const inDeg = graph.edges.filter(e => e.to === node.id).length;
|
||||
if (inDeg > 1) {
|
||||
fanIn.set(node.id, { ctx: { ...mergedContext }, remaining: inDeg });
|
||||
}
|
||||
}
|
||||
|
||||
// 對每個下游節點,建立新 visited Set 避免 paused_node 自己被再跑一次
|
||||
const visited = new Set<string>([`${paused_node_id}:${JSON.stringify(paused_context).slice(0, 50)}`]);
|
||||
|
||||
const downstreamNodes = downstreamEdges
|
||||
.map(e => graph.nodes.find(n => n.id === e.to))
|
||||
.filter((n): n is GraphNode => !!n);
|
||||
|
||||
const results = await Promise.all(
|
||||
downstreamNodes.map(node =>
|
||||
this.executeNode(node, graph, mergedContext, visited, trace, fanIn, kvStore)
|
||||
)
|
||||
);
|
||||
|
||||
let mergedResult: unknown;
|
||||
if (results.length === 1) {
|
||||
mergedResult = results[0];
|
||||
} else {
|
||||
mergedResult = results.reduce(
|
||||
(acc: Record<string, unknown>, r: unknown) => ({
|
||||
...acc,
|
||||
...(typeof r === 'object' && r !== null ? (r as Record<string, unknown>) : {}),
|
||||
}),
|
||||
{} as Record<string, unknown>,
|
||||
);
|
||||
}
|
||||
return { data: mergedResult, trace };
|
||||
}
|
||||
|
||||
private async executeNode(
|
||||
node: GraphNode,
|
||||
graph: ExecutionGraph,
|
||||
context: unknown,
|
||||
visited: Set<string>,
|
||||
trace: TraceStep[],
|
||||
fanIn: FanInState,
|
||||
kvStore?: KVContextStore,
|
||||
): Promise<unknown> {
|
||||
const nodeKey = `${node.id}:${JSON.stringify(context).slice(0, 50)}`;
|
||||
if (visited.has(nodeKey)) return context;
|
||||
visited.add(nodeKey);
|
||||
|
||||
const start = Date.now();
|
||||
let result: unknown = context;
|
||||
let nodeInput: unknown = context;
|
||||
|
||||
try {
|
||||
switch (node.type) {
|
||||
case 'Input':
|
||||
result = node.data ?? context;
|
||||
nodeInput = result;
|
||||
break;
|
||||
|
||||
case 'Component': {
|
||||
if (!node.componentId) throw new Error(`節點 ${node.id} 缺少 componentId`);
|
||||
const runner = await this.loader(node.componentId);
|
||||
const ctx = context as Record<string, unknown>;
|
||||
// node.data 的 string 值支援 {{variable}} 替換(從 context 取值)
|
||||
const resolvedData = interpolateData(node.data, ctx);
|
||||
// 優先順序:node.data(靜態參數,如 pattern/sheet)> context(全局參數)
|
||||
let mergedContext: Record<string, unknown> = {
|
||||
...ctx,
|
||||
...resolvedData,
|
||||
};
|
||||
|
||||
// Resumable workflow callback_url 注入(SDD: resumable-workflow/design.md §2.2)
|
||||
// claude_api 容器拿到後會透傳給 Mira daemon,daemon task 完成時 POST 進來
|
||||
// hostname 暫從 PUBLIC_BASE_URL 取,沒設則用 cypher.arcrun.dev 預設
|
||||
if (node.componentId === 'claude_api') {
|
||||
const baseUrl = (this.env as { PUBLIC_BASE_URL?: string } | undefined)?.PUBLIC_BASE_URL
|
||||
?? 'https://cypher.arcrun.dev';
|
||||
mergedContext.callback_url = `${baseUrl.replace(/\/$/, '')}/workflows/resume`;
|
||||
}
|
||||
|
||||
// Recipe expansion:若 node.data.recipe 存在,展開成實際 prompt 並併進 mergedContext
|
||||
// SDD: matrix/arcrun/.agents/specs/recipe-system/design.md §2.2
|
||||
if (typeof resolvedData.recipe === 'string' && this.env?.RECIPES) {
|
||||
try {
|
||||
const expanded = await expandPromptRecipe(
|
||||
resolvedData.recipe,
|
||||
ctx,
|
||||
this.env as { RECIPES: { get: (k: string) => Promise<string | null> }; KBDB_BASE_URL?: string },
|
||||
this.apiKey ?? '',
|
||||
);
|
||||
mergedContext = {
|
||||
...mergedContext,
|
||||
prompt: expanded.prompt,
|
||||
model: expanded.model,
|
||||
_recipe_output_format: expanded.output_format,
|
||||
_recipe_output_required_fields: expanded.output_required_fields,
|
||||
};
|
||||
} catch (e) {
|
||||
throw new Error(`recipe 展開失敗 (${resolvedData.recipe}): ${e instanceof Error ? e.message : String(e)}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Credential 注入:在 WASM 執行前自動注入 credentials_required 中宣告的 token
|
||||
if (this.env) {
|
||||
// 先試 auth dispatcher(新路徑,走 auth primitive WASM Worker via HTTP)
|
||||
// 命中才 return;否則 fallback 到舊 injectCredentials(Phase 1.9 會刪除)
|
||||
if (this.apiKey) {
|
||||
const dispatched = await tryAuthDispatch(node.componentId, mergedContext, this.env, this.apiKey);
|
||||
if (dispatched) {
|
||||
mergedContext = dispatched;
|
||||
} else {
|
||||
mergedContext = await injectCredentials(node.componentId, mergedContext, this.env, this.apiKey);
|
||||
}
|
||||
} else {
|
||||
mergedContext = await injectCredentials(node.componentId, mergedContext, this.env, this.apiKey);
|
||||
}
|
||||
}
|
||||
|
||||
nodeInput = mergedContext;
|
||||
result = await runner(mergedContext);
|
||||
|
||||
// Resumable workflow:偵測 pending,持久化 paused state 後 throw WorkflowPaused
|
||||
// SDD: resumable-workflow/design.md §3.2.1
|
||||
// 注意:放在 recipe output parsing 之前 — pending 結果不該被當 JSON 解析
|
||||
const pending = isResumablePending(result);
|
||||
if (pending && this.env?.EXEC_CONTEXT && this.currentGraph && this.currentRunId) {
|
||||
// 把這個節點的執行紀錄寫進 trace(status=paused)
|
||||
trace.push({
|
||||
nodeId: node.id,
|
||||
type: node.type,
|
||||
input: nodeInput,
|
||||
output: result,
|
||||
duration_ms: Date.now() - start,
|
||||
});
|
||||
await persistPausedRun(this.env.EXEC_CONTEXT, pending.task_id, {
|
||||
run_id: this.currentRunId,
|
||||
graph: this.currentGraph,
|
||||
paused_node_id: node.id,
|
||||
paused_context: context as Record<string, unknown>,
|
||||
paused_pending_result: result as Record<string, unknown>,
|
||||
trace_so_far: trace,
|
||||
api_key: this.apiKey,
|
||||
expires_at: Date.now() + 24 * 60 * 60 * 1000,
|
||||
recipe_output_format: mergedContext._recipe_output_format as 'text' | 'json' | undefined,
|
||||
recipe_output_required_fields: mergedContext._recipe_output_required_fields as string[] | undefined,
|
||||
});
|
||||
throw new WorkflowPaused(pending.task_id, this.currentRunId, node.id, trace);
|
||||
}
|
||||
|
||||
// Recipe output parsing:用 parseRecipeOutput 統一處理(立刻回 + resume 長回兩條路共用)
|
||||
// SDD: recipe-system + resumable-workflow
|
||||
// 解完後 parsed JSON 的 top-level 欄位(如 paragraphs / triplets)spread 到 result,
|
||||
// 讓下游 FOREACH 跟 {{var}} 模板直接可取
|
||||
result = parseRecipeOutput(
|
||||
result,
|
||||
mergedContext._recipe_output_format as 'text' | 'json' | undefined,
|
||||
mergedContext._recipe_output_required_fields as string[] | undefined,
|
||||
);
|
||||
|
||||
// BUILD-006:將節點 output 存入 KV(key = {run_id}:node:{node_id})
|
||||
// 這讓下游節點可以透過 KV 讀取上游的具名 output,解決同名欄位衝突
|
||||
if (kvStore && result !== null && result !== undefined) {
|
||||
await kvSetNodeOutput(kvStore, node.id, result);
|
||||
}
|
||||
|
||||
// Phase 2:記錄 component 被引用(追蹤生命週期)
|
||||
// 由 component-registry 追蹤使用狀態,決定是否保留
|
||||
// 在後台執行,不阻擋主流程
|
||||
void this.recordComponentReference?.(node.componentId, graph.id).catch(() => {
|
||||
// 記錄失敗不應該中止執行
|
||||
});
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case 'Output':
|
||||
result = context;
|
||||
break;
|
||||
}
|
||||
} catch (e: any) {
|
||||
// WorkflowPaused 不是錯誤,是「workflow 暫停」訊號,直接往上傳
|
||||
// SDD: resumable-workflow/design.md
|
||||
if (e instanceof WorkflowPaused) throw e;
|
||||
|
||||
const errMsg = e.message || String(e);
|
||||
const duration_ms = Date.now() - start;
|
||||
trace.push({
|
||||
nodeId: node.id,
|
||||
type: node.type,
|
||||
input: nodeInput,
|
||||
output: null,
|
||||
error: errMsg,
|
||||
duration_ms,
|
||||
});
|
||||
// Step-level telemetry:node 失敗事件(LI SDD M2.x 自評建議)
|
||||
if (this.env && node.type === 'Component') {
|
||||
recordTelemetry(this.env, this.apiKey, {
|
||||
event_type: 'node_failure',
|
||||
workflow_name: graph.name,
|
||||
component_id: node.componentId,
|
||||
error_code: 'node_error',
|
||||
duration_ms,
|
||||
});
|
||||
}
|
||||
// 若已是 ExecutionError(上游節點拋出),保留原始 trace 繼續往上傳
|
||||
if (e instanceof ExecutionError) throw e;
|
||||
throw new ExecutionError(
|
||||
`Node ${node.id} failed: ${errMsg}`,
|
||||
node.id,
|
||||
nodeInput,
|
||||
trace,
|
||||
);
|
||||
}
|
||||
|
||||
const duration_ms = Date.now() - start;
|
||||
trace.push({
|
||||
nodeId: node.id,
|
||||
type: node.type,
|
||||
input: nodeInput,
|
||||
output: result,
|
||||
duration_ms,
|
||||
});
|
||||
|
||||
// Step-level telemetry:node 成功事件(只記 Component,Input/Output 跳過)
|
||||
// LI SDD M2.x:給 weekly_review 提的「效能基準線」建議用 — 每個 node duration 都可追
|
||||
if (this.env && node.type === 'Component') {
|
||||
recordTelemetry(this.env, this.apiKey, {
|
||||
event_type: 'node_success',
|
||||
workflow_name: graph.name,
|
||||
component_id: node.componentId,
|
||||
duration_ms,
|
||||
});
|
||||
}
|
||||
|
||||
// 處理出邊
|
||||
const outEdges = graph.edges.filter(e => e.from === node.id);
|
||||
|
||||
for (const edge of outEdges) {
|
||||
const nextNode = graph.nodes.find(n => n.id === edge.to);
|
||||
if (!nextNode) continue;
|
||||
|
||||
switch (edge.type as EdgeType) {
|
||||
case 'PIPE': {
|
||||
const pipeContext: Record<string, unknown> = propagateCtx(context, result, node.id);
|
||||
|
||||
if (kvStore) {
|
||||
const kvOutput = await kvGetNodeOutput(kvStore, node.id);
|
||||
if (kvOutput !== undefined) {
|
||||
if (!pipeContext._kv_outputs) pipeContext._kv_outputs = {};
|
||||
(pipeContext._kv_outputs as Record<string, unknown>)[node.id] = kvOutput;
|
||||
}
|
||||
}
|
||||
|
||||
const fanInState = fanIn.get(nextNode.id);
|
||||
if (fanInState) {
|
||||
Object.assign(fanInState.ctx, pipeContext);
|
||||
fanInState.remaining--;
|
||||
if (fanInState.remaining === 0) {
|
||||
result = await this.executeNode(nextNode, graph, fanInState.ctx, visited, trace, fanIn, kvStore);
|
||||
}
|
||||
} else {
|
||||
result = await this.executeNode(nextNode, graph, pipeContext, visited, trace, fanIn, kvStore);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case 'ON_SUCCESS': {
|
||||
if (!isFailure(result)) {
|
||||
const mergedCtx = propagateCtx(context, result, node.id);
|
||||
result = await this.executeNode(nextNode, graph, mergedCtx, visited, trace, fanIn, kvStore);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case 'ON_FAIL': {
|
||||
if (isFailure(result)) {
|
||||
const mergedCtx = propagateCtx(context, result, node.id);
|
||||
result = await this.executeNode(nextNode, graph, mergedCtx, visited, trace, fanIn, kvStore);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case 'IF': {
|
||||
const passes = evaluateCondition(edge.condition ?? 'true', result);
|
||||
if (passes) {
|
||||
const mergedCtx = propagateCtx(context, result, node.id);
|
||||
result = await this.executeNode(nextNode, graph, mergedCtx, visited, trace, fanIn, kvStore);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case 'FOREACH': {
|
||||
const iteratorKey = edge.iterator ?? 'item';
|
||||
// 找 iterable 順序:先看上游 output (result),沒有再看完整 context (含上游 chain 累積的 fields)
|
||||
// 2026-05-13:原本只看 result,但 result 是當前節點 output (如 create_wiki_page 只回 {data, success})
|
||||
// 不含更上游節點給的 paragraphs。propagateCtx 已把 paragraphs spread 進 ctx,FOREACH 該能取到
|
||||
let items = getIterableFromContext(result, iteratorKey);
|
||||
if (items.length === 0) {
|
||||
items = getIterableFromContext(context, iteratorKey);
|
||||
}
|
||||
const iterResults: unknown[] = [];
|
||||
|
||||
// FOREACH itemContext 順序:propagateCtx + 加 iterator key
|
||||
const baseForeachCtx = propagateCtx(context, result, node.id);
|
||||
for (const item of items) {
|
||||
const itemContext = {
|
||||
...baseForeachCtx,
|
||||
[iteratorKey]: item,
|
||||
};
|
||||
const itemResult = await this.executeNode(nextNode, graph, itemContext, new Set(), trace, fanIn, kvStore);
|
||||
iterResults.push(itemResult);
|
||||
}
|
||||
|
||||
result = { ...(result as Record<string, unknown>), results: iterResults };
|
||||
break;
|
||||
}
|
||||
|
||||
case 'CALLS_SUBFLOW': {
|
||||
// 從 workflowLoader 載入子 Workflow,以當前 context 執行,輸出合併回主流程
|
||||
const subWorkflowId = nextNode.componentId?.replace('workflow://', '') ?? nextNode.id;
|
||||
if (this.workflowLoader) {
|
||||
const subGraph = await this.workflowLoader(subWorkflowId);
|
||||
const subExecutor = new GraphExecutor(this.loader, this.workflowLoader);
|
||||
const subResult = await subExecutor.execute(
|
||||
subGraph,
|
||||
result as Record<string, unknown>,
|
||||
kvStore?.kv,
|
||||
);
|
||||
result = {
|
||||
...(result as Record<string, unknown>),
|
||||
...(subResult.data as Record<string, unknown>),
|
||||
};
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case 'ON_CLICK': {
|
||||
const mergedCtx = propagateCtx(context, result, node.id);
|
||||
result = await this.executeNode(nextNode, graph, mergedCtx, visited, trace, fanIn, kvStore);
|
||||
break;
|
||||
}
|
||||
|
||||
case 'IS_A': {
|
||||
// 節點類型宣告:記錄 componentId,不執行
|
||||
// IS_A 邊的 to 是零件 URI(如 component://validate_json)
|
||||
// 這個資訊已在 graph-builder 階段處理,執行時不需要額外動作
|
||||
break;
|
||||
}
|
||||
|
||||
case 'CONTAINS':
|
||||
case 'HAS_STYLE':
|
||||
case 'HAS_BEHAVIOR': {
|
||||
// 結構語意:只記錄圖結構,不執行
|
||||
break;
|
||||
}
|
||||
|
||||
case 'CONTINUE':
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
/** 給下游節點組 ctx:merge 原 context + 上游 output (spread) + 上游 output 用 node id namespace
|
||||
* 讓下游能用:
|
||||
* {{api_key}}(global,從 baseCtx)
|
||||
* {{data.text}}(上一節點 output spread 進來,會被下下個節點覆蓋)
|
||||
* {{classify.data.text}}(指名某節點 output,永不被覆蓋因 node id 唯一)
|
||||
* 優先順位:baseCtx(含先前 node namespace)< 上游 output spread < 當前 node namespace
|
||||
*/
|
||||
function propagateCtx(
|
||||
context: unknown,
|
||||
upstreamResult: unknown,
|
||||
upstreamNodeId: string,
|
||||
): Record<string, unknown> {
|
||||
const baseCtx = (typeof context === 'object' && context !== null) ? context as Record<string, unknown> : {};
|
||||
const baseResult = (typeof upstreamResult === 'object' && upstreamResult !== null) ? upstreamResult as Record<string, unknown> : {};
|
||||
return {
|
||||
...baseCtx,
|
||||
...baseResult,
|
||||
[upstreamNodeId]: upstreamResult,
|
||||
};
|
||||
}
|
||||
|
||||
/** node.data 內所有 string 值(含 nested object / array)支援 {{variable}} 替換
|
||||
* 支援嵌套 path:{{item.content}} → ctx.item.content
|
||||
* 支援 array index:{{paragraphs.0.entity}} → ctx.paragraphs[0].entity
|
||||
* 非 string 值(object/array)遞迴展開內部 string;undefined / null / number / bool 保留原值
|
||||
* 2026-05-13 加遞迴:原本只跑 top-level,set 零件 values 嵌套 / 任何零件 body 內含 {{x.y}} 用不了。
|
||||
* 2026-05-14 加 single-ref pass-through:若整個 string 是 `{{x}}` 且 x 是 array / object,
|
||||
* 回 raw value 不 stringify(讓 filter `items: "{{list.blocks}}"` 能拿到真陣列)。
|
||||
* 多 ref 或混合文字仍 stringify 為字串。
|
||||
*/
|
||||
function interpolateString(s: string, ctx: Record<string, unknown>): unknown {
|
||||
// 整個值是單一 {{x}} 引用 → 回 raw value(保留 array / object 型別)
|
||||
const single = s.match(/^\s*\{\{([\w.]+)\}\}\s*$/);
|
||||
if (single) {
|
||||
const val = getNestedValue(ctx, single[1]);
|
||||
return val === undefined ? s : val;
|
||||
}
|
||||
// 多 ref / 混合文字 → 一律拼成 string
|
||||
return s.replace(/\{\{([\w.]+)\}\}/g, (_, key: string) => {
|
||||
const val = getNestedValue(ctx, key);
|
||||
if (val === undefined) return `{{${key}}}`;
|
||||
if (typeof val === 'string') return val;
|
||||
return JSON.stringify(val);
|
||||
});
|
||||
}
|
||||
|
||||
function interpolateValue(v: unknown, ctx: Record<string, unknown>): unknown {
|
||||
if (typeof v === 'string') return interpolateString(v, ctx);
|
||||
if (Array.isArray(v)) return v.map(item => interpolateValue(item, ctx));
|
||||
if (v !== null && typeof v === 'object') {
|
||||
const result: Record<string, unknown> = {};
|
||||
for (const [k, val] of Object.entries(v as Record<string, unknown>)) {
|
||||
result[k] = interpolateValue(val, ctx);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
return v;
|
||||
}
|
||||
|
||||
function interpolateData(
|
||||
data: Record<string, unknown> | undefined,
|
||||
ctx: Record<string, unknown>,
|
||||
): Record<string, unknown> {
|
||||
if (!data) return {};
|
||||
return interpolateValue(data, ctx) as Record<string, unknown>;
|
||||
}
|
||||
|
||||
/** 從 ctx 用 dot path 取嵌套值:'a.b.0.c' → ctx.a.b[0].c */
|
||||
function getNestedValue(ctx: unknown, path: string): unknown {
|
||||
const parts = path.split('.');
|
||||
let cur: unknown = ctx;
|
||||
for (const p of parts) {
|
||||
if (cur === null || cur === undefined) return undefined;
|
||||
if (typeof cur !== 'object') return undefined;
|
||||
cur = (cur as Record<string, unknown>)[p];
|
||||
}
|
||||
return cur;
|
||||
}
|
||||
|
||||
/** 判斷節點執行結果是否為失敗:success === false 或含有 error key */
|
||||
function isFailure(result: unknown): boolean {
|
||||
if (!result || typeof result !== 'object') return false;
|
||||
const r = result as Record<string, unknown>;
|
||||
return r['success'] === false || 'error' in r;
|
||||
}
|
||||
|
||||
/**
|
||||
* 安全條件評估(不使用 new Function)
|
||||
* 支援格式:ctx.key === value, ctx.key > value, ctx.key(truthy)
|
||||
*/
|
||||
function evaluateCondition(condition: string, context: unknown): boolean {
|
||||
if (!context || typeof context !== 'object') return false;
|
||||
const ctx = context as Record<string, unknown>;
|
||||
|
||||
// 正規化:把 result. 替換為空(直接存取 key)
|
||||
const expr = condition.replace(/result\./g, '').replace(/ctx\./g, '');
|
||||
|
||||
// 簡單 === 比較
|
||||
const eqMatch = expr.match(/^(\w+)\s*===?\s*(.+)$/);
|
||||
if (eqMatch) {
|
||||
const key = eqMatch[1];
|
||||
const rawVal = eqMatch[2].trim();
|
||||
const expected = rawVal === 'true' ? true : rawVal === 'false' ? false : rawVal.replace(/['"]/g, '');
|
||||
return ctx[key] === expected;
|
||||
}
|
||||
|
||||
// 簡單 > 比較
|
||||
const gtMatch = expr.match(/^(\w+)\s*>\s*(\d+)$/);
|
||||
if (gtMatch) {
|
||||
return Number(ctx[gtMatch[1]]) > Number(gtMatch[2]);
|
||||
}
|
||||
|
||||
// truthy check
|
||||
const key = expr.trim();
|
||||
if (key && key in ctx) return !!ctx[key];
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
function getIterableFromContext(context: unknown, key: string): unknown[] {
|
||||
if (!context || typeof context !== 'object') return [];
|
||||
// 多種 plural 變體:entity → entities / paragraph → paragraphs / box → boxes / 等
|
||||
// 2026-05-17:原本只試 key+'s','entity+s=entitys' ≠ 'entities' 無法命中,加 irregular
|
||||
const variants = [
|
||||
key + 's', // paragraph → paragraphs
|
||||
key.replace(/y$/, 'ies'), // entity → entities
|
||||
key.replace(/(s|x|z|ch|sh)$/, '$1es'), // box → boxes
|
||||
key, // singular fallback
|
||||
];
|
||||
const obj = context as Record<string, unknown>;
|
||||
|
||||
// 先看 top-level(最常見)
|
||||
for (const v of variants) {
|
||||
if (Array.isArray(obj[v])) return obj[v] as unknown[];
|
||||
}
|
||||
|
||||
// 若找不到,掃一層內部 object 看 nested(巢狀 FOREACH 場景:
|
||||
// 外層 FOREACH 把 paragraph 注入 ctx,內層 FOREACH 要找 paragraph.triplets)
|
||||
for (const val of Object.values(obj)) {
|
||||
if (val !== null && typeof val === 'object' && !Array.isArray(val)) {
|
||||
for (const v of variants) {
|
||||
const nested = (val as Record<string, unknown>)[v];
|
||||
if (Array.isArray(nested)) return nested;
|
||||
}
|
||||
}
|
||||
}
|
||||
return [];
|
||||
}
|
||||
Reference in New Issue
Block a user