feat(arcrun): recipe system + resumable workflow + component registry canon

Three new platform capabilities + one component (kbdb_get) to enable
real AI workflow execution through cypher binding YAML.

## Recipe System (容器 + Recipe 模式)
SDD: .agents/specs/recipe-system/

- prompt_recipe schema (Zod): fragments + inputs + assembly + output
- recipe-expander.ts: expand recipe ref → real prompt by fetching KBDB blocks
  + pulling context fields with transforms (pluck_content / extract_field / etc)
- 7 transform whitelist: json_array / to_string / join / markdown_list /
  extract_field / first / pluck_content
- graph-executor hooks: detect node.data.recipe → expand → inject into ctx
- output JSON parser (with markdown fence stripping for Claude-wrapped JSON)
- Stored in RECIPES KV under prompt_recipe:{name}

## Resumable Workflow (webhook callback resume)
SDD: .agents/specs/resumable-workflow/

- WorkflowPaused class + paused-runs.ts (persist/load/consume in EXEC_CONTEXT KV, 24h TTL)
- graph-executor: detect {pending:true, task_id} → persist state → throw WorkflowPaused
- cypher-handlers: catch → return {success:true, paused:true, task_id, run_id}
- POST /workflows/resume route: consume KV state → resumeFromPaused()
- Auto-inject callback_url for claude_api nodes (PUBLIC_BASE_URL or default cypher.arcrun.dev)
- claude_api/main.go: forward callback_url to Mira daemon, default timeout 25s→120s
- Idempotent (consume = load+delete)

## Component Registry Canon
SDD: .agents/specs/component-registry-canon/

- Add POST /components/index-only endpoint (metadata-only, no wasm/sandbox)
- Backfill script (mjs): scan registry/components/*/contract.yaml → submit to KV
- register-component.sh: SSOT for local + CI hook (deploy.yml change in next commit)
- Drop R2 dead storage from submitComponent + types + wrangler
- Schema relaxed: category enum + auth/ai/platform; cold_start 50→500ms; size 2→8MB

## kbdb_get component
- registry/components/kbdb_get/: TinyGo WASM, two modes (block_id / page_name list)
- .component-builds/kbdb_get/: WASI shim worker (kbdb-get.arcrun.dev)

End-to-end validation: AI uses MCP execute_workflow with recipe ref →
cypher-executor expands prompt from KBDB schema/skill blocks + drafts →
claude_api calls Mira daemon → daemon callback fires resume route →
workflow continues. Verified with real 2KB+ Karpathy LLM Wiki draft.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-07 15:52:19 +08:00
parent e2221161a8
commit 497f92a268
32 changed files with 3562 additions and 36 deletions
+30 -2
View File
@@ -1,5 +1,5 @@
import type { Bindings, ExecutionGraph } from '../types';
import { ExecutionError } from '../types';
import { ExecutionError, WorkflowPaused } from '../types';
import { GraphExecutor } from '../graph-executor';
import { graphSchema } from '../lib/schemas';
import { createComponentLoader } from '../lib/component-loader';
@@ -32,7 +32,19 @@ export async function handleCypherExecute(
env: Bindings,
waitUntil: (promise: Promise<void>) => void,
apiKey?: string,
): Promise<{ success: boolean; data?: unknown; error?: string; trace?: unknown; duration_ms: number; graph?: ExecutionGraph }> {
): Promise<{
success: boolean;
data?: unknown;
error?: string;
trace?: unknown;
duration_ms: number;
graph?: ExecutionGraph;
// resumable workflow: 節點 pending 時回 paused(不算 success 也不算 fail
paused?: boolean;
task_id?: string;
run_id?: string;
paused_node_id?: string;
}> {
const parsed = parseTriplets(triplets as unknown[]);
if (!parsed) {
throw new Error('無法解析任何節點');
@@ -70,6 +82,22 @@ export async function handleCypherExecute(
return { success: true, data: result.data, trace: result.trace, duration_ms, graph };
} catch (err) {
const duration_ms = Date.now() - start;
// Resumable workflow: 節點回 pending → 回 paused 結構,不算成功也不算失敗
// SDD: resumable-workflow/design.md
if (err instanceof WorkflowPaused) {
return {
success: true,
paused: true,
task_id: err.task_id,
run_id: err.run_id,
paused_node_id: err.paused_node_id,
trace: err.trace_so_far,
duration_ms,
graph,
};
}
const errMsg = err instanceof Error ? err.message : String(err);
const componentId = graph.nodes.find(n => n.componentId)?.componentId ?? graphId;
const runId = `${graphId}-${Date.now()}`;
+188 -1
View File
@@ -1,8 +1,10 @@
// arcrun 圖遍歷引擎 — 支援完整 Cypher 語意關係
import type { ExecutionGraph, GraphNode, TraceStep, ComponentRunner, KVContextStore, EdgeType, Bindings } from './types';
import { kvSetNodeOutput, kvGetNodeOutput, ExecutionError } from './types';
import { kvSetNodeOutput, kvGetNodeOutput, ExecutionError, WorkflowPaused } from './types';
import { injectCredentials } from './actions/credential-injector';
import { tryAuthDispatch } from './actions/auth-dispatcher';
import { expandPromptRecipe } from './lib/recipe-expander';
import { persistPausedRun, isResumablePending } from './lib/paused-runs';
export type ComponentLoader = (componentId: string) => Promise<ComponentRunner>;
export type WorkflowLoader = (workflowId: string) => Promise<ExecutionGraph>;
@@ -17,6 +19,11 @@ export class GraphExecutor {
private apiKey?: string;
public recordComponentReference?: (componentId: string, workflowId: string) => Promise<void>;
// resumable workflowSDD: resumable-workflow/design.md
// 暫停時持久化 state 用,需在 execute 進入時設定
private currentGraph?: ExecutionGraph;
private currentRunId?: string;
constructor(loader: ComponentLoader, workflowLoader?: WorkflowLoader, env?: Bindings, apiKey?: string) {
this.loader = loader;
this.workflowLoader = workflowLoader;
@@ -40,6 +47,10 @@ export class GraphExecutor {
? { runId: `${graph.id}-${Date.now()}`, kv: kvNamespace }
: undefined;
// resumable workflow:記住當前 graph + run_id 給 pending 暫停用
this.currentGraph = graph;
this.currentRunId = kvStore?.runId ?? `${graph.id}-${Date.now()}`;
// 找出所有起點(沒有任何邊指向的節點)
const hasIncoming = new Set(graph.edges.map(e => e.to));
const startNodes = graph.nodes.filter(n => !hasIncoming.has(n.id));
@@ -82,6 +93,92 @@ export class GraphExecutor {
return { data: mergedResult, trace };
}
/**
* 從 paused state 繼續執行 workflow
* SDD: resumable-workflow/design.md §3.2
*
* 流程:
* 1. 把 paused_node 當已執行(result = callbackResult,注入進 context
* 2. 找出 paused_node 的所有下游節點當新起點
* 3. 執行下游節點直到結束(或再次 paused)
*/
async resumeFromPaused(args: {
graph: ExecutionGraph;
paused_node_id: string;
paused_context: Record<string, unknown>; // paused 當下的 context
callback_result: Record<string, unknown>; // daemon callback 給的 result(取代 paused result
prior_trace: TraceStep[];
kvNamespace?: KVNamespace;
}): Promise<{ data: unknown; trace: TraceStep[] }> {
const { graph, paused_node_id, paused_context, callback_result, prior_trace, kvNamespace } = args;
this.currentGraph = graph;
this.currentRunId = `${graph.id}-resume-${Date.now()}`;
const trace: TraceStep[] = [...prior_trace];
const kvStore: KVContextStore | undefined = kvNamespace
? { runId: this.currentRunId, kv: kvNamespace }
: undefined;
// 把 callback_result 寫進 paused_node 的 KV output(讓下游讀得到)
if (kvStore) {
await kvSetNodeOutput(kvStore, paused_node_id, callback_result);
}
// 把 callback_result spread 進 context(替代 paused 結果)
const mergedContext: Record<string, unknown> = {
...paused_context,
...(callback_result && typeof callback_result === 'object' ? callback_result : {}),
};
if (kvStore) {
if (!mergedContext._kv_outputs) mergedContext._kv_outputs = {};
(mergedContext._kv_outputs as Record<string, unknown>)[paused_node_id] = callback_result;
}
// 找下游節點
const downstreamEdges = graph.edges.filter(e => e.from === paused_node_id);
if (downstreamEdges.length === 0) {
// paused_node 是最後一個節點 → 直接結束
return { data: callback_result, trace };
}
// 重建 fanIn(針對下游可能 fan-in 的節點)
const fanIn: FanInState = new Map();
for (const node of graph.nodes) {
const inDeg = graph.edges.filter(e => e.to === node.id).length;
if (inDeg > 1) {
fanIn.set(node.id, { ctx: { ...mergedContext }, remaining: inDeg });
}
}
// 對每個下游節點,建立新 visited Set 避免 paused_node 自己被再跑一次
const visited = new Set<string>([`${paused_node_id}:${JSON.stringify(paused_context).slice(0, 50)}`]);
const downstreamNodes = downstreamEdges
.map(e => graph.nodes.find(n => n.id === e.to))
.filter((n): n is GraphNode => !!n);
const results = await Promise.all(
downstreamNodes.map(node =>
this.executeNode(node, graph, mergedContext, visited, trace, fanIn, kvStore)
)
);
let mergedResult: unknown;
if (results.length === 1) {
mergedResult = results[0];
} else {
mergedResult = results.reduce(
(acc: Record<string, unknown>, r: unknown) => ({
...acc,
...(typeof r === 'object' && r !== null ? (r as Record<string, unknown>) : {}),
}),
{} as Record<string, unknown>,
);
}
return { data: mergedResult, trace };
}
private async executeNode(
node: GraphNode,
graph: ExecutionGraph,
@@ -118,6 +215,37 @@ export class GraphExecutor {
...resolvedData,
};
// Resumable workflow callback_url 注入(SDD: resumable-workflow/design.md §2.2
// claude_api 容器拿到後會透傳給 Mira daemondaemon task 完成時 POST 進來
// hostname 暫從 PUBLIC_BASE_URL 取,沒設則用 cypher.arcrun.dev 預設
if (node.componentId === 'claude_api') {
const baseUrl = (this.env as { PUBLIC_BASE_URL?: string } | undefined)?.PUBLIC_BASE_URL
?? 'https://cypher.arcrun.dev';
mergedContext.callback_url = `${baseUrl.replace(/\/$/, '')}/workflows/resume`;
}
// Recipe expansion:若 node.data.recipe 存在,展開成實際 prompt 並併進 mergedContext
// SDD: matrix/arcrun/.agents/specs/recipe-system/design.md §2.2
if (typeof resolvedData.recipe === 'string' && this.env?.RECIPES) {
try {
const expanded = await expandPromptRecipe(
resolvedData.recipe,
ctx,
this.env as { RECIPES: { get: (k: string) => Promise<string | null> }; KBDB_BASE_URL?: string },
this.apiKey ?? '',
);
mergedContext = {
...mergedContext,
prompt: expanded.prompt,
model: expanded.model,
_recipe_output_format: expanded.output_format,
_recipe_output_required_fields: expanded.output_required_fields,
};
} catch (e) {
throw new Error(`recipe 展開失敗 (${resolvedData.recipe}): ${e instanceof Error ? e.message : String(e)}`);
}
}
// Credential 注入:在 WASM 執行前自動注入 credentials_required 中宣告的 token
if (this.env) {
// 先試 auth dispatcher(新路徑,走 auth primitive WASM Worker via HTTP
@@ -137,6 +265,61 @@ export class GraphExecutor {
nodeInput = mergedContext;
result = await runner(mergedContext);
// Resumable workflow:偵測 pending,持久化 paused state 後 throw WorkflowPaused
// SDD: resumable-workflow/design.md §3.2.1
// 注意:放在 recipe output parsing 之前 — pending 結果不該被當 JSON 解析
const pending = isResumablePending(result);
if (pending && this.env?.EXEC_CONTEXT && this.currentGraph && this.currentRunId) {
// 把這個節點的執行紀錄寫進 tracestatus=paused
trace.push({
nodeId: node.id,
type: node.type,
input: nodeInput,
output: result,
duration_ms: Date.now() - start,
});
await persistPausedRun(this.env.EXEC_CONTEXT, pending.task_id, {
run_id: this.currentRunId,
graph: this.currentGraph,
paused_node_id: node.id,
paused_context: context as Record<string, unknown>,
paused_pending_result: result as Record<string, unknown>,
trace_so_far: trace,
api_key: this.apiKey,
expires_at: Date.now() + 24 * 60 * 60 * 1000,
});
throw new WorkflowPaused(pending.task_id, this.currentRunId, node.id, trace);
}
// Recipe output parsing:若 recipe 指定 format=json,把 claude_api 回傳的 text 自動 parse
// 失敗 → 包裝成 success:false,給下游清楚知道是 LLM output 解析問題
if (mergedContext._recipe_output_format === 'json' && result && typeof result === 'object') {
const r = result as Record<string, unknown>;
const text = (r.data as Record<string, unknown> | undefined)?.text ?? r.text;
if (typeof text === 'string') {
// 剝除 ```json ... ``` markdown fenceClaude 常這樣包)
let jsonText = String(text).trim();
const fenceMatch = jsonText.match(/^```(?:json)?\s*\n([\s\S]*?)\n```$/);
if (fenceMatch) jsonText = fenceMatch[1].trim();
try {
const parsed = JSON.parse(jsonText);
const required = mergedContext._recipe_output_required_fields as string[] | undefined;
if (required && parsed && typeof parsed === 'object') {
const missing = required.filter((f) => !(f in (parsed as Record<string, unknown>)));
if (missing.length > 0) {
result = { success: false, error: `recipe output 缺欄位: ${missing.join(', ')}`, raw: parsed };
} else {
result = { success: true, data: parsed };
}
} else {
result = { success: true, data: parsed };
}
} catch (e) {
result = { success: false, error: `recipe output JSON parse 失敗: ${e instanceof Error ? e.message : String(e)}`, raw_text: text };
}
}
}
// BUILD-006:將節點 output 存入 KVkey = {run_id}:node:{node_id}
// 這讓下游節點可以透過 KV 讀取上游的具名 output,解決同名欄位衝突
if (kvStore && result !== null && result !== undefined) {
@@ -158,6 +341,10 @@ export class GraphExecutor {
break;
}
} catch (e: any) {
// WorkflowPaused 不是錯誤,是「workflow 暫停」訊號,直接往上傳
// SDD: resumable-workflow/design.md
if (e instanceof WorkflowPaused) throw e;
const errMsg = e.message || String(e);
trace.push({
nodeId: node.id,
+9 -2
View File
@@ -15,11 +15,17 @@ import { recipesRouter } from './routes/recipes';
import { credentialsRouter } from './routes/credentials';
import { webhooksNamedRouter } from './routes/webhooks-named';
import { authRouter } from './routes/auth';
import { resumeRouter } from './routes/resume';
const app = new Hono<{ Bindings: Bindings }>();
// 全域 CORS
app.use('*', cors());
// 全域 CORS(允許 arcrun.dev landing page 帶 credentials 存取)
app.use('*', cors({
origin: ['https://arcrun.dev', 'https://www.arcrun.dev'],
allowMethods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'],
allowHeaders: ['Content-Type', 'Authorization', 'X-Arcrun-API-Key'],
credentials: true,
}));
// 掛載所有路由器
app.route('/', docsRouter);
@@ -35,6 +41,7 @@ app.route('/', registerRouter);
app.route('/', recipesRouter);
app.route('/', credentialsRouter);
app.route('/', authRouter);
app.route('/', resumeRouter);
// Worker 導出
export default app;
@@ -31,6 +31,9 @@ import type { Bindings, ComponentRunner, ServiceBinding } from '../types';
*
* R2 動態注入 WASM 路徑作廢(CF workerd 不支援以 R2 物件臨時 instantiate)。
*/
// TODO(架構債,2026-05-07):白名單寫死違反 arcrun 「新零件無需改 cypher-executor」承諾
// 應改為從 component-registry KV 動態查(registry 已有 backfill index,知道所有 canonical_id
// SDD 待開:cypher-executor-dynamic-component-discovery
const WASM_HTTP_RUNNER_IDS: ReadonlySet<string> = new Set([
// API 零件(對應 registry/components/ 下的 TinyGo WASM
'http_request',
@@ -44,6 +47,12 @@ const WASM_HTTP_RUNNER_IDS: ReadonlySet<string> = new Set([
'auth_service_account',
'auth_oauth2',
'auth_mtls',
// Mira 零件(2026-05-07 加,吃狗糧推 7-B 時撞到白名單擋)
'claude_api',
'kbdb_ingest',
'kbdb_get',
'kbdb_create_block',
'kbdb_patch_block',
]);
/** canonical_id → 獨立 Worker URL(慣例:snake_case → kebab-case + .arcrun.dev */
+81
View File
@@ -0,0 +1,81 @@
/**
* Paused workflow runs:節點回 pending 時把 run state 持久化進 KV
* webhook callback 進來時撿回繼續執行
*
* SDD: matrix/arcrun/.agents/specs/resumable-workflow/design.md §2.1
*
* KV key: paused_run:{task_id}
* TTL: 24h(避免 KV 累積,超過就 GC)
*
* 設計筆記:
* - 用 task_id 當 keydaemon 派的 unique id),不用 run_id(同 run 可能多 paused 節點 v2
* - consume = load + delete 原子操作(避免重複 callback 重複執行)
*/
import type { ExecutionGraph, TraceStep } from '../types';
export interface PausedRunState {
run_id: string;
graph: ExecutionGraph;
paused_node_id: string;
paused_context: Record<string, unknown>;
paused_pending_result: Record<string, unknown>; // 節點回的 {pending, task_id, ...}
trace_so_far: TraceStep[];
api_key?: string;
expires_at: number; // unix ms
}
const KEY_PREFIX = 'paused_run:';
const TTL_SECONDS = 24 * 60 * 60;
type KvBinding = {
get: (key: string) => Promise<string | null>;
put: (key: string, value: string, options?: { expirationTtl?: number }) => Promise<void>;
delete: (key: string) => Promise<void>;
};
export async function persistPausedRun(
kv: KvBinding,
taskId: string,
state: PausedRunState,
): Promise<void> {
await kv.put(`${KEY_PREFIX}${taskId}`, JSON.stringify(state), { expirationTtl: TTL_SECONDS });
}
export async function loadPausedRun(
kv: KvBinding,
taskId: string,
): Promise<PausedRunState | null> {
const raw = await kv.get(`${KEY_PREFIX}${taskId}`);
if (!raw) return null;
try {
return JSON.parse(raw) as PausedRunState;
} catch {
return null;
}
}
/**
* 原子讀+刪:避免同 task_id 重複 callback 重複執行下游
* (CF KV 沒真原子操作,但 delete 失敗不影響 load 已成功)
*/
export async function consumePausedRun(
kv: KvBinding,
taskId: string,
): Promise<PausedRunState | null> {
const state = await loadPausedRun(kv, taskId);
if (!state) return null;
await kv.delete(`${KEY_PREFIX}${taskId}`).catch(() => {
// delete 失敗不擋,最多就重複執行一次(接受)
});
return state;
}
/** 偵測 component result 是否為「需要 resume」的 pending pattern */
export function isResumablePending(result: unknown): { task_id: string } | null {
if (!result || typeof result !== 'object') return null;
const r = result as Record<string, unknown>;
if (r.pending !== true) return null;
if (typeof r.task_id !== 'string' || !r.task_id) return null;
return { task_id: r.task_id };
}
@@ -0,0 +1,90 @@
/**
* prompt_recipe Zod schema
* SDD: matrix/arcrun/.agents/specs/recipe-system/design.md §2.1
*
* 平行於既有 auth_recipe / api_recipe,存 RECIPES KV (key: `prompt_recipe:{name}`)
* 容器 + recipe 模式:claude_api 是容器,recipe 是配方
*/
import { z } from 'zod';
// ── Transform 白名單 ──────────────────────────────────────────────────────────
// 限制 transform 種類避免變 mini-DSL;超過範圍請寫零件
export const TRANSFORM_NAMES = [
'json_array', // array → JSON.stringify 整體
'to_string', // 任意值 → String(x)
'join', // array → join(sep)sep 預設換行
'markdown_list', // array → "- a\n- b\n- c"
'extract_field', // array of object → 抽 field 後的 array(再可串其他 transform
'first', // array → first element(取單一)
'pluck_content', // KBDB blocks array → 抽 content 後 join 雙換行(草稿合併常用)
] as const;
/** transform 表示法:name 或 name:arg(如 extract_field:page_name */
export const TransformSchema = z.string().regex(/^[a-z_]+(:.+)?$/, 'transform 必須為 name 或 name:arg 格式');
// ── Fragment:從 KBDB / KV 抓固定資料 ──────────────────────────────────────────
export const KBDBBlockFragmentSchema = z.object({
var: z.string().min(1), // prompt template 內的變數名
source: z.literal('kbdb_block'),
block_id: z.string().optional(), // 二擇一
block_page_name: z.string().optional(), // 比 block_id 穩定
field: z.string().default('content'), // 抓 block 的哪個欄位
});
export const KVFragmentSchema = z.object({
var: z.string().min(1),
source: z.literal('kv'),
key: z.string().min(1),
});
// discriminatedUnion 對 refined zod object 不支援,故拆成驗證後 + 單獨檢查 block_id|page_name
export const FragmentSchema = z.discriminatedUnion('source', [
KBDBBlockFragmentSchema,
KVFragmentSchema,
]).superRefine((d, ctx) => {
if (d.source === 'kbdb_block' && !d.block_id && !d.block_page_name) {
ctx.addIssue({
code: z.ZodIssueCode.custom,
message: 'block_id 或 block_page_name 必填其一',
});
}
});
// ── Input:從 workflow context 取值(含 transform) ────────────────────────────
export const InputSchema = z.object({
var: z.string().min(1),
from: z.string().min(1), // JSONPath-lite,如 "ctx.read_drafts.blocks"
transform: TransformSchema.optional(),
default: z.unknown().optional(), // from 取不到時的預設值(避免炸 prompt)
});
// ── Prompt 組裝 ──────────────────────────────────────────────────────────────
export const PromptAssemblySchema = z.object({
system: z.string().min(1), // 模板,可含 {{var}}
user: z.string().min(1),
});
// ── 輸出規格 ──────────────────────────────────────────────────────────────────
export const OutputSpecSchema = z.object({
format: z.enum(['text', 'json']).default('text'),
// 若 format=json,可選 schema 做 parse 後驗證(簡化版,列必填欄位即可)
required_fields: z.array(z.string()).optional(),
});
// ── 完整 prompt_recipe 定義 ────────────────────────────────────────────────────
export const PromptRecipeSchema = z.object({
kind: z.literal('prompt_recipe'),
name: z.string().min(1).regex(/^[a-z][a-z0-9_]*$/, 'name 為 lowercase + underscore'),
version: z.number().int().positive().default(1),
description: z.string().optional(),
model: z.enum(['haiku', 'sonnet', 'opus']).default('sonnet'),
fragments: z.array(FragmentSchema).default([]),
inputs: z.array(InputSchema).default([]),
prompt_assembly: PromptAssemblySchema,
output: OutputSpecSchema.default({ format: 'text' }),
});
export type PromptRecipe = z.infer<typeof PromptRecipeSchema>;
export type Fragment = z.infer<typeof FragmentSchema>;
export type RecipeInput = z.infer<typeof InputSchema>;
+136
View File
@@ -0,0 +1,136 @@
/**
* Recipe expander:把 prompt_recipe 展開成 claude_api 的實際 input
* SDD: matrix/arcrun/.agents/specs/recipe-system/design.md §2.2 + Phase 2.1
*
* 流程:
* 1. loadPromptRecipe 取定義
* 2. fragments → 用 KBDB API 抓 block content
* 3. inputs → 從 workflow context 取值 + 跑 transform
* 4. 套進 prompt_assembly.system / .user 的 {{var}} 模板
* 5. 回傳 { prompt, model, output_format, output_required_fields }
*/
import { loadPromptRecipe, RecipeLoadError } from './recipe-loader';
import { applyTransform } from './recipe-transforms';
import type { Fragment, RecipeInput } from './prompt-recipe-schema';
type ExpanderEnv = {
RECIPES: { get: (key: string) => Promise<string | null> };
KBDB_BASE_URL?: string;
};
export interface ExpandedRecipe {
prompt: string; // user promptsystem + user 用 \n\n--- system ---\n 分隔)
model: 'haiku' | 'sonnet' | 'opus';
output_format: 'text' | 'json';
output_required_fields?: string[];
}
/** 從 path 取嵌套值,例如 "ctx.read_drafts.blocks" / "loop.item" */
function getByPath(ctx: Record<string, unknown>, path: string): unknown {
const parts = path.split('.');
let cur: unknown = ctx;
for (const p of parts) {
if (cur === null || cur === undefined) return undefined;
if (typeof cur !== 'object') return undefined;
cur = (cur as Record<string, unknown>)[p];
}
return cur;
}
/** {{var}} 模板替換(top-level vars 物件) */
function interpolate(template: string, vars: Record<string, string>): string {
return template.replace(/\{\{(\w+)\}\}/g, (_, key) => (vars[key] !== undefined ? vars[key] : `{{${key}}}`));
}
async function fetchKbdbBlock(
env: ExpanderEnv,
apiKey: string,
fragment: Extract<Fragment, { source: 'kbdb_block' }>,
): Promise<unknown> {
const base = (env.KBDB_BASE_URL ?? 'https://kbdb.finally.click').replace(/\/$/, '');
let url: string;
if (fragment.block_id) {
url = `${base}/blocks/${encodeURIComponent(fragment.block_id)}`;
} else {
url = `${base}/blocks?page_name=${encodeURIComponent(fragment.block_page_name!)}&limit=1`;
}
const res = await fetch(url, { headers: { Authorization: `Bearer ${apiKey}` } });
if (!res.ok) throw new Error(`KBDB fragment 抓取失敗 (${res.status}): ${url}`);
const data = (await res.json()) as Record<string, unknown>;
// page_name 模式回 {blocks:[]}block_id 模式直接回 block 物件
const block: Record<string, unknown> = fragment.block_id
? data
: ((data.blocks as unknown[])?.[0] as Record<string, unknown>) ?? {};
if (!block) throw new Error(`KBDB block 不存在: ${fragment.block_id ?? fragment.block_page_name}`);
const fieldVal = block[fragment.field];
if (fieldVal === undefined) throw new Error(`block 缺欄位 "${fragment.field}"`);
return fieldVal;
}
async function resolveFragment(
env: ExpanderEnv,
apiKey: string,
frag: Fragment,
): Promise<{ var: string; value: unknown }> {
if (frag.source === 'kv') {
const val = await env.RECIPES.get(frag.key);
if (val === null) throw new Error(`KV 找不到 key: ${frag.key}`);
return { var: frag.var, value: val };
}
return { var: frag.var, value: await fetchKbdbBlock(env, apiKey, frag) };
}
function resolveInput(input: RecipeInput, ctx: Record<string, unknown>): { var: string; value: unknown } {
let val = getByPath(ctx, input.from);
const beforeDefault = val;
if (val === undefined) val = input.default;
try {
if (input.transform) val = applyTransform(val, input.transform);
return { var: input.var, value: val };
} catch (e) {
// 把 path 跟原值放進錯誤訊息,方便 debug recipe
const valType = Array.isArray(beforeDefault) ? `array(${beforeDefault.length})`
: beforeDefault === undefined ? 'undefined(default applied)'
: typeof beforeDefault;
throw new Error(`${e instanceof Error ? e.message : String(e)} [path=${input.from}, type=${valType}]`);
}
}
/** 主入口:展開 recipe → 組 prompt */
export async function expandPromptRecipe(
recipeRef: string,
ctx: Record<string, unknown>,
env: ExpanderEnv,
apiKey: string, // KBDB partner key(從 workflow auth 來)
): Promise<ExpandedRecipe> {
const recipe = await loadPromptRecipe(recipeRef, env.RECIPES);
const vars: Record<string, string> = {};
for (const frag of recipe.fragments) {
const { var: name, value } = await resolveFragment(env, apiKey, frag);
vars[name] = typeof value === 'string' ? value : JSON.stringify(value);
}
for (const inp of recipe.inputs) {
const { var: name, value } = resolveInput(inp, ctx);
vars[name] = typeof value === 'string' ? value : JSON.stringify(value);
}
const system = interpolate(recipe.prompt_assembly.system, vars);
const user = interpolate(recipe.prompt_assembly.user, vars);
// claude_api 容器目前吃單一 prompt 字串 → system + user 用分隔線拼
const prompt = `${system}\n\n--- USER ---\n\n${user}`;
return {
prompt,
model: recipe.model,
output_format: recipe.output.format,
output_required_fields: recipe.output.required_fields,
};
}
export { RecipeLoadError };
+50
View File
@@ -0,0 +1,50 @@
/**
* Prompt recipe loader:從 RECIPES KV 抓 prompt_recipe 定義並驗證
* SDD: matrix/arcrun/.agents/specs/recipe-system/design.md Phase 1.3
*
* KV key 格式:prompt_recipe:{name}
* KV valueJSON 字串(不用 YAML,避免引入 yaml parser 進 worker
*/
import { PromptRecipeSchema, type PromptRecipe } from './prompt-recipe-schema';
type KvBinding = { get: (key: string) => Promise<string | null> };
export class RecipeLoadError extends Error {
constructor(message: string, public readonly recipe: string) {
super(message);
}
}
/** 從 RECIPES KV 抓 + parse + validate */
export async function loadPromptRecipe(
recipeRef: string, // 完整 key 如 "prompt_recipe:wiki_synthesis",或裸名 "wiki_synthesis"
recipesKv: KvBinding,
): Promise<PromptRecipe> {
const key = recipeRef.startsWith('prompt_recipe:')
? recipeRef
: `prompt_recipe:${recipeRef}`;
const raw = await recipesKv.get(key);
if (!raw) {
throw new RecipeLoadError(`找不到 recipe: ${key}`, key);
}
let parsed: unknown;
try {
parsed = JSON.parse(raw);
} catch (e) {
throw new RecipeLoadError(
`recipe ${key} 不是合法 JSON: ${e instanceof Error ? e.message : String(e)}`,
key,
);
}
const result = PromptRecipeSchema.safeParse(parsed);
if (!result.success) {
const issues = result.error.issues.map((i) => `${i.path.join('.')}: ${i.message}`).join('; ');
throw new RecipeLoadError(`recipe ${key} schema 驗證失敗: ${issues}`, key);
}
return result.data;
}
@@ -0,0 +1,58 @@
/**
* Recipe transform 白名單實作
* SDD: matrix/arcrun/.agents/specs/recipe-system/design.md §2.1
*
* 每個 transform 接 unknown,回 unknown。
* 失敗策略:一律 throw,由 expander 包成 recipe 錯誤
*/
export type TransformFn = (value: unknown, arg?: string) => unknown;
const transforms: Record<string, TransformFn> = {
json_array: (v) => JSON.stringify(v ?? []),
to_string: (v) => {
if (v === null || v === undefined) return '';
if (typeof v === 'object') return JSON.stringify(v);
return String(v);
},
join: (v, sep) => {
if (!Array.isArray(v)) throw new Error('join: input 不是 array');
return v.map((x) => (typeof x === 'string' ? x : JSON.stringify(x))).join(sep ?? '\n');
},
markdown_list: (v) => {
if (!Array.isArray(v)) throw new Error('markdown_list: input 不是 array');
return v.map((x) => `- ${typeof x === 'string' ? x : JSON.stringify(x)}`).join('\n');
},
extract_field: (v, field) => {
if (!field) throw new Error('extract_field: 需要 field 參數,例如 extract_field:page_name');
if (!Array.isArray(v)) throw new Error('extract_field: input 不是 array');
return v.map((x) => (x && typeof x === 'object' ? (x as Record<string, unknown>)[field] : undefined));
},
first: (v) => {
if (!Array.isArray(v)) return v;
return v[0];
},
pluck_content: (v) => {
if (!Array.isArray(v)) throw new Error('pluck_content: input 不是 array');
return v
.map((b) => (b && typeof b === 'object' ? String((b as Record<string, unknown>).content ?? '') : ''))
.filter((s) => s.length > 0)
.join('\n\n---\n\n');
},
};
/** 解析 "name" 或 "name:arg" → 執行 transform */
export function applyTransform(value: unknown, spec: string): unknown {
const colonIdx = spec.indexOf(':');
const name = colonIdx === -1 ? spec : spec.slice(0, colonIdx);
const arg = colonIdx === -1 ? undefined : spec.slice(colonIdx + 1);
const fn = transforms[name];
if (!fn) throw new Error(`未知 transform: ${name}`);
return fn(value, arg);
}
+85
View File
@@ -0,0 +1,85 @@
/**
* POST /workflows/resume
* Webhook callback 進來時,從 paused state 撿起來繼續跑下游節點
* SDD: matrix/arcrun/.agents/specs/resumable-workflow/design.md Phase 3
*
* 安全:因為這是 daemon 主動 callback,沒有 partner keydaemon 不知道用戶 key
* 靠 task_id 為 nonce + 24h TTL + idempotent consume 保護
*/
import { Hono } from 'hono';
import type { Bindings } from '../types';
import { WorkflowPaused } from '../types';
import { GraphExecutor } from '../graph-executor';
import { createComponentLoader } from '../lib/component-loader';
import { consumePausedRun } from '../lib/paused-runs';
export const resumeRouter = new Hono<{ Bindings: Bindings }>();
resumeRouter.post('/workflows/resume', async (c) => {
let body: Record<string, unknown>;
try {
body = await c.req.json();
} catch {
return c.json({ error: 'request body 必須為 JSON' }, 400);
}
const taskId = typeof body.task_id === 'string' ? body.task_id : undefined;
if (!taskId) {
return c.json({ error: 'task_id 必填' }, 400);
}
// consume = load + deleteidempotent:重複 callback 第二次找不到 state,回 200
const state = await consumePausedRun(c.env.EXEC_CONTEXT, taskId);
if (!state) {
return c.json({
success: true,
noop: true,
reason: `paused state 不存在或已過期 (task_id=${taskId})`,
});
}
const callbackResult = {
success: body.success ?? true,
data: body.data,
error: body.error,
};
const loader = createComponentLoader(c.env);
const executor = new GraphExecutor(loader, undefined, c.env, state.api_key);
const start = Date.now();
try {
const result = await executor.resumeFromPaused({
graph: state.graph,
paused_node_id: state.paused_node_id,
paused_context: state.paused_context,
callback_result: callbackResult,
prior_trace: state.trace_so_far,
kvNamespace: c.env.EXEC_CONTEXT,
});
const duration_ms = Date.now() - start;
return c.json({
success: true,
resumed: true,
task_id: taskId,
run_id: state.run_id,
data: result.data,
trace: result.trace,
duration_ms,
});
} catch (err) {
if (err instanceof WorkflowPaused) {
// resume 後又遇到 pendingv2 nested 情境)— v1 仍持久化但回 paused-again
return c.json({
success: true,
paused_again: true,
task_id: err.task_id,
run_id: err.run_id,
paused_node_id: err.paused_node_id,
});
}
const errMsg = err instanceof Error ? err.message : String(err);
return c.json({ success: false, error: errMsg, task_id: taskId, run_id: state.run_id }, 500);
}
});
+24
View File
@@ -50,6 +50,9 @@ export type Bindings = {
GITHUB_CLIENT_ID?: string;
GITHUB_CLIENT_SECRET?: string;
SESSION_SIGNING_SECRET?: string; // 用於 HMAC session ID(可選,也可直接用 UUID)
// KBDB 整合
KBDB_INTERNAL_TOKEN?: string;
KBDB_BASE_URL?: string; // 預設 https://kbdb.inkstone.app
};
// 圖結構定義
@@ -119,6 +122,27 @@ export async function kvGetNodeOutput(store: KVContextStore, nodeId: string): Pr
}
}
/**
* Workflow 暫停(resumable workflow):
* 節點回 pending → graph-executor 持久化 state + throw 此類,被頂層接住回 paused 狀態給 caller
* SDD: matrix/arcrun/.agents/specs/resumable-workflow/design.md
*/
export class WorkflowPaused extends Error {
readonly task_id: string;
readonly run_id: string;
readonly paused_node_id: string;
readonly trace_so_far: TraceStep[];
constructor(task_id: string, run_id: string, paused_node_id: string, trace_so_far: TraceStep[]) {
super(`workflow paused at node ${paused_node_id} waiting for task ${task_id}`);
this.name = 'WorkflowPaused';
this.task_id = task_id;
this.run_id = run_id;
this.paused_node_id = paused_node_id;
this.trace_so_far = trace_so_far;
}
}
/** 執行失敗時拋出的自訂 Error,攜帶完整 trace 與失敗節點資訊 */
export class ExecutionError extends Error {
readonly failed_node: string;