feat(cypher): 3-node wiki workflow end-to-end (FOREACH + nested interp + unified parsing)

Three platform-level improvements that together enable the full
"草稿 → LLM 整理 → KBDB" wiki ingest workflow via cypher binding:

## 1. Nested interpolation in node.data
Previously {{var}} only supported top-level keys, so {{item.content}}
literal-passed through. Now supports dot-path:
  {{paragraph.content}} → ctx.paragraph.content
  {{paragraphs.0.entity}} → ctx.paragraphs[0].entity
Non-string values (object/array) JSON.stringify automatically.

## 2. 對每個 X cypher binding syntax
'A >> 對每個 paragraph >> B' parses into FOREACH edge with
iterator='paragraph'. graph-builder.ts strips the iterator from label
before edge type resolution. Backwards compatible: bare '對每個' still
defaults to item.

## 3. FOREACH preserves outer context
itemContext was previously {...result, [iter]: item}, dropping
top-level api_key etc. Now {...outerCtx, ...result, [iter]: item} so
{{api_key}} interpolation works in foreach body.

## 4. Unified recipe output parsing (sync + resume)
Extracted parseRecipeOutput() helper used by both sync claude_api
result + workflow resume callback. Strips ```json fence, parses,
spreads parsed top-level fields into result so downstream FOREACH
finds 'paragraphs' (not buried in data.paragraphs).

paused state now stores recipe_output_format + required_fields so
resume route can apply same parsing as sync path.

End-to-end verified:
- input(草稿+api_key) → synth(claude_api+recipe) → 對每個 paragraph → write_wiki(kbdb_create_block)
- Real Claude synthesis on Mira daemon: 3 triplets + 2 paragraphs
- Both paragraphs written to KBDB as wiki-page blocks (verified GET)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-07 16:23:02 +08:00
parent 497f92a268
commit e8fca33f80
4 changed files with 115 additions and 40 deletions
+17 -3
View File
@@ -32,11 +32,25 @@ export function buildExecutionGraph(
return { id, type: nr.type, componentId, label: name, data }; return { id, type: nr.type, componentId, label: name, data };
}); });
const edges = parsed.edges.map(e => ({ const edges = parsed.edges.map(e => {
// 「對每個 X」label 抽 iteratorcypher binding 表達 FOREACH 的迭代變數
// 例:'A >> 對每個 paragraph >> B' → type=FOREACH, iterator='paragraph'
// getIterableFromContext 會找 ctx.paragraphs(複數)或 ctx.paragraph
let iterator: string | undefined;
let label = e.label;
const foreachMatch = label.match(/^(?:對每個|FOREACH)\s+(\w+)$/i);
if (foreachMatch) {
iterator = foreachMatch[1];
label = '對每個'; // 改回標準 label 走 SEMANTIC_EDGE_MAP
}
const edge: { from: string; to: string; type: ReturnType<typeof toEdgeType>; iterator?: string } = {
from: e.from.toLowerCase().replace(/\s+/g, '-'), from: e.from.toLowerCase().replace(/\s+/g, '-'),
to: e.to.toLowerCase().replace(/\s+/g, '-'), to: e.to.toLowerCase().replace(/\s+/g, '-'),
type: toEdgeType(e.label), type: toEdgeType(label),
})); };
if (iterator) edge.iterator = iterator;
return edge;
});
return { id: graphId, name: graphName, nodes, edges }; return { id: graphId, name: graphName, nodes, edges };
} }
+54 -35
View File
@@ -4,7 +4,7 @@ import { kvSetNodeOutput, kvGetNodeOutput, ExecutionError, WorkflowPaused } from
import { injectCredentials } from './actions/credential-injector'; import { injectCredentials } from './actions/credential-injector';
import { tryAuthDispatch } from './actions/auth-dispatcher'; import { tryAuthDispatch } from './actions/auth-dispatcher';
import { expandPromptRecipe } from './lib/recipe-expander'; import { expandPromptRecipe } from './lib/recipe-expander';
import { persistPausedRun, isResumablePending } from './lib/paused-runs'; import { persistPausedRun, isResumablePending, parseRecipeOutput } from './lib/paused-runs';
export type ComponentLoader = (componentId: string) => Promise<ComponentRunner>; export type ComponentLoader = (componentId: string) => Promise<ComponentRunner>;
export type WorkflowLoader = (workflowId: string) => Promise<ExecutionGraph>; export type WorkflowLoader = (workflowId: string) => Promise<ExecutionGraph>;
@@ -109,8 +109,19 @@ export class GraphExecutor {
callback_result: Record<string, unknown>; // daemon callback 給的 result(取代 paused result callback_result: Record<string, unknown>; // daemon callback 給的 result(取代 paused result
prior_trace: TraceStep[]; prior_trace: TraceStep[];
kvNamespace?: KVNamespace; kvNamespace?: KVNamespace;
recipe_output_format?: 'text' | 'json';
recipe_output_required_fields?: string[];
}): Promise<{ data: unknown; trace: TraceStep[] }> { }): Promise<{ data: unknown; trace: TraceStep[] }> {
const { graph, paused_node_id, paused_context, callback_result, prior_trace, kvNamespace } = args; const { graph, paused_node_id, paused_context, prior_trace, kvNamespace } = args;
let { callback_result } = args;
// Recipe output parsing:跟立刻回路徑同樣解析(spread parsed 欄位到 top-level
// SDD: recipe-system + resumable-workflow
callback_result = parseRecipeOutput(
callback_result,
args.recipe_output_format,
args.recipe_output_required_fields,
) as Record<string, unknown>;
this.currentGraph = graph; this.currentGraph = graph;
this.currentRunId = `${graph.id}-resume-${Date.now()}`; this.currentRunId = `${graph.id}-resume-${Date.now()}`;
@@ -287,38 +298,21 @@ export class GraphExecutor {
trace_so_far: trace, trace_so_far: trace,
api_key: this.apiKey, api_key: this.apiKey,
expires_at: Date.now() + 24 * 60 * 60 * 1000, expires_at: Date.now() + 24 * 60 * 60 * 1000,
recipe_output_format: mergedContext._recipe_output_format as 'text' | 'json' | undefined,
recipe_output_required_fields: mergedContext._recipe_output_required_fields as string[] | undefined,
}); });
throw new WorkflowPaused(pending.task_id, this.currentRunId, node.id, trace); throw new WorkflowPaused(pending.task_id, this.currentRunId, node.id, trace);
} }
// Recipe output parsing若 recipe 指定 format=json,把 claude_api 回傳的 text 自動 parse // Recipe output parsing用 parseRecipeOutput 統一處理(立刻回 + resume 長回兩條路共用)
// 失敗 → 包裝成 success:false,給下游清楚知道是 LLM output 解析問題 // SDD: recipe-system + resumable-workflow
if (mergedContext._recipe_output_format === 'json' && result && typeof result === 'object') { // 解完後 parsed JSON 的 top-level 欄位(如 paragraphs / tripletsspread 到 result
const r = result as Record<string, unknown>; // 讓下游 FOREACH 跟 {{var}} 模板直接可取
const text = (r.data as Record<string, unknown> | undefined)?.text ?? r.text; result = parseRecipeOutput(
if (typeof text === 'string') { result,
// 剝除 ```json ... ``` markdown fenceClaude 常這樣包) mergedContext._recipe_output_format as 'text' | 'json' | undefined,
let jsonText = String(text).trim(); mergedContext._recipe_output_required_fields as string[] | undefined,
const fenceMatch = jsonText.match(/^```(?:json)?\s*\n([\s\S]*?)\n```$/); );
if (fenceMatch) jsonText = fenceMatch[1].trim();
try {
const parsed = JSON.parse(jsonText);
const required = mergedContext._recipe_output_required_fields as string[] | undefined;
if (required && parsed && typeof parsed === 'object') {
const missing = required.filter((f) => !(f in (parsed as Record<string, unknown>)));
if (missing.length > 0) {
result = { success: false, error: `recipe output 缺欄位: ${missing.join(', ')}`, raw: parsed };
} else {
result = { success: true, data: parsed };
}
} else {
result = { success: true, data: parsed };
}
} catch (e) {
result = { success: false, error: `recipe output JSON parse 失敗: ${e instanceof Error ? e.message : String(e)}`, raw_text: text };
}
}
}
// BUILD-006:將節點 output 存入 KVkey = {run_id}:node:{node_id} // BUILD-006:將節點 output 存入 KVkey = {run_id}:node:{node_id}
// 這讓下游節點可以透過 KV 讀取上游的具名 output,解決同名欄位衝突 // 這讓下游節點可以透過 KV 讀取上游的具名 output,解決同名欄位衝突
@@ -439,8 +433,15 @@ export class GraphExecutor {
const items = getIterableFromContext(result, iteratorKey); const items = getIterableFromContext(result, iteratorKey);
const iterResults: unknown[] = []; const iterResults: unknown[] = [];
// FOREACH itemContext 順序:原 ctx 全局欄位(api_key 等)優先 < result(上游輸出)< item(當前迭代)
// 之前只 spread result,全局 api_key 會丟,下游 {{api_key}} 抓不到
const baseCtx = (typeof context === 'object' && context !== null) ? context as Record<string, unknown> : {};
for (const item of items) { for (const item of items) {
const itemContext = { ...(result as Record<string, unknown>), [iteratorKey]: item }; const itemContext = {
...baseCtx,
...(result as Record<string, unknown>),
[iteratorKey]: item,
};
const itemResult = await this.executeNode(nextNode, graph, itemContext, new Set(), trace, fanIn, kvStore); const itemResult = await this.executeNode(nextNode, graph, itemContext, new Set(), trace, fanIn, kvStore);
iterResults.push(itemResult); iterResults.push(itemResult);
} }
@@ -497,7 +498,11 @@ export class GraphExecutor {
} }
} }
/** node.data 的 string 值支援 {{variable}} 替換,從 context 取值 */ /** node.data 的 string 值支援 {{variable}} 替換,從 context 取值
* 支援嵌套 path{{item.content}} → ctx.item.content
* 支援 array index{{paragraphs.0.entity}} → ctx.paragraphs[0].entity
* 非 string 值(object/array)會 JSON.stringify
*/
function interpolateData( function interpolateData(
data: Record<string, unknown> | undefined, data: Record<string, unknown> | undefined,
ctx: Record<string, unknown>, ctx: Record<string, unknown>,
@@ -506,9 +511,11 @@ function interpolateData(
const result: Record<string, unknown> = {}; const result: Record<string, unknown> = {};
for (const [k, v] of Object.entries(data)) { for (const [k, v] of Object.entries(data)) {
if (typeof v === 'string') { if (typeof v === 'string') {
result[k] = v.replace(/\{\{(\w+)\}\}/g, (_, key) => { result[k] = v.replace(/\{\{([\w.]+)\}\}/g, (_, key: string) => {
const val = ctx[key]; const val = getNestedValue(ctx, key);
return val !== undefined ? String(val) : `{{${key}}}`; if (val === undefined) return `{{${key}}}`;
if (typeof val === 'string') return val;
return JSON.stringify(val);
}); });
} else { } else {
result[k] = v; result[k] = v;
@@ -517,6 +524,18 @@ function interpolateData(
return result; return result;
} }
/** 從 ctx 用 dot path 取嵌套值:'a.b.0.c' → ctx.a.b[0].c */
function getNestedValue(ctx: unknown, path: string): unknown {
const parts = path.split('.');
let cur: unknown = ctx;
for (const p of parts) {
if (cur === null || cur === undefined) return undefined;
if (typeof cur !== 'object') return undefined;
cur = (cur as Record<string, unknown>)[p];
}
return cur;
}
/** 判斷節點執行結果是否為失敗:success === false 或含有 error key */ /** 判斷節點執行結果是否為失敗:success === false 或含有 error key */
function isFailure(result: unknown): boolean { function isFailure(result: unknown): boolean {
if (!result || typeof result !== 'object') return false; if (!result || typeof result !== 'object') return false;
+40
View File
@@ -23,6 +23,9 @@ export interface PausedRunState {
trace_so_far: TraceStep[]; trace_so_far: TraceStep[];
api_key?: string; api_key?: string;
expires_at: number; // unix ms expires_at: number; // unix ms
// resume 時用來 parse callback result 的 recipe output 規格(resumable + recipe 整合)
recipe_output_format?: 'text' | 'json';
recipe_output_required_fields?: string[];
} }
const KEY_PREFIX = 'paused_run:'; const KEY_PREFIX = 'paused_run:';
@@ -79,3 +82,40 @@ export function isResumablePending(result: unknown): { task_id: string } | null
if (typeof r.task_id !== 'string' || !r.task_id) return null; if (typeof r.task_id !== 'string' || !r.task_id) return null;
return { task_id: r.task_id }; return { task_id: r.task_id };
} }
/**
* Parse claude_api result with recipe output format.
* 同步路徑跟 resume 路徑都用同一個解析器,避免邏輯歪掉。
*
* 輸入:result(可能是 {data:{text:"..."}} 或 {text:"..."}
* 輸出:parsed object 或 fallback 結構
*/
export function parseRecipeOutput(
result: unknown,
format: 'text' | 'json' | undefined,
requiredFields: string[] | undefined,
): unknown {
if (format !== 'json' || !result || typeof result !== 'object') return result;
const r = result as Record<string, unknown>;
const text = (r.data as Record<string, unknown> | undefined)?.text ?? r.text;
if (typeof text !== 'string') return result;
// 剝除 ```json ... ``` markdown fenceClaude 常這樣包)
let jsonText = String(text).trim();
const fenceMatch = jsonText.match(/^```(?:json)?\s*\n([\s\S]*?)\n```$/);
if (fenceMatch) jsonText = fenceMatch[1].trim();
try {
const parsed = JSON.parse(jsonText);
if (requiredFields && parsed && typeof parsed === 'object') {
const missing = requiredFields.filter((f) => !(f in (parsed as Record<string, unknown>)));
if (missing.length > 0) {
return { success: false, error: `recipe output 缺欄位: ${missing.join(', ')}`, raw: parsed };
}
}
// 把 parsed 的欄位 spread 到 top-levelFOREACH / 下游 {{var}} 都好取
return { success: true, data: parsed, ...(parsed && typeof parsed === 'object' ? parsed as Record<string, unknown> : {}) };
} catch (e) {
return { success: false, error: `recipe output JSON parse 失敗: ${e instanceof Error ? e.message : String(e)}`, raw_text: text };
}
}
+2
View File
@@ -57,6 +57,8 @@ resumeRouter.post('/workflows/resume', async (c) => {
callback_result: callbackResult, callback_result: callbackResult,
prior_trace: state.trace_so_far, prior_trace: state.trace_so_far,
kvNamespace: c.env.EXEC_CONTEXT, kvNamespace: c.env.EXEC_CONTEXT,
recipe_output_format: state.recipe_output_format,
recipe_output_required_fields: state.recipe_output_required_fields,
}); });
const duration_ms = Date.now() - start; const duration_ms = Date.now() - start;
return c.json({ return c.json({