feat(cypher): 3-node wiki workflow end-to-end (FOREACH + nested interp + unified parsing)
Three platform-level improvements that together enable the full
"草稿 → LLM 整理 → KBDB" wiki ingest workflow via cypher binding:
## 1. Nested interpolation in node.data
Previously {{var}} only supported top-level keys, so {{item.content}}
literal-passed through. Now supports dot-path:
{{paragraph.content}} → ctx.paragraph.content
{{paragraphs.0.entity}} → ctx.paragraphs[0].entity
Non-string values (object/array) JSON.stringify automatically.
## 2. 對每個 X cypher binding syntax
'A >> 對每個 paragraph >> B' parses into FOREACH edge with
iterator='paragraph'. graph-builder.ts strips the iterator from label
before edge type resolution. Backwards compatible: bare '對每個' still
defaults to item.
## 3. FOREACH preserves outer context
itemContext was previously {...result, [iter]: item}, dropping
top-level api_key etc. Now {...outerCtx, ...result, [iter]: item} so
{{api_key}} interpolation works in foreach body.
## 4. Unified recipe output parsing (sync + resume)
Extracted parseRecipeOutput() helper used by both sync claude_api
result + workflow resume callback. Strips ```json fence, parses,
spreads parsed top-level fields into result so downstream FOREACH
finds 'paragraphs' (not buried in data.paragraphs).
paused state now stores recipe_output_format + required_fields so
resume route can apply same parsing as sync path.
End-to-end verified:
- input(草稿+api_key) → synth(claude_api+recipe) → 對每個 paragraph → write_wiki(kbdb_create_block)
- Real Claude synthesis on Mira daemon: 3 triplets + 2 paragraphs
- Both paragraphs written to KBDB as wiki-page blocks (verified GET)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -32,11 +32,25 @@ export function buildExecutionGraph(
|
||||
return { id, type: nr.type, componentId, label: name, data };
|
||||
});
|
||||
|
||||
const edges = parsed.edges.map(e => ({
|
||||
const edges = parsed.edges.map(e => {
|
||||
// 「對每個 X」label 抽 iterator:cypher binding 表達 FOREACH 的迭代變數
|
||||
// 例:'A >> 對每個 paragraph >> B' → type=FOREACH, iterator='paragraph'
|
||||
// getIterableFromContext 會找 ctx.paragraphs(複數)或 ctx.paragraph
|
||||
let iterator: string | undefined;
|
||||
let label = e.label;
|
||||
const foreachMatch = label.match(/^(?:對每個|FOREACH)\s+(\w+)$/i);
|
||||
if (foreachMatch) {
|
||||
iterator = foreachMatch[1];
|
||||
label = '對每個'; // 改回標準 label 走 SEMANTIC_EDGE_MAP
|
||||
}
|
||||
const edge: { from: string; to: string; type: ReturnType<typeof toEdgeType>; iterator?: string } = {
|
||||
from: e.from.toLowerCase().replace(/\s+/g, '-'),
|
||||
to: e.to.toLowerCase().replace(/\s+/g, '-'),
|
||||
type: toEdgeType(e.label),
|
||||
}));
|
||||
type: toEdgeType(label),
|
||||
};
|
||||
if (iterator) edge.iterator = iterator;
|
||||
return edge;
|
||||
});
|
||||
|
||||
return { id: graphId, name: graphName, nodes, edges };
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ import { kvSetNodeOutput, kvGetNodeOutput, ExecutionError, WorkflowPaused } from
|
||||
import { injectCredentials } from './actions/credential-injector';
|
||||
import { tryAuthDispatch } from './actions/auth-dispatcher';
|
||||
import { expandPromptRecipe } from './lib/recipe-expander';
|
||||
import { persistPausedRun, isResumablePending } from './lib/paused-runs';
|
||||
import { persistPausedRun, isResumablePending, parseRecipeOutput } from './lib/paused-runs';
|
||||
|
||||
export type ComponentLoader = (componentId: string) => Promise<ComponentRunner>;
|
||||
export type WorkflowLoader = (workflowId: string) => Promise<ExecutionGraph>;
|
||||
@@ -109,8 +109,19 @@ export class GraphExecutor {
|
||||
callback_result: Record<string, unknown>; // daemon callback 給的 result(取代 paused result)
|
||||
prior_trace: TraceStep[];
|
||||
kvNamespace?: KVNamespace;
|
||||
recipe_output_format?: 'text' | 'json';
|
||||
recipe_output_required_fields?: string[];
|
||||
}): Promise<{ data: unknown; trace: TraceStep[] }> {
|
||||
const { graph, paused_node_id, paused_context, callback_result, prior_trace, kvNamespace } = args;
|
||||
const { graph, paused_node_id, paused_context, prior_trace, kvNamespace } = args;
|
||||
let { callback_result } = args;
|
||||
|
||||
// Recipe output parsing:跟立刻回路徑同樣解析(spread parsed 欄位到 top-level)
|
||||
// SDD: recipe-system + resumable-workflow
|
||||
callback_result = parseRecipeOutput(
|
||||
callback_result,
|
||||
args.recipe_output_format,
|
||||
args.recipe_output_required_fields,
|
||||
) as Record<string, unknown>;
|
||||
|
||||
this.currentGraph = graph;
|
||||
this.currentRunId = `${graph.id}-resume-${Date.now()}`;
|
||||
@@ -287,38 +298,21 @@ export class GraphExecutor {
|
||||
trace_so_far: trace,
|
||||
api_key: this.apiKey,
|
||||
expires_at: Date.now() + 24 * 60 * 60 * 1000,
|
||||
recipe_output_format: mergedContext._recipe_output_format as 'text' | 'json' | undefined,
|
||||
recipe_output_required_fields: mergedContext._recipe_output_required_fields as string[] | undefined,
|
||||
});
|
||||
throw new WorkflowPaused(pending.task_id, this.currentRunId, node.id, trace);
|
||||
}
|
||||
|
||||
// Recipe output parsing:若 recipe 指定 format=json,把 claude_api 回傳的 text 自動 parse
|
||||
// 失敗 → 包裝成 success:false,給下游清楚知道是 LLM output 解析問題
|
||||
if (mergedContext._recipe_output_format === 'json' && result && typeof result === 'object') {
|
||||
const r = result as Record<string, unknown>;
|
||||
const text = (r.data as Record<string, unknown> | undefined)?.text ?? r.text;
|
||||
if (typeof text === 'string') {
|
||||
// 剝除 ```json ... ``` markdown fence(Claude 常這樣包)
|
||||
let jsonText = String(text).trim();
|
||||
const fenceMatch = jsonText.match(/^```(?:json)?\s*\n([\s\S]*?)\n```$/);
|
||||
if (fenceMatch) jsonText = fenceMatch[1].trim();
|
||||
try {
|
||||
const parsed = JSON.parse(jsonText);
|
||||
const required = mergedContext._recipe_output_required_fields as string[] | undefined;
|
||||
if (required && parsed && typeof parsed === 'object') {
|
||||
const missing = required.filter((f) => !(f in (parsed as Record<string, unknown>)));
|
||||
if (missing.length > 0) {
|
||||
result = { success: false, error: `recipe output 缺欄位: ${missing.join(', ')}`, raw: parsed };
|
||||
} else {
|
||||
result = { success: true, data: parsed };
|
||||
}
|
||||
} else {
|
||||
result = { success: true, data: parsed };
|
||||
}
|
||||
} catch (e) {
|
||||
result = { success: false, error: `recipe output JSON parse 失敗: ${e instanceof Error ? e.message : String(e)}`, raw_text: text };
|
||||
}
|
||||
}
|
||||
}
|
||||
// Recipe output parsing:用 parseRecipeOutput 統一處理(立刻回 + resume 長回兩條路共用)
|
||||
// SDD: recipe-system + resumable-workflow
|
||||
// 解完後 parsed JSON 的 top-level 欄位(如 paragraphs / triplets)spread 到 result,
|
||||
// 讓下游 FOREACH 跟 {{var}} 模板直接可取
|
||||
result = parseRecipeOutput(
|
||||
result,
|
||||
mergedContext._recipe_output_format as 'text' | 'json' | undefined,
|
||||
mergedContext._recipe_output_required_fields as string[] | undefined,
|
||||
);
|
||||
|
||||
// BUILD-006:將節點 output 存入 KV(key = {run_id}:node:{node_id})
|
||||
// 這讓下游節點可以透過 KV 讀取上游的具名 output,解決同名欄位衝突
|
||||
@@ -439,8 +433,15 @@ export class GraphExecutor {
|
||||
const items = getIterableFromContext(result, iteratorKey);
|
||||
const iterResults: unknown[] = [];
|
||||
|
||||
// FOREACH itemContext 順序:原 ctx 全局欄位(api_key 等)優先 < result(上游輸出)< item(當前迭代)
|
||||
// 之前只 spread result,全局 api_key 會丟,下游 {{api_key}} 抓不到
|
||||
const baseCtx = (typeof context === 'object' && context !== null) ? context as Record<string, unknown> : {};
|
||||
for (const item of items) {
|
||||
const itemContext = { ...(result as Record<string, unknown>), [iteratorKey]: item };
|
||||
const itemContext = {
|
||||
...baseCtx,
|
||||
...(result as Record<string, unknown>),
|
||||
[iteratorKey]: item,
|
||||
};
|
||||
const itemResult = await this.executeNode(nextNode, graph, itemContext, new Set(), trace, fanIn, kvStore);
|
||||
iterResults.push(itemResult);
|
||||
}
|
||||
@@ -497,7 +498,11 @@ export class GraphExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
/** node.data 的 string 值支援 {{variable}} 替換,從 context 取值 */
|
||||
/** node.data 的 string 值支援 {{variable}} 替換,從 context 取值
|
||||
* 支援嵌套 path:{{item.content}} → ctx.item.content
|
||||
* 支援 array index:{{paragraphs.0.entity}} → ctx.paragraphs[0].entity
|
||||
* 非 string 值(object/array)會 JSON.stringify
|
||||
*/
|
||||
function interpolateData(
|
||||
data: Record<string, unknown> | undefined,
|
||||
ctx: Record<string, unknown>,
|
||||
@@ -506,9 +511,11 @@ function interpolateData(
|
||||
const result: Record<string, unknown> = {};
|
||||
for (const [k, v] of Object.entries(data)) {
|
||||
if (typeof v === 'string') {
|
||||
result[k] = v.replace(/\{\{(\w+)\}\}/g, (_, key) => {
|
||||
const val = ctx[key];
|
||||
return val !== undefined ? String(val) : `{{${key}}}`;
|
||||
result[k] = v.replace(/\{\{([\w.]+)\}\}/g, (_, key: string) => {
|
||||
const val = getNestedValue(ctx, key);
|
||||
if (val === undefined) return `{{${key}}}`;
|
||||
if (typeof val === 'string') return val;
|
||||
return JSON.stringify(val);
|
||||
});
|
||||
} else {
|
||||
result[k] = v;
|
||||
@@ -517,6 +524,18 @@ function interpolateData(
|
||||
return result;
|
||||
}
|
||||
|
||||
/** 從 ctx 用 dot path 取嵌套值:'a.b.0.c' → ctx.a.b[0].c */
|
||||
function getNestedValue(ctx: unknown, path: string): unknown {
|
||||
const parts = path.split('.');
|
||||
let cur: unknown = ctx;
|
||||
for (const p of parts) {
|
||||
if (cur === null || cur === undefined) return undefined;
|
||||
if (typeof cur !== 'object') return undefined;
|
||||
cur = (cur as Record<string, unknown>)[p];
|
||||
}
|
||||
return cur;
|
||||
}
|
||||
|
||||
/** 判斷節點執行結果是否為失敗:success === false 或含有 error key */
|
||||
function isFailure(result: unknown): boolean {
|
||||
if (!result || typeof result !== 'object') return false;
|
||||
|
||||
@@ -23,6 +23,9 @@ export interface PausedRunState {
|
||||
trace_so_far: TraceStep[];
|
||||
api_key?: string;
|
||||
expires_at: number; // unix ms
|
||||
// resume 時用來 parse callback result 的 recipe output 規格(resumable + recipe 整合)
|
||||
recipe_output_format?: 'text' | 'json';
|
||||
recipe_output_required_fields?: string[];
|
||||
}
|
||||
|
||||
const KEY_PREFIX = 'paused_run:';
|
||||
@@ -79,3 +82,40 @@ export function isResumablePending(result: unknown): { task_id: string } | null
|
||||
if (typeof r.task_id !== 'string' || !r.task_id) return null;
|
||||
return { task_id: r.task_id };
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse claude_api result with recipe output format.
|
||||
* 同步路徑跟 resume 路徑都用同一個解析器,避免邏輯歪掉。
|
||||
*
|
||||
* 輸入:result(可能是 {data:{text:"..."}} 或 {text:"..."})
|
||||
* 輸出:parsed object 或 fallback 結構
|
||||
*/
|
||||
export function parseRecipeOutput(
|
||||
result: unknown,
|
||||
format: 'text' | 'json' | undefined,
|
||||
requiredFields: string[] | undefined,
|
||||
): unknown {
|
||||
if (format !== 'json' || !result || typeof result !== 'object') return result;
|
||||
const r = result as Record<string, unknown>;
|
||||
const text = (r.data as Record<string, unknown> | undefined)?.text ?? r.text;
|
||||
if (typeof text !== 'string') return result;
|
||||
|
||||
// 剝除 ```json ... ``` markdown fence(Claude 常這樣包)
|
||||
let jsonText = String(text).trim();
|
||||
const fenceMatch = jsonText.match(/^```(?:json)?\s*\n([\s\S]*?)\n```$/);
|
||||
if (fenceMatch) jsonText = fenceMatch[1].trim();
|
||||
|
||||
try {
|
||||
const parsed = JSON.parse(jsonText);
|
||||
if (requiredFields && parsed && typeof parsed === 'object') {
|
||||
const missing = requiredFields.filter((f) => !(f in (parsed as Record<string, unknown>)));
|
||||
if (missing.length > 0) {
|
||||
return { success: false, error: `recipe output 缺欄位: ${missing.join(', ')}`, raw: parsed };
|
||||
}
|
||||
}
|
||||
// 把 parsed 的欄位 spread 到 top-level,FOREACH / 下游 {{var}} 都好取
|
||||
return { success: true, data: parsed, ...(parsed && typeof parsed === 'object' ? parsed as Record<string, unknown> : {}) };
|
||||
} catch (e) {
|
||||
return { success: false, error: `recipe output JSON parse 失敗: ${e instanceof Error ? e.message : String(e)}`, raw_text: text };
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,6 +57,8 @@ resumeRouter.post('/workflows/resume', async (c) => {
|
||||
callback_result: callbackResult,
|
||||
prior_trace: state.trace_so_far,
|
||||
kvNamespace: c.env.EXEC_CONTEXT,
|
||||
recipe_output_format: state.recipe_output_format,
|
||||
recipe_output_required_fields: state.recipe_output_required_fields,
|
||||
});
|
||||
const duration_ms = Date.now() - start;
|
||||
return c.json({
|
||||
|
||||
Reference in New Issue
Block a user