feat(cypher-executor): /executions/* introspection (LI SDD M2.1)
對應 .agents/specs/llm-interface/ Milestone 2.1。給 AI debug 用的執行
狀態檢視端點。3 個新路由:
GET /executions/paused
- 列當前 api_key 下所有 paused workflow(等 callback resume)
- 走 EXEC_CONTEXT KV `paused_run:*` prefix scan,過濾 state.api_key
- 回 [{task_id, run_id, paused_node_id, workflow_name, expires_at}]
- 限 limit 100,避免 KV N+1 爆
GET /executions/:task_id
- 看單一 paused state 細節(trace_so_far / paused_context / pending_result)
- 隔離租戶(state.api_key !== 用戶 ak → 回 not_found 不洩漏存在性)
- 回完整 state JSON
GET /workflows/:name/executions
- 列某 workflow 最近 N 次執行 verdict(走 ANALYTICS_KV stats:{name}:* prefix)
- 先驗 workflow 屬該 api_key
- 按 timestamp 後綴 desc sort
統一 error contract(LI SDD §1.3 / §1.4 規範):
- ok: bool
- error_code: enum (auth_missing / not_found / internal_error)
- human_message: 描述
- next_actions: 陣列,給 AI 看的下一步
- hints: 成功時的 next-step 建議
不影響既有路由 contract。
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -18,6 +18,7 @@ import { credentialsRouter } from './routes/credentials';
|
||||
import { webhooksNamedRouter } from './routes/webhooks-named';
|
||||
import { authRouter } from './routes/auth';
|
||||
import { resumeRouter } from './routes/resume';
|
||||
import { executionsRouter } from './routes/executions';
|
||||
|
||||
const app = new Hono<{ Bindings: Bindings }>();
|
||||
|
||||
@@ -44,6 +45,7 @@ app.route('/', recipesRouter);
|
||||
app.route('/', credentialsRouter);
|
||||
app.route('/', authRouter);
|
||||
app.route('/', resumeRouter);
|
||||
app.route('/', executionsRouter); // LI SDD M2.1: /executions/* + /workflows/:name/executions
|
||||
|
||||
// Worker 導出(fetch + scheduled)
|
||||
// scheduled handler 對應 wrangler.toml [triggers].crons,每分鐘 tick;
|
||||
|
||||
@@ -0,0 +1,239 @@
|
||||
/**
|
||||
* Executions routes — LI SDD M2.1
|
||||
*
|
||||
* 對應 .agents/specs/llm-interface/ Milestone 2.1。給 AI 看 workflow 執行狀態的端點。
|
||||
*
|
||||
* - GET /executions/paused — 列當前所有 paused 的 workflow(等 callback resume)
|
||||
* - GET /executions/:task_id — 看單一 paused state 細節(含 trace、graph、node id)
|
||||
* - GET /workflows/:name/executions — 列某 workflow 最近 N 次執行 verdict
|
||||
*
|
||||
* 設計:純讀,無 side effect。所有路由要 api_key auth(防偷看他人 workflow state)。
|
||||
*/
|
||||
|
||||
import { Hono } from 'hono';
|
||||
import type { Bindings } from '../types';
|
||||
|
||||
export const executionsRouter = new Hono<{ Bindings: Bindings }>();
|
||||
|
||||
/**
|
||||
* GET /executions/paused — 列當前 api_key 下所有 paused workflow
|
||||
*
|
||||
* 走 EXEC_CONTEXT KV `paused_run:*` prefix scan,過濾 state.api_key === 當前用戶。
|
||||
*
|
||||
* 回傳:[{ task_id, run_id, paused_node_id, workflow_name?, expires_at }]
|
||||
*
|
||||
* 注意:KV list 預設只回 keys,要 GET 每個值才知道 api_key — 上限 limit 設低
|
||||
* 避免 N+1 query 過爆。實際生產要做 per-user index 取代 prefix scan。
|
||||
*/
|
||||
executionsRouter.get('/executions/paused', async (c) => {
|
||||
const apiKey = c.req.header('X-Arcrun-API-Key');
|
||||
if (!apiKey) {
|
||||
return c.json({
|
||||
ok: false,
|
||||
error_code: 'auth_missing',
|
||||
human_message: '缺 X-Arcrun-API-Key header',
|
||||
next_actions: ['call /me 取得你的 ak_xxx,加進 header'],
|
||||
}, 401);
|
||||
}
|
||||
|
||||
const limitParam = c.req.query('limit');
|
||||
const limit = Math.min(Math.max(parseInt(limitParam || '20', 10), 1), 100);
|
||||
|
||||
const list = await c.env.EXEC_CONTEXT.list({ prefix: 'paused_run:', limit: 200 });
|
||||
const paused: Array<{
|
||||
task_id: string;
|
||||
run_id: string;
|
||||
paused_node_id: string;
|
||||
workflow_name?: string;
|
||||
expires_at?: number;
|
||||
}> = [];
|
||||
|
||||
for (const key of list.keys) {
|
||||
if (paused.length >= limit) break;
|
||||
const raw = await c.env.EXEC_CONTEXT.get(key.name);
|
||||
if (!raw) continue;
|
||||
try {
|
||||
const state = JSON.parse(raw) as {
|
||||
run_id: string;
|
||||
paused_node_id: string;
|
||||
api_key?: string;
|
||||
expires_at?: number;
|
||||
graph?: { name?: string };
|
||||
};
|
||||
if (state.api_key !== apiKey) continue; // 隔離租戶
|
||||
const task_id = key.name.replace(/^paused_run:/, '');
|
||||
paused.push({
|
||||
task_id,
|
||||
run_id: state.run_id,
|
||||
paused_node_id: state.paused_node_id,
|
||||
workflow_name: state.graph?.name,
|
||||
expires_at: state.expires_at,
|
||||
});
|
||||
} catch {
|
||||
// 損毀 state 跳過
|
||||
}
|
||||
}
|
||||
|
||||
return c.json({
|
||||
ok: true,
|
||||
data: { count: paused.length, paused },
|
||||
hints: paused.length > 0
|
||||
? [`${paused.length} 個 workflow 等 callback resume。call get_execution_trace(task_id) 看細節`]
|
||||
: ['沒有任何 paused workflow'],
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* GET /executions/:task_id — 看單一 paused workflow 的 state(trace、graph、context)
|
||||
*
|
||||
* task_id 來源:trigger workflow 時 response 含 paused 結果,task_id 在 error 字串裡,
|
||||
* 或前端 list_paused_executions 回的 task_id。
|
||||
*
|
||||
* 隔離:只能讀自己 api_key 的 state。
|
||||
*/
|
||||
executionsRouter.get('/executions/:task_id', async (c) => {
|
||||
const apiKey = c.req.header('X-Arcrun-API-Key');
|
||||
if (!apiKey) {
|
||||
return c.json({
|
||||
ok: false,
|
||||
error_code: 'auth_missing',
|
||||
human_message: '缺 X-Arcrun-API-Key header',
|
||||
next_actions: ['加 X-Arcrun-API-Key header'],
|
||||
}, 401);
|
||||
}
|
||||
|
||||
const taskId = c.req.param('task_id');
|
||||
const raw = await c.env.EXEC_CONTEXT.get(`paused_run:${taskId}`);
|
||||
|
||||
if (!raw) {
|
||||
return c.json({
|
||||
ok: false,
|
||||
error_code: 'not_found',
|
||||
human_message: `task_id "${taskId}" 沒對應的 paused state(可能已 resume 完、過 24h TTL 被 GC、或從未存在)`,
|
||||
next_actions: [
|
||||
'call /executions/paused 看當前所有 paused,確認 task_id 正確',
|
||||
'若該 workflow 不是 paused 型,看 /workflows/:name/executions 查歷史 verdict',
|
||||
],
|
||||
}, 404);
|
||||
}
|
||||
|
||||
let state: {
|
||||
run_id: string;
|
||||
graph?: unknown;
|
||||
paused_node_id: string;
|
||||
paused_context?: Record<string, unknown>;
|
||||
paused_pending_result?: Record<string, unknown>;
|
||||
trace_so_far?: unknown;
|
||||
api_key?: string;
|
||||
expires_at?: number;
|
||||
};
|
||||
try {
|
||||
state = JSON.parse(raw);
|
||||
} catch {
|
||||
return c.json({
|
||||
ok: false,
|
||||
error_code: 'internal_error',
|
||||
human_message: 'paused state JSON 損毀',
|
||||
next_actions: ['告訴 leo / 平台維護者'],
|
||||
}, 500);
|
||||
}
|
||||
|
||||
if (state.api_key !== apiKey) {
|
||||
return c.json({
|
||||
ok: false,
|
||||
error_code: 'not_found', // 不洩漏存在性
|
||||
human_message: `task_id "${taskId}" 找不到`,
|
||||
next_actions: ['確認 task_id 屬於你 (用 /executions/paused 列出)'],
|
||||
}, 404);
|
||||
}
|
||||
|
||||
return c.json({
|
||||
ok: true,
|
||||
data: {
|
||||
task_id: taskId,
|
||||
run_id: state.run_id,
|
||||
paused_node_id: state.paused_node_id,
|
||||
paused_context: state.paused_context,
|
||||
paused_pending_result: state.paused_pending_result,
|
||||
trace_so_far: state.trace_so_far,
|
||||
expires_at: state.expires_at,
|
||||
},
|
||||
hints: [
|
||||
'paused 狀態 = workflow 等 daemon callback。等對應 service 回 POST /workflows/resume 即可繼續',
|
||||
'若 daemon 掛了,看 expires_at — 過 24h KV TTL 會 GC 此 state',
|
||||
],
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* GET /workflows/:name/executions — 看某 workflow 最近 N 次執行 verdict
|
||||
*
|
||||
* 走 ANALYTICS_KV `stats:{workflowId}:*` prefix scan。
|
||||
*
|
||||
* workflowId 等於 webhook name(execution-logger 寫入時用 graph.id ?? name)。
|
||||
*
|
||||
* 限制:ANALYTICS_KV list 沒辦法依 timestamp 排序,只能拿 key 後段 timestamp 排。
|
||||
*/
|
||||
executionsRouter.get('/workflows/:name/executions', async (c) => {
|
||||
const apiKey = c.req.header('X-Arcrun-API-Key');
|
||||
if (!apiKey) {
|
||||
return c.json({
|
||||
ok: false,
|
||||
error_code: 'auth_missing',
|
||||
human_message: '缺 X-Arcrun-API-Key header',
|
||||
next_actions: ['加 X-Arcrun-API-Key header'],
|
||||
}, 401);
|
||||
}
|
||||
|
||||
const name = c.req.param('name');
|
||||
const limitParam = c.req.query('limit');
|
||||
const limit = Math.min(Math.max(parseInt(limitParam || '10', 10), 1), 100);
|
||||
|
||||
// 確認 workflow 是該 api_key 的(防偷看他人)
|
||||
const wfRaw = await c.env.WEBHOOKS.get(`${apiKey}:wf:${name}`, 'text');
|
||||
if (!wfRaw) {
|
||||
return c.json({
|
||||
ok: false,
|
||||
error_code: 'not_found',
|
||||
human_message: `workflow "${name}" 不存在或不屬於你`,
|
||||
next_actions: ['call /webhooks/named 看你有什麼 workflow'],
|
||||
}, 404);
|
||||
}
|
||||
|
||||
// 撈 stats:{name}:* 全 list(每個 key 含 timestamp 後綴)
|
||||
const list = await c.env.ANALYTICS_KV.list({ prefix: `stats:${name}:`, limit: 1000 });
|
||||
|
||||
// 按 timestamp 降序(key suffix 是 unix ms)
|
||||
const sorted = [...list.keys].sort((a, b) => {
|
||||
const ta = parseInt(a.name.split(':').pop() ?? '0', 10);
|
||||
const tb = parseInt(b.name.split(':').pop() ?? '0', 10);
|
||||
return tb - ta;
|
||||
}).slice(0, limit);
|
||||
|
||||
const executions = [];
|
||||
for (const key of sorted) {
|
||||
const raw = await c.env.ANALYTICS_KV.get(key.name);
|
||||
if (!raw) continue;
|
||||
try {
|
||||
const record = JSON.parse(raw);
|
||||
executions.push({
|
||||
timestamp: key.name.split(':').pop(),
|
||||
...record,
|
||||
});
|
||||
} catch {
|
||||
// skip
|
||||
}
|
||||
}
|
||||
|
||||
return c.json({
|
||||
ok: true,
|
||||
data: {
|
||||
workflow_name: name,
|
||||
count: executions.length,
|
||||
executions,
|
||||
},
|
||||
hints: executions.length === 0
|
||||
? ['尚未有任何執行紀錄(或都過了 90d TTL)。先 call /webhooks/named/:name/trigger 跑一次']
|
||||
: [`最近 ${executions.length} 次。看到 verdict=failed 的,call /executions/:task_id 看 paused state 或繼續 debug`],
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user