feat(cypher): 3-node wiki workflow end-to-end (FOREACH + nested interp + unified parsing)
Three platform-level improvements that together enable the full
"草稿 → LLM 整理 → KBDB" wiki ingest workflow via cypher binding:
## 1. Nested interpolation in node.data
Previously {{var}} only supported top-level keys, so {{item.content}}
literal-passed through. Now supports dot-path:
{{paragraph.content}} → ctx.paragraph.content
{{paragraphs.0.entity}} → ctx.paragraphs[0].entity
Non-string values (object/array) JSON.stringify automatically.
## 2. 對每個 X cypher binding syntax
'A >> 對每個 paragraph >> B' parses into FOREACH edge with
iterator='paragraph'. graph-builder.ts strips the iterator from label
before edge type resolution. Backwards compatible: bare '對每個' still
defaults to item.
## 3. FOREACH preserves outer context
itemContext was previously {...result, [iter]: item}, dropping
top-level api_key etc. Now {...outerCtx, ...result, [iter]: item} so
{{api_key}} interpolation works in foreach body.
## 4. Unified recipe output parsing (sync + resume)
Extracted parseRecipeOutput() helper used by both sync claude_api
result + workflow resume callback. Strips ```json fence, parses,
spreads parsed top-level fields into result so downstream FOREACH
finds 'paragraphs' (not buried in data.paragraphs).
paused state now stores recipe_output_format + required_fields so
resume route can apply same parsing as sync path.
End-to-end verified:
- input(草稿+api_key) → synth(claude_api+recipe) → 對每個 paragraph → write_wiki(kbdb_create_block)
- Real Claude synthesis on Mira daemon: 3 triplets + 2 paragraphs
- Both paragraphs written to KBDB as wiki-page blocks (verified GET)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -32,11 +32,25 @@ export function buildExecutionGraph(
|
|||||||
return { id, type: nr.type, componentId, label: name, data };
|
return { id, type: nr.type, componentId, label: name, data };
|
||||||
});
|
});
|
||||||
|
|
||||||
const edges = parsed.edges.map(e => ({
|
const edges = parsed.edges.map(e => {
|
||||||
|
// 「對每個 X」label 抽 iterator:cypher binding 表達 FOREACH 的迭代變數
|
||||||
|
// 例:'A >> 對每個 paragraph >> B' → type=FOREACH, iterator='paragraph'
|
||||||
|
// getIterableFromContext 會找 ctx.paragraphs(複數)或 ctx.paragraph
|
||||||
|
let iterator: string | undefined;
|
||||||
|
let label = e.label;
|
||||||
|
const foreachMatch = label.match(/^(?:對每個|FOREACH)\s+(\w+)$/i);
|
||||||
|
if (foreachMatch) {
|
||||||
|
iterator = foreachMatch[1];
|
||||||
|
label = '對每個'; // 改回標準 label 走 SEMANTIC_EDGE_MAP
|
||||||
|
}
|
||||||
|
const edge: { from: string; to: string; type: ReturnType<typeof toEdgeType>; iterator?: string } = {
|
||||||
from: e.from.toLowerCase().replace(/\s+/g, '-'),
|
from: e.from.toLowerCase().replace(/\s+/g, '-'),
|
||||||
to: e.to.toLowerCase().replace(/\s+/g, '-'),
|
to: e.to.toLowerCase().replace(/\s+/g, '-'),
|
||||||
type: toEdgeType(e.label),
|
type: toEdgeType(label),
|
||||||
}));
|
};
|
||||||
|
if (iterator) edge.iterator = iterator;
|
||||||
|
return edge;
|
||||||
|
});
|
||||||
|
|
||||||
return { id: graphId, name: graphName, nodes, edges };
|
return { id: graphId, name: graphName, nodes, edges };
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import { kvSetNodeOutput, kvGetNodeOutput, ExecutionError, WorkflowPaused } from
|
|||||||
import { injectCredentials } from './actions/credential-injector';
|
import { injectCredentials } from './actions/credential-injector';
|
||||||
import { tryAuthDispatch } from './actions/auth-dispatcher';
|
import { tryAuthDispatch } from './actions/auth-dispatcher';
|
||||||
import { expandPromptRecipe } from './lib/recipe-expander';
|
import { expandPromptRecipe } from './lib/recipe-expander';
|
||||||
import { persistPausedRun, isResumablePending } from './lib/paused-runs';
|
import { persistPausedRun, isResumablePending, parseRecipeOutput } from './lib/paused-runs';
|
||||||
|
|
||||||
export type ComponentLoader = (componentId: string) => Promise<ComponentRunner>;
|
export type ComponentLoader = (componentId: string) => Promise<ComponentRunner>;
|
||||||
export type WorkflowLoader = (workflowId: string) => Promise<ExecutionGraph>;
|
export type WorkflowLoader = (workflowId: string) => Promise<ExecutionGraph>;
|
||||||
@@ -109,8 +109,19 @@ export class GraphExecutor {
|
|||||||
callback_result: Record<string, unknown>; // daemon callback 給的 result(取代 paused result)
|
callback_result: Record<string, unknown>; // daemon callback 給的 result(取代 paused result)
|
||||||
prior_trace: TraceStep[];
|
prior_trace: TraceStep[];
|
||||||
kvNamespace?: KVNamespace;
|
kvNamespace?: KVNamespace;
|
||||||
|
recipe_output_format?: 'text' | 'json';
|
||||||
|
recipe_output_required_fields?: string[];
|
||||||
}): Promise<{ data: unknown; trace: TraceStep[] }> {
|
}): Promise<{ data: unknown; trace: TraceStep[] }> {
|
||||||
const { graph, paused_node_id, paused_context, callback_result, prior_trace, kvNamespace } = args;
|
const { graph, paused_node_id, paused_context, prior_trace, kvNamespace } = args;
|
||||||
|
let { callback_result } = args;
|
||||||
|
|
||||||
|
// Recipe output parsing:跟立刻回路徑同樣解析(spread parsed 欄位到 top-level)
|
||||||
|
// SDD: recipe-system + resumable-workflow
|
||||||
|
callback_result = parseRecipeOutput(
|
||||||
|
callback_result,
|
||||||
|
args.recipe_output_format,
|
||||||
|
args.recipe_output_required_fields,
|
||||||
|
) as Record<string, unknown>;
|
||||||
|
|
||||||
this.currentGraph = graph;
|
this.currentGraph = graph;
|
||||||
this.currentRunId = `${graph.id}-resume-${Date.now()}`;
|
this.currentRunId = `${graph.id}-resume-${Date.now()}`;
|
||||||
@@ -287,38 +298,21 @@ export class GraphExecutor {
|
|||||||
trace_so_far: trace,
|
trace_so_far: trace,
|
||||||
api_key: this.apiKey,
|
api_key: this.apiKey,
|
||||||
expires_at: Date.now() + 24 * 60 * 60 * 1000,
|
expires_at: Date.now() + 24 * 60 * 60 * 1000,
|
||||||
|
recipe_output_format: mergedContext._recipe_output_format as 'text' | 'json' | undefined,
|
||||||
|
recipe_output_required_fields: mergedContext._recipe_output_required_fields as string[] | undefined,
|
||||||
});
|
});
|
||||||
throw new WorkflowPaused(pending.task_id, this.currentRunId, node.id, trace);
|
throw new WorkflowPaused(pending.task_id, this.currentRunId, node.id, trace);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Recipe output parsing:若 recipe 指定 format=json,把 claude_api 回傳的 text 自動 parse
|
// Recipe output parsing:用 parseRecipeOutput 統一處理(立刻回 + resume 長回兩條路共用)
|
||||||
// 失敗 → 包裝成 success:false,給下游清楚知道是 LLM output 解析問題
|
// SDD: recipe-system + resumable-workflow
|
||||||
if (mergedContext._recipe_output_format === 'json' && result && typeof result === 'object') {
|
// 解完後 parsed JSON 的 top-level 欄位(如 paragraphs / triplets)spread 到 result,
|
||||||
const r = result as Record<string, unknown>;
|
// 讓下游 FOREACH 跟 {{var}} 模板直接可取
|
||||||
const text = (r.data as Record<string, unknown> | undefined)?.text ?? r.text;
|
result = parseRecipeOutput(
|
||||||
if (typeof text === 'string') {
|
result,
|
||||||
// 剝除 ```json ... ``` markdown fence(Claude 常這樣包)
|
mergedContext._recipe_output_format as 'text' | 'json' | undefined,
|
||||||
let jsonText = String(text).trim();
|
mergedContext._recipe_output_required_fields as string[] | undefined,
|
||||||
const fenceMatch = jsonText.match(/^```(?:json)?\s*\n([\s\S]*?)\n```$/);
|
);
|
||||||
if (fenceMatch) jsonText = fenceMatch[1].trim();
|
|
||||||
try {
|
|
||||||
const parsed = JSON.parse(jsonText);
|
|
||||||
const required = mergedContext._recipe_output_required_fields as string[] | undefined;
|
|
||||||
if (required && parsed && typeof parsed === 'object') {
|
|
||||||
const missing = required.filter((f) => !(f in (parsed as Record<string, unknown>)));
|
|
||||||
if (missing.length > 0) {
|
|
||||||
result = { success: false, error: `recipe output 缺欄位: ${missing.join(', ')}`, raw: parsed };
|
|
||||||
} else {
|
|
||||||
result = { success: true, data: parsed };
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
result = { success: true, data: parsed };
|
|
||||||
}
|
|
||||||
} catch (e) {
|
|
||||||
result = { success: false, error: `recipe output JSON parse 失敗: ${e instanceof Error ? e.message : String(e)}`, raw_text: text };
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// BUILD-006:將節點 output 存入 KV(key = {run_id}:node:{node_id})
|
// BUILD-006:將節點 output 存入 KV(key = {run_id}:node:{node_id})
|
||||||
// 這讓下游節點可以透過 KV 讀取上游的具名 output,解決同名欄位衝突
|
// 這讓下游節點可以透過 KV 讀取上游的具名 output,解決同名欄位衝突
|
||||||
@@ -439,8 +433,15 @@ export class GraphExecutor {
|
|||||||
const items = getIterableFromContext(result, iteratorKey);
|
const items = getIterableFromContext(result, iteratorKey);
|
||||||
const iterResults: unknown[] = [];
|
const iterResults: unknown[] = [];
|
||||||
|
|
||||||
|
// FOREACH itemContext 順序:原 ctx 全局欄位(api_key 等)優先 < result(上游輸出)< item(當前迭代)
|
||||||
|
// 之前只 spread result,全局 api_key 會丟,下游 {{api_key}} 抓不到
|
||||||
|
const baseCtx = (typeof context === 'object' && context !== null) ? context as Record<string, unknown> : {};
|
||||||
for (const item of items) {
|
for (const item of items) {
|
||||||
const itemContext = { ...(result as Record<string, unknown>), [iteratorKey]: item };
|
const itemContext = {
|
||||||
|
...baseCtx,
|
||||||
|
...(result as Record<string, unknown>),
|
||||||
|
[iteratorKey]: item,
|
||||||
|
};
|
||||||
const itemResult = await this.executeNode(nextNode, graph, itemContext, new Set(), trace, fanIn, kvStore);
|
const itemResult = await this.executeNode(nextNode, graph, itemContext, new Set(), trace, fanIn, kvStore);
|
||||||
iterResults.push(itemResult);
|
iterResults.push(itemResult);
|
||||||
}
|
}
|
||||||
@@ -497,7 +498,11 @@ export class GraphExecutor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** node.data 的 string 值支援 {{variable}} 替換,從 context 取值 */
|
/** node.data 的 string 值支援 {{variable}} 替換,從 context 取值
|
||||||
|
* 支援嵌套 path:{{item.content}} → ctx.item.content
|
||||||
|
* 支援 array index:{{paragraphs.0.entity}} → ctx.paragraphs[0].entity
|
||||||
|
* 非 string 值(object/array)會 JSON.stringify
|
||||||
|
*/
|
||||||
function interpolateData(
|
function interpolateData(
|
||||||
data: Record<string, unknown> | undefined,
|
data: Record<string, unknown> | undefined,
|
||||||
ctx: Record<string, unknown>,
|
ctx: Record<string, unknown>,
|
||||||
@@ -506,9 +511,11 @@ function interpolateData(
|
|||||||
const result: Record<string, unknown> = {};
|
const result: Record<string, unknown> = {};
|
||||||
for (const [k, v] of Object.entries(data)) {
|
for (const [k, v] of Object.entries(data)) {
|
||||||
if (typeof v === 'string') {
|
if (typeof v === 'string') {
|
||||||
result[k] = v.replace(/\{\{(\w+)\}\}/g, (_, key) => {
|
result[k] = v.replace(/\{\{([\w.]+)\}\}/g, (_, key: string) => {
|
||||||
const val = ctx[key];
|
const val = getNestedValue(ctx, key);
|
||||||
return val !== undefined ? String(val) : `{{${key}}}`;
|
if (val === undefined) return `{{${key}}}`;
|
||||||
|
if (typeof val === 'string') return val;
|
||||||
|
return JSON.stringify(val);
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
result[k] = v;
|
result[k] = v;
|
||||||
@@ -517,6 +524,18 @@ function interpolateData(
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** 從 ctx 用 dot path 取嵌套值:'a.b.0.c' → ctx.a.b[0].c */
|
||||||
|
function getNestedValue(ctx: unknown, path: string): unknown {
|
||||||
|
const parts = path.split('.');
|
||||||
|
let cur: unknown = ctx;
|
||||||
|
for (const p of parts) {
|
||||||
|
if (cur === null || cur === undefined) return undefined;
|
||||||
|
if (typeof cur !== 'object') return undefined;
|
||||||
|
cur = (cur as Record<string, unknown>)[p];
|
||||||
|
}
|
||||||
|
return cur;
|
||||||
|
}
|
||||||
|
|
||||||
/** 判斷節點執行結果是否為失敗:success === false 或含有 error key */
|
/** 判斷節點執行結果是否為失敗:success === false 或含有 error key */
|
||||||
function isFailure(result: unknown): boolean {
|
function isFailure(result: unknown): boolean {
|
||||||
if (!result || typeof result !== 'object') return false;
|
if (!result || typeof result !== 'object') return false;
|
||||||
|
|||||||
@@ -23,6 +23,9 @@ export interface PausedRunState {
|
|||||||
trace_so_far: TraceStep[];
|
trace_so_far: TraceStep[];
|
||||||
api_key?: string;
|
api_key?: string;
|
||||||
expires_at: number; // unix ms
|
expires_at: number; // unix ms
|
||||||
|
// resume 時用來 parse callback result 的 recipe output 規格(resumable + recipe 整合)
|
||||||
|
recipe_output_format?: 'text' | 'json';
|
||||||
|
recipe_output_required_fields?: string[];
|
||||||
}
|
}
|
||||||
|
|
||||||
const KEY_PREFIX = 'paused_run:';
|
const KEY_PREFIX = 'paused_run:';
|
||||||
@@ -79,3 +82,40 @@ export function isResumablePending(result: unknown): { task_id: string } | null
|
|||||||
if (typeof r.task_id !== 'string' || !r.task_id) return null;
|
if (typeof r.task_id !== 'string' || !r.task_id) return null;
|
||||||
return { task_id: r.task_id };
|
return { task_id: r.task_id };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parse claude_api result with recipe output format.
|
||||||
|
* 同步路徑跟 resume 路徑都用同一個解析器,避免邏輯歪掉。
|
||||||
|
*
|
||||||
|
* 輸入:result(可能是 {data:{text:"..."}} 或 {text:"..."})
|
||||||
|
* 輸出:parsed object 或 fallback 結構
|
||||||
|
*/
|
||||||
|
export function parseRecipeOutput(
|
||||||
|
result: unknown,
|
||||||
|
format: 'text' | 'json' | undefined,
|
||||||
|
requiredFields: string[] | undefined,
|
||||||
|
): unknown {
|
||||||
|
if (format !== 'json' || !result || typeof result !== 'object') return result;
|
||||||
|
const r = result as Record<string, unknown>;
|
||||||
|
const text = (r.data as Record<string, unknown> | undefined)?.text ?? r.text;
|
||||||
|
if (typeof text !== 'string') return result;
|
||||||
|
|
||||||
|
// 剝除 ```json ... ``` markdown fence(Claude 常這樣包)
|
||||||
|
let jsonText = String(text).trim();
|
||||||
|
const fenceMatch = jsonText.match(/^```(?:json)?\s*\n([\s\S]*?)\n```$/);
|
||||||
|
if (fenceMatch) jsonText = fenceMatch[1].trim();
|
||||||
|
|
||||||
|
try {
|
||||||
|
const parsed = JSON.parse(jsonText);
|
||||||
|
if (requiredFields && parsed && typeof parsed === 'object') {
|
||||||
|
const missing = requiredFields.filter((f) => !(f in (parsed as Record<string, unknown>)));
|
||||||
|
if (missing.length > 0) {
|
||||||
|
return { success: false, error: `recipe output 缺欄位: ${missing.join(', ')}`, raw: parsed };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// 把 parsed 的欄位 spread 到 top-level,FOREACH / 下游 {{var}} 都好取
|
||||||
|
return { success: true, data: parsed, ...(parsed && typeof parsed === 'object' ? parsed as Record<string, unknown> : {}) };
|
||||||
|
} catch (e) {
|
||||||
|
return { success: false, error: `recipe output JSON parse 失敗: ${e instanceof Error ? e.message : String(e)}`, raw_text: text };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -57,6 +57,8 @@ resumeRouter.post('/workflows/resume', async (c) => {
|
|||||||
callback_result: callbackResult,
|
callback_result: callbackResult,
|
||||||
prior_trace: state.trace_so_far,
|
prior_trace: state.trace_so_far,
|
||||||
kvNamespace: c.env.EXEC_CONTEXT,
|
kvNamespace: c.env.EXEC_CONTEXT,
|
||||||
|
recipe_output_format: state.recipe_output_format,
|
||||||
|
recipe_output_required_fields: state.recipe_output_required_fields,
|
||||||
});
|
});
|
||||||
const duration_ms = Date.now() - start;
|
const duration_ms = Date.now() - start;
|
||||||
return c.json({
|
return c.json({
|
||||||
|
|||||||
Reference in New Issue
Block a user