diff --git a/cypher-executor/src/actions/graph-builder.ts b/cypher-executor/src/actions/graph-builder.ts index cd1eb84..3703b17 100644 --- a/cypher-executor/src/actions/graph-builder.ts +++ b/cypher-executor/src/actions/graph-builder.ts @@ -32,11 +32,25 @@ export function buildExecutionGraph( return { id, type: nr.type, componentId, label: name, data }; }); - const edges = parsed.edges.map(e => ({ - from: e.from.toLowerCase().replace(/\s+/g, '-'), - to: e.to.toLowerCase().replace(/\s+/g, '-'), - type: toEdgeType(e.label), - })); + const edges = parsed.edges.map(e => { + // 「對每個 X」label 抽 iterator:cypher binding 表達 FOREACH 的迭代變數 + // 例:'A >> 對每個 paragraph >> B' → type=FOREACH, iterator='paragraph' + // getIterableFromContext 會找 ctx.paragraphs(複數)或 ctx.paragraph + let iterator: string | undefined; + let label = e.label; + const foreachMatch = label.match(/^(?:對每個|FOREACH)\s+(\w+)$/i); + if (foreachMatch) { + iterator = foreachMatch[1]; + label = '對每個'; // 改回標準 label 走 SEMANTIC_EDGE_MAP + } + const edge: { from: string; to: string; type: ReturnType; iterator?: string } = { + from: e.from.toLowerCase().replace(/\s+/g, '-'), + to: e.to.toLowerCase().replace(/\s+/g, '-'), + type: toEdgeType(label), + }; + if (iterator) edge.iterator = iterator; + return edge; + }); return { id: graphId, name: graphName, nodes, edges }; } diff --git a/cypher-executor/src/graph-executor.ts b/cypher-executor/src/graph-executor.ts index fb66af6..d02c1f4 100644 --- a/cypher-executor/src/graph-executor.ts +++ b/cypher-executor/src/graph-executor.ts @@ -4,7 +4,7 @@ import { kvSetNodeOutput, kvGetNodeOutput, ExecutionError, WorkflowPaused } from import { injectCredentials } from './actions/credential-injector'; import { tryAuthDispatch } from './actions/auth-dispatcher'; import { expandPromptRecipe } from './lib/recipe-expander'; -import { persistPausedRun, isResumablePending } from './lib/paused-runs'; +import { persistPausedRun, isResumablePending, parseRecipeOutput } from './lib/paused-runs'; export type ComponentLoader = (componentId: string) => Promise; export type WorkflowLoader = (workflowId: string) => Promise; @@ -109,8 +109,19 @@ export class GraphExecutor { callback_result: Record; // daemon callback 給的 result(取代 paused result) prior_trace: TraceStep[]; kvNamespace?: KVNamespace; + recipe_output_format?: 'text' | 'json'; + recipe_output_required_fields?: string[]; }): Promise<{ data: unknown; trace: TraceStep[] }> { - const { graph, paused_node_id, paused_context, callback_result, prior_trace, kvNamespace } = args; + const { graph, paused_node_id, paused_context, prior_trace, kvNamespace } = args; + let { callback_result } = args; + + // Recipe output parsing:跟立刻回路徑同樣解析(spread parsed 欄位到 top-level) + // SDD: recipe-system + resumable-workflow + callback_result = parseRecipeOutput( + callback_result, + args.recipe_output_format, + args.recipe_output_required_fields, + ) as Record; this.currentGraph = graph; this.currentRunId = `${graph.id}-resume-${Date.now()}`; @@ -287,38 +298,21 @@ export class GraphExecutor { trace_so_far: trace, api_key: this.apiKey, expires_at: Date.now() + 24 * 60 * 60 * 1000, + recipe_output_format: mergedContext._recipe_output_format as 'text' | 'json' | undefined, + recipe_output_required_fields: mergedContext._recipe_output_required_fields as string[] | undefined, }); throw new WorkflowPaused(pending.task_id, this.currentRunId, node.id, trace); } - // Recipe output parsing:若 recipe 指定 format=json,把 claude_api 回傳的 text 自動 parse - // 失敗 → 包裝成 success:false,給下游清楚知道是 LLM output 解析問題 - if (mergedContext._recipe_output_format === 'json' && result && typeof result === 'object') { - const r = result as Record; - const text = (r.data as Record | undefined)?.text ?? r.text; - if (typeof text === 'string') { - // 剝除 ```json ... ``` markdown fence(Claude 常這樣包) - let jsonText = String(text).trim(); - const fenceMatch = jsonText.match(/^```(?:json)?\s*\n([\s\S]*?)\n```$/); - if (fenceMatch) jsonText = fenceMatch[1].trim(); - try { - const parsed = JSON.parse(jsonText); - const required = mergedContext._recipe_output_required_fields as string[] | undefined; - if (required && parsed && typeof parsed === 'object') { - const missing = required.filter((f) => !(f in (parsed as Record))); - if (missing.length > 0) { - result = { success: false, error: `recipe output 缺欄位: ${missing.join(', ')}`, raw: parsed }; - } else { - result = { success: true, data: parsed }; - } - } else { - result = { success: true, data: parsed }; - } - } catch (e) { - result = { success: false, error: `recipe output JSON parse 失敗: ${e instanceof Error ? e.message : String(e)}`, raw_text: text }; - } - } - } + // Recipe output parsing:用 parseRecipeOutput 統一處理(立刻回 + resume 長回兩條路共用) + // SDD: recipe-system + resumable-workflow + // 解完後 parsed JSON 的 top-level 欄位(如 paragraphs / triplets)spread 到 result, + // 讓下游 FOREACH 跟 {{var}} 模板直接可取 + result = parseRecipeOutput( + result, + mergedContext._recipe_output_format as 'text' | 'json' | undefined, + mergedContext._recipe_output_required_fields as string[] | undefined, + ); // BUILD-006:將節點 output 存入 KV(key = {run_id}:node:{node_id}) // 這讓下游節點可以透過 KV 讀取上游的具名 output,解決同名欄位衝突 @@ -439,8 +433,15 @@ export class GraphExecutor { const items = getIterableFromContext(result, iteratorKey); const iterResults: unknown[] = []; + // FOREACH itemContext 順序:原 ctx 全局欄位(api_key 等)優先 < result(上游輸出)< item(當前迭代) + // 之前只 spread result,全局 api_key 會丟,下游 {{api_key}} 抓不到 + const baseCtx = (typeof context === 'object' && context !== null) ? context as Record : {}; for (const item of items) { - const itemContext = { ...(result as Record), [iteratorKey]: item }; + const itemContext = { + ...baseCtx, + ...(result as Record), + [iteratorKey]: item, + }; const itemResult = await this.executeNode(nextNode, graph, itemContext, new Set(), trace, fanIn, kvStore); iterResults.push(itemResult); } @@ -497,7 +498,11 @@ export class GraphExecutor { } } -/** node.data 的 string 值支援 {{variable}} 替換,從 context 取值 */ +/** node.data 的 string 值支援 {{variable}} 替換,從 context 取值 + * 支援嵌套 path:{{item.content}} → ctx.item.content + * 支援 array index:{{paragraphs.0.entity}} → ctx.paragraphs[0].entity + * 非 string 值(object/array)會 JSON.stringify + */ function interpolateData( data: Record | undefined, ctx: Record, @@ -506,9 +511,11 @@ function interpolateData( const result: Record = {}; for (const [k, v] of Object.entries(data)) { if (typeof v === 'string') { - result[k] = v.replace(/\{\{(\w+)\}\}/g, (_, key) => { - const val = ctx[key]; - return val !== undefined ? String(val) : `{{${key}}}`; + result[k] = v.replace(/\{\{([\w.]+)\}\}/g, (_, key: string) => { + const val = getNestedValue(ctx, key); + if (val === undefined) return `{{${key}}}`; + if (typeof val === 'string') return val; + return JSON.stringify(val); }); } else { result[k] = v; @@ -517,6 +524,18 @@ function interpolateData( return result; } +/** 從 ctx 用 dot path 取嵌套值:'a.b.0.c' → ctx.a.b[0].c */ +function getNestedValue(ctx: unknown, path: string): unknown { + const parts = path.split('.'); + let cur: unknown = ctx; + for (const p of parts) { + if (cur === null || cur === undefined) return undefined; + if (typeof cur !== 'object') return undefined; + cur = (cur as Record)[p]; + } + return cur; +} + /** 判斷節點執行結果是否為失敗:success === false 或含有 error key */ function isFailure(result: unknown): boolean { if (!result || typeof result !== 'object') return false; diff --git a/cypher-executor/src/lib/paused-runs.ts b/cypher-executor/src/lib/paused-runs.ts index 0c27957..4871b93 100644 --- a/cypher-executor/src/lib/paused-runs.ts +++ b/cypher-executor/src/lib/paused-runs.ts @@ -23,6 +23,9 @@ export interface PausedRunState { trace_so_far: TraceStep[]; api_key?: string; expires_at: number; // unix ms + // resume 時用來 parse callback result 的 recipe output 規格(resumable + recipe 整合) + recipe_output_format?: 'text' | 'json'; + recipe_output_required_fields?: string[]; } const KEY_PREFIX = 'paused_run:'; @@ -79,3 +82,40 @@ export function isResumablePending(result: unknown): { task_id: string } | null if (typeof r.task_id !== 'string' || !r.task_id) return null; return { task_id: r.task_id }; } + +/** + * Parse claude_api result with recipe output format. + * 同步路徑跟 resume 路徑都用同一個解析器,避免邏輯歪掉。 + * + * 輸入:result(可能是 {data:{text:"..."}} 或 {text:"..."}) + * 輸出:parsed object 或 fallback 結構 + */ +export function parseRecipeOutput( + result: unknown, + format: 'text' | 'json' | undefined, + requiredFields: string[] | undefined, +): unknown { + if (format !== 'json' || !result || typeof result !== 'object') return result; + const r = result as Record; + const text = (r.data as Record | undefined)?.text ?? r.text; + if (typeof text !== 'string') return result; + + // 剝除 ```json ... ``` markdown fence(Claude 常這樣包) + let jsonText = String(text).trim(); + const fenceMatch = jsonText.match(/^```(?:json)?\s*\n([\s\S]*?)\n```$/); + if (fenceMatch) jsonText = fenceMatch[1].trim(); + + try { + const parsed = JSON.parse(jsonText); + if (requiredFields && parsed && typeof parsed === 'object') { + const missing = requiredFields.filter((f) => !(f in (parsed as Record))); + if (missing.length > 0) { + return { success: false, error: `recipe output 缺欄位: ${missing.join(', ')}`, raw: parsed }; + } + } + // 把 parsed 的欄位 spread 到 top-level,FOREACH / 下游 {{var}} 都好取 + return { success: true, data: parsed, ...(parsed && typeof parsed === 'object' ? parsed as Record : {}) }; + } catch (e) { + return { success: false, error: `recipe output JSON parse 失敗: ${e instanceof Error ? e.message : String(e)}`, raw_text: text }; + } +} diff --git a/cypher-executor/src/routes/resume.ts b/cypher-executor/src/routes/resume.ts index a62efe7..b225882 100644 --- a/cypher-executor/src/routes/resume.ts +++ b/cypher-executor/src/routes/resume.ts @@ -57,6 +57,8 @@ resumeRouter.post('/workflows/resume', async (c) => { callback_result: callbackResult, prior_trace: state.trace_so_far, kvNamespace: c.env.EXEC_CONTEXT, + recipe_output_format: state.recipe_output_format, + recipe_output_required_fields: state.recipe_output_required_fields, }); const duration_ms = Date.now() - start; return c.json({