From 989fbeb9ac996f40614cbc28392d63c1a8edff62 Mon Sep 17 00:00:00 2001
From: richblack
Date: Sat, 16 May 2026 15:58:35 +0800
Subject: [PATCH] feat(cypher-executor): /executions/* introspection (LI SDD M2.1)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

對應 .agents/specs/llm-interface/ Milestone 2.1。給 AI debug 用的執行
狀態檢視端點。3 個新路由:

GET /executions/paused
- 列當前 api_key 下所有 paused workflow(等 callback resume)
- 走 EXEC_CONTEXT KV `paused_run:*` prefix scan,過濾 state.api_key
- 回 [{task_id, run_id, paused_node_id, workflow_name, expires_at}]
- 限 limit 100,避免 KV N+1 爆

GET /executions/:task_id
- 看單一 paused state 細節(trace_so_far / paused_context / pending_result)
- 隔離租戶(state.api_key !== 用戶 ak → 回 not_found 不洩漏存在性)
- 回完整 state JSON

GET /workflows/:name/executions
- 列某 workflow 最近 N 次執行 verdict(走 ANALYTICS_KV stats:{name}:* prefix)
- 先驗 workflow 屬該 api_key
- 按 timestamp 後綴 desc sort

統一 error contract(LI SDD §1.3 / §1.4 規範):
- ok: bool
- error_code: enum (auth_missing / not_found / internal_error)
- human_message: 描述
- next_actions: 陣列,給 AI 看的下一步
- hints: 成功時的 next-step 建議

不影響既有路由 contract。

Co-Authored-By: Claude Opus 4.7
---
 cypher-executor/src/index.ts             |   2 +
 cypher-executor/src/routes/executions.ts | 306 +++++++++++++++++++++++
 2 files changed, 308 insertions(+)
 create mode 100644 cypher-executor/src/routes/executions.ts

diff --git a/cypher-executor/src/index.ts b/cypher-executor/src/index.ts
index 6015d21..341b9cc 100644
--- a/cypher-executor/src/index.ts
+++ b/cypher-executor/src/index.ts
@@ -18,6 +18,7 @@ import { credentialsRouter } from './routes/credentials';
 import { webhooksNamedRouter } from './routes/webhooks-named';
 import { authRouter } from './routes/auth';
 import { resumeRouter } from './routes/resume';
+import { executionsRouter } from './routes/executions';
 
 const app = new Hono<{ Bindings: Bindings }>();
 
@@ -44,6 +45,7 @@ app.route('/', recipesRouter);
 app.route('/', credentialsRouter);
 app.route('/', authRouter);
 app.route('/', resumeRouter);
+app.route('/', executionsRouter); // LI SDD M2.1: /executions/* + /workflows/:name/executions
 
 // Worker 導出(fetch + scheduled)
 // scheduled handler 對應 wrangler.toml [triggers].crons,每分鐘 tick;
diff --git a/cypher-executor/src/routes/executions.ts b/cypher-executor/src/routes/executions.ts
new file mode 100644
index 0000000..7e7533d
--- /dev/null
+++ b/cypher-executor/src/routes/executions.ts
@@ -0,0 +1,306 @@
+/**
+ * Executions routes — LI SDD M2.1
+ *
+ * Implements Milestone 2.1 of .agents/specs/llm-interface/: endpoints that let an AI
+ * inspect workflow execution state.
+ *
+ * - GET /executions/paused — list currently paused workflows (waiting for a callback resume)
+ * - GET /executions/:task_id — details of one paused state (trace, context, node id)
+ * - GET /workflows/:name/executions — most recent N execution verdicts of a workflow
+ *
+ * Design: read-only, no side effects. Every route requires api_key auth so that one
+ * tenant cannot peek at another tenant's workflow state.
+ */
+
+import { Hono } from 'hono';
+import type { Bindings } from '../types';
+
+export const executionsRouter = new Hono<{ Bindings: Bindings }>();
+
+/** How many `paused_run:*` keys one request scans; bounds the per-key KV GET fan-out. */
+const PAUSED_SCAN_LIMIT = 200;
+
+/** Page size of each ANALYTICS_KV list call. */
+const STATS_PAGE_SIZE = 1000;
+
+/** Upper bound on ANALYTICS_KV list pages walked by one request. */
+const STATS_MAX_PAGES = 50;
+
+/** Shape of the JSON stored under `paused_run:{task_id}` (only the fields read here). */
+type PausedState = {
+  run_id: string;
+  graph?: { name?: string };
+  paused_node_id: string;
+  paused_context?: Record<string, unknown>;
+  paused_pending_result?: Record<string, unknown>;
+  trace_so_far?: unknown;
+  api_key?: string;
+  expires_at?: number;
+};
+
+/**
+ * Parse the `limit` query parameter and clamp it to [1, max].
+ *
+ * Missing, empty or non-numeric input falls back to `fallback`. Without the NaN guard,
+ * `?limit=abc` yields NaN, which passes through Math.min/Math.max and then either
+ * disables the `length >= limit` cap or turns `slice(0, limit)` into an empty slice.
+ */
+function parseLimit(raw: string | undefined, fallback: number, max: number): number {
+  const parsed = Number.parseInt(raw ?? '', 10);
+  const value = Number.isNaN(parsed) ? fallback : parsed;
+  return Math.min(Math.max(value, 1), max);
+}
+
+/** Numeric suffix of a `stats:{workflow}:{timestamp}` key; 0 when it is not a number. */
+function keyTimestamp(keyName: string): number {
+  const parsed = Number.parseInt(keyName.split(':').pop() ?? '0', 10);
+  return Number.isNaN(parsed) ? 0 : parsed;
+}
+
+/** Parse a stored paused state; returns null when the JSON is corrupt or not an object. */
+function parsePausedState(raw: string): PausedState | null {
+  try {
+    const parsed: unknown = JSON.parse(raw);
+    return typeof parsed === 'object' && parsed !== null ? (parsed as PausedState) : null;
+  } catch {
+    return null;
+  }
+}
+
+/**
+ * GET /executions/paused — list every paused workflow owned by the calling api_key.
+ *
+ * Scans the EXEC_CONTEXT KV `paused_run:` prefix and keeps the states whose api_key
+ * matches the caller.
+ *
+ * Response data: { count, paused: [{ task_id, run_id, paused_node_id, workflow_name?,
+ * expires_at? }], truncated }.
+ *
+ * Note: a KV list only returns keys, so each value needs its own GET to learn its api_key.
+ * The scan is deliberately capped to keep that N+1 fan-out small; `truncated` tells the
+ * caller when more entries may exist. Production should replace the prefix scan with a
+ * per-user index.
+ */
+executionsRouter.get('/executions/paused', async (c) => {
+  const apiKey = c.req.header('X-Arcrun-API-Key');
+  if (!apiKey) {
+    return c.json({
+      ok: false,
+      error_code: 'auth_missing',
+      human_message: '缺 X-Arcrun-API-Key header',
+      next_actions: ['call /me 取得你的 ak_xxx,加進 header'],
+    }, 401);
+  }
+
+  const limit = parseLimit(c.req.query('limit'), 20, 100);
+
+  const list = await c.env.EXEC_CONTEXT.list({ prefix: 'paused_run:', limit: PAUSED_SCAN_LIMIT });
+  const paused: Array<{
+    task_id: string;
+    run_id: string;
+    paused_node_id: string;
+    workflow_name?: string;
+    expires_at?: number;
+  }> = [];
+
+  let scanned = 0;
+  for (const key of list.keys) {
+    if (paused.length >= limit) break;
+    scanned += 1;
+    const raw = await c.env.EXEC_CONTEXT.get(key.name);
+    if (!raw) continue;
+    const state = parsePausedState(raw);
+    if (!state) continue; // skip corrupt state
+    if (state.api_key !== apiKey) continue; // tenant isolation
+    paused.push({
+      task_id: key.name.replace(/^paused_run:/, ''),
+      run_id: state.run_id,
+      paused_node_id: state.paused_node_id,
+      workflow_name: state.graph?.name,
+      expires_at: state.expires_at,
+    });
+  }
+
+  // Incomplete when the KV listing was cut off or the loop stopped at `limit` early.
+  const truncated = !list.list_complete || scanned < list.keys.length;
+  const hints = paused.length > 0
+    ? [`${paused.length} 個 workflow 等 callback resume。call get_execution_trace(task_id) 看細節`]
+    : ['沒有任何 paused workflow'];
+  if (truncated) {
+    hints.push('結果可能不完整:已達 limit 或超出單次 KV 掃描範圍');
+  }
+
+  return c.json({
+    ok: true,
+    data: { count: paused.length, paused, truncated },
+    hints,
+  });
+});
+
+/**
+ * GET /executions/:task_id — full state of one paused workflow (trace, context, pending result).
+ *
+ * task_id comes from the trigger response of a workflow that paused (it is embedded in
+ * the error string) or from the list returned by GET /executions/paused.
+ *
+ * Isolation: a caller can only read states stored under its own api_key.
+ */
+executionsRouter.get('/executions/:task_id', async (c) => {
+  const apiKey = c.req.header('X-Arcrun-API-Key');
+  if (!apiKey) {
+    return c.json({
+      ok: false,
+      error_code: 'auth_missing',
+      human_message: '缺 X-Arcrun-API-Key header',
+      next_actions: ['加 X-Arcrun-API-Key header'],
+    }, 401);
+  }
+
+  const taskId = c.req.param('task_id');
+  const raw = await c.env.EXEC_CONTEXT.get(`paused_run:${taskId}`);
+
+  if (!raw) {
+    return c.json({
+      ok: false,
+      error_code: 'not_found',
+      human_message: `task_id "${taskId}" 沒對應的 paused state(可能已 resume 完、過 24h TTL 被 GC、或從未存在)`,
+      next_actions: [
+        'call /executions/paused 看當前所有 paused,確認 task_id 正確',
+        '若該 workflow 不是 paused 型,看 /workflows/:name/executions 查歷史 verdict',
+      ],
+    }, 404);
+  }
+
+  // A value such as `null` parses fine but is not a usable state, so treat it as corrupt too.
+  const state = parsePausedState(raw);
+  if (!state) {
+    return c.json({
+      ok: false,
+      error_code: 'internal_error',
+      human_message: 'paused state JSON 損毀',
+      next_actions: ['告訴 leo / 平台維護者'],
+    }, 500);
+  }
+
+  if (state.api_key !== apiKey) {
+    return c.json({
+      ok: false,
+      error_code: 'not_found', // do not reveal that the task exists
+      human_message: `task_id "${taskId}" 找不到`,
+      next_actions: ['確認 task_id 屬於你 (用 /executions/paused 列出)'],
+    }, 404);
+  }
+
+  return c.json({
+    ok: true,
+    data: {
+      task_id: taskId,
+      run_id: state.run_id,
+      paused_node_id: state.paused_node_id,
+      paused_context: state.paused_context,
+      paused_pending_result: state.paused_pending_result,
+      trace_so_far: state.trace_so_far,
+      expires_at: state.expires_at,
+    },
+    hints: [
+      'paused 狀態 = workflow 等 daemon callback。等對應 service 回 POST /workflows/resume 即可繼續',
+      '若 daemon 掛了,看 expires_at — 過 24h KV TTL 會 GC 此 state',
+    ],
+  });
+});
+
+/**
+ * GET /workflows/:name/executions — most recent N execution verdicts of one workflow.
+ *
+ * Reads the ANALYTICS_KV `stats:{workflowId}:*` prefix. workflowId equals the webhook name
+ * (per the original note, execution-logger writes with graph.id ?? name).
+ *
+ * ANALYTICS_KV cannot list by recency: keys come back in lexicographic order, so with
+ * unix-ms suffixes the newest keys are on the LAST page. Every page is therefore walked
+ * (up to STATS_MAX_PAGES) while only the newest `limit` key names are retained.
+ *
+ * NOTE(review): the prefix is keyed by workflow name only. If two tenants can register the
+ * same name their records would share it — confirm how execution-logger namespaces keys.
+ */
+executionsRouter.get('/workflows/:name/executions', async (c) => {
+  const apiKey = c.req.header('X-Arcrun-API-Key');
+  if (!apiKey) {
+    return c.json({
+      ok: false,
+      error_code: 'auth_missing',
+      human_message: '缺 X-Arcrun-API-Key header',
+      next_actions: ['加 X-Arcrun-API-Key header'],
+    }, 401);
+  }
+
+  const name = c.req.param('name');
+  const limit = parseLimit(c.req.query('limit'), 10, 100);
+
+  // Confirm the workflow belongs to this api_key (prevents reading other tenants' history).
+  const wfRaw = await c.env.WEBHOOKS.get(`${apiKey}:wf:${name}`, 'text');
+  if (!wfRaw) {
+    return c.json({
+      ok: false,
+      error_code: 'not_found',
+      human_message: `workflow "${name}" 不存在或不屬於你`,
+      next_actions: ['call /webhooks/named 看你有什麼 workflow'],
+    }, 404);
+  }
+
+  // Walk the listing page by page, keeping only the newest `limit` key names so far.
+  let newest: string[] = [];
+  let cursor: string | undefined;
+  let truncated = false;
+  for (let page = 0; ; page += 1) {
+    if (page >= STATS_MAX_PAGES) {
+      truncated = true; // page budget exhausted before the listing finished
+      break;
+    }
+    const result = await c.env.ANALYTICS_KV.list({
+      prefix: `stats:${name}:`,
+      limit: STATS_PAGE_SIZE,
+      cursor,
+    });
+    newest = [...newest, ...result.keys.map((key) => key.name)]
+      .sort((a, b) => keyTimestamp(b) - keyTimestamp(a))
+      .slice(0, limit);
+    if (result.list_complete) break;
+    cursor = result.cursor;
+  }
+
+  // The reads are independent, so issue them together; order follows `newest`.
+  const fetched = await Promise.all(
+    newest.map(async (keyName) => ({ keyName, raw: await c.env.ANALYTICS_KV.get(keyName) })),
+  );
+
+  const executions = [];
+  for (const { keyName, raw } of fetched) {
+    if (!raw) continue;
+    try {
+      const record = JSON.parse(raw);
+      executions.push({
+        timestamp: keyName.split(':').pop(),
+        ...record,
+      });
+    } catch {
+      // skip records that are not valid JSON
+    }
+  }
+
+  const hints = executions.length === 0
+    ? ['尚未有任何執行紀錄(或都過了 90d TTL)。先 call /webhooks/named/:name/trigger 跑一次']
+    : [`最近 ${executions.length} 次。看到 verdict=failed 的,call /executions/:task_id 看 paused state 或繼續 debug`];
+  if (truncated) {
+    hints.push('stats key 數量超過單次掃描上限,最新的執行紀錄可能未列出');
+  }
+
+  return c.json({
+    ok: true,
+    data: {
+      workflow_name: name,
+      count: executions.length,
+      executions,
+    },
+    hints,
+  });
+});