fix(cypher-executor): /executions/paused 走 per-user index (強 consistent)

對應 LI SDD M2.1 修補。

問題:原 /executions/paused 走 EXEC_CONTEXT.list({prefix:'paused_run:'})。
CF KV list 是 eventually consistent(約 30-60s 延遲),剛 paused 的 workflow
list 不會立刻看到。本機 wrangler kv list 也回 [],但 KV.get 同 task_id
能即時拿到 — 證實 list vs get 一致性層級不同。

修補:persistPausedRun 額外維護 per-user index `paused_idx:{api_key}`
(JSON Array),是單一 KV.get 拿全列表,**強 consistent 無延遲**。
consumePausedRun 同步從 index 移除。

新 helper:
- listPausedRunsByApiKey(kv, apiKey, limit) — 走 index
- PausedIndexEntry type — task_id / run_id / paused_node_id / workflow_name /
  expires_at / persisted_at

實測:trigger 後立刻 list 即時看到 paused (commit 前是 0)

副作用:index 寫入 + delete 都是 fire-and-forget 在 consume path,失敗
不擋主流程。Index entries 上限 100 防無限長。每次 read 過濾 expired。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-16 17:15:16 +08:00
parent 10834ef9bb
commit 4e7880c1cb
2 changed files with 78 additions and 40 deletions
+74
View File
@@ -29,20 +29,75 @@ export interface PausedRunState {
}
// Key prefix of the per-task paused-run state record: `paused_run:{task_id}`.
const KEY_PREFIX = 'paused_run:';
/**
 * Per-user paused index.
 *
 * Lets the service list the paused tasks of one api_key without calling
 * Cloudflare KV `list`, which the author observed lagging 30-60s behind writes.
 * Instead, one JSON array is maintained per user, so listing becomes a single
 * KV `get`.
 *
 * Key:   `paused_idx:{api_key}`
 * Value: JSON Array<{task_id, paused_node_id, run_id, workflow_name?, expires_at, persisted_at}>
 *
 * Relates to LI SDD M2.1 — immediacy of the /executions/paused endpoint.
 *
 * NOTE(review): the original note calls the single-key `get` "strongly
 * consistent"; confirm that against Cloudflare KV's documented consistency model.
 */
const IDX_PREFIX = 'paused_idx:';
// Passed as `expirationTtl` for both the task-state key and the index key (24 hours).
const TTL_SECONDS = 24 * 60 * 60;
/**
 * One row of the per-user paused index: a summary of a paused run that is
 * stored in the index array and returned by `listPausedRunsByApiKey`.
 */
export type PausedIndexEntry = {
/** Task id; the same id that suffixes the `paused_run:` state key. */
task_id: string;
/** Copied from the paused state's `run_id`. */
run_id: string;
/** Copied from the paused state's `paused_node_id`. */
paused_node_id: string;
/** Copied from the paused state's `graph.name`, when present. */
workflow_name?: string;
/** Expiry instant; compared against `Date.now()`, so epoch milliseconds. */
expires_at: number;
/** `Date.now()` at the moment the row was added to the index. */
persisted_at: number;
};
/**
 * Minimal structural view of the key-value store this module needs: only
 * string `get`, `put` with an optional TTL, and `delete`.
 */
type KvBinding = {
/** Resolves to the stored string, or `null` when the key has no value. */
get: (key: string) => Promise<string | null>;
/** Stores `value`; `expirationTtl` is presumably in seconds (callers pass TTL_SECONDS). */
put: (key: string, value: string, options?: { expirationTtl?: number }) => Promise<void>;
/** Removes the key. */
delete: (key: string) => Promise<void>;
};
/**
 * Runtime check that a parsed JSON value has the shape of a `PausedIndexEntry`.
 *
 * The index is stored as untyped JSON, so every row is verified before the
 * rest of the module dereferences its fields.
 */
function isPausedIndexEntry(value: unknown): value is PausedIndexEntry {
  if (typeof value !== 'object' || value === null) return false;
  const row = value as Record<string, unknown>;
  return (
    typeof row.task_id === 'string' &&
    typeof row.run_id === 'string' &&
    typeof row.paused_node_id === 'string' &&
    (row.workflow_name === undefined || typeof row.workflow_name === 'string') &&
    typeof row.expires_at === 'number' &&
    typeof row.persisted_at === 'number'
  );
}

/**
 * Load the paused index stored under `paused_idx:{apiKey}`.
 *
 * @param kv - Key-value store holding the index.
 * @param apiKey - Owner of the index.
 * @returns The well-formed rows of the stored array. Returns an empty array
 *   when the key is missing or empty, the value is not valid JSON, or the
 *   parsed value is not an array. Malformed rows (for example `null` or rows
 *   missing required fields) are dropped so callers can safely read
 *   `task_id` / `expires_at` on every element.
 */
async function readIndex(kv: KvBinding, apiKey: string): Promise<PausedIndexEntry[]> {
  const raw = await kv.get(`${IDX_PREFIX}${apiKey}`);
  if (!raw) return [];
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    // A corrupted index is treated as empty rather than failing the caller.
    return [];
  }
  return Array.isArray(parsed) ? parsed.filter(isPausedIndexEntry) : [];
}
/**
 * Store `entries` as the paused index of `apiKey`.
 *
 * Rows whose `expires_at` is not later than the current time are dropped
 * before writing, which keeps the index from accumulating dead rows. The key
 * is written with a TTL of `TTL_SECONDS`.
 *
 * @param kv - Key-value store holding the index.
 * @param apiKey - Owner of the index.
 * @param entries - Candidate rows, in the order they should be stored.
 */
async function writeIndex(kv: KvBinding, apiKey: string, entries: PausedIndexEntry[]): Promise<void> {
  const cutoff = Date.now();
  const live: PausedIndexEntry[] = [];
  for (const entry of entries) {
    if (entry.expires_at > cutoff) live.push(entry);
  }
  const indexKey = `${IDX_PREFIX}${apiKey}`;
  const payload = JSON.stringify(live);
  await kv.put(indexKey, payload, { expirationTtl: TTL_SECONDS });
}
/**
 * Persist the state of a paused run and register it in its owner's index.
 *
 * 1. Writes the full state under `paused_run:{taskId}` with a TTL of
 *    `TTL_SECONDS`.
 * 2. When the state carries an `api_key`, rewrites that user's index with the
 *    new row first. An existing row for the same `taskId` is replaced.
 *
 * Expired rows are pruned BEFORE the size cap is applied, so an expired row
 * can never hold a slot that evicts a still-live one.
 *
 * The index update is a read-modify-write on a single key and is not atomic;
 * concurrent persists for the same api_key can overwrite each other's rows.
 *
 * @param kv - Key-value store holding both the state and the index.
 * @param taskId - Identifier of the paused task.
 * @param state - Paused run state to store.
 */
export async function persistPausedRun(
  kv: KvBinding,
  taskId: string,
  state: PausedRunState,
): Promise<void> {
  // Upper bound on rows kept per user; the oldest rows beyond it are discarded.
  const maxIndexEntries = 100;

  // 1) Per-task state record.
  await kv.put(`${KEY_PREFIX}${taskId}`, JSON.stringify(state), { expirationTtl: TTL_SECONDS });

  // 2) Per-user index; skipped when the state has no owner.
  if (!state.api_key) return;

  const existing = await readIndex(kv, state.api_key);
  const now = Date.now();
  // Drop the previous row for this task (re-pause overwrites) and anything expired.
  const retained = existing.filter((e) => e.task_id !== taskId && e.expires_at > now);
  const entry: PausedIndexEntry = {
    task_id: taskId,
    run_id: state.run_id,
    paused_node_id: state.paused_node_id,
    workflow_name: state.graph.name,
    expires_at: state.expires_at,
    persisted_at: now,
  };
  // Newest first; cap after pruning so only live rows compete for slots.
  await writeIndex(kv, state.api_key, [entry, ...retained].slice(0, maxIndexEntries));
}
export async function loadPausedRun(
@@ -58,6 +113,19 @@ export async function loadPausedRun(
}
}
/**
 * List the currently paused tasks of one api_key.
 *
 * Reads the per-user index with a single KV `get` instead of a KV prefix
 * `list`, then drops rows whose `expires_at` has passed.
 *
 * @param kv - Key-value store holding the index.
 * @param apiKey - Owner whose paused tasks are listed.
 * @param limit - Maximum number of rows to return (default 20). `NaN` falls
 *   back to the default and negative values are treated as 0, so a bad limit
 *   can neither silently empty the result nor trim rows from the tail.
 * @returns Up to `limit` unexpired rows, in stored order (newest first).
 */
export async function listPausedRunsByApiKey(
  kv: KvBinding,
  apiKey: string,
  limit = 20,
): Promise<PausedIndexEntry[]> {
  // `slice(0, NaN)` yields [] and `slice(0, -k)` drops the last k rows; normalize first.
  const cap = Number.isNaN(limit) ? 20 : Math.max(0, limit);
  const idx = await readIndex(kv, apiKey);
  const now = Date.now();
  return idx.filter((e) => e.expires_at > now).slice(0, cap);
}
/**
* 原子讀+刪:避免同 task_id 重複 callback 重複執行下游
* (CF KV 沒真原子操作,但 delete 失敗不影響 load 已成功)
@@ -71,6 +139,12 @@ export async function consumePausedRun(
await kv.delete(`${KEY_PREFIX}${taskId}`).catch(() => {
// delete 失敗不擋,最多就重複執行一次(接受)
});
// 同步從 per-user index 移除
if (state.api_key) {
const idx = await readIndex(kv, state.api_key);
const filtered = idx.filter((e) => e.task_id !== taskId);
await writeIndex(kv, state.api_key, filtered).catch(() => {});
}
return state;
}
+4 -40
View File
@@ -12,18 +12,15 @@
import { Hono } from 'hono';
import type { Bindings } from '../types';
import { listPausedRunsByApiKey } from '../lib/paused-runs';
// Hono router that the /executions/paused route below is registered on;
// handlers read their environment through the project's `Bindings` type.
export const executionsRouter = new Hono<{ Bindings: Bindings }>();
/**
* GET /executions/paused — 列當前 api_key 下所有 paused workflow
*
* 走 EXEC_CONTEXT KV `paused_run:*` prefix scan,過濾 state.api_key === 當前用戶。
*
* 回傳:[{ task_id, run_id, paused_node_id, workflow_name?, expires_at }]
*
* 注意:KV list 預設只回 keys,要 GET 每個值才知道 api_key — 上限 limit 設低
* 避免 N+1 query 過爆。實際生產要做 per-user index 取代 prefix scan。
* 走 per-user index `paused_idx:{api_key}`(單 KV get,強 consistent,無 KV list 延遲)
* 取代舊的 `paused_run:*` prefix scan(CF KV list 約有 30-60 秒的 eventual-consistency 延遲)。
*/
executionsRouter.get('/executions/paused', async (c) => {
const apiKey = c.req.header('X-Arcrun-API-Key');
@@ -39,40 +36,7 @@ executionsRouter.get('/executions/paused', async (c) => {
const limitParam = c.req.query('limit');
const limit = Math.min(Math.max(parseInt(limitParam || '20', 10), 1), 100);
const list = await c.env.EXEC_CONTEXT.list({ prefix: 'paused_run:', limit: 200 });
const paused: Array<{
task_id: string;
run_id: string;
paused_node_id: string;
workflow_name?: string;
expires_at?: number;
}> = [];
for (const key of list.keys) {
if (paused.length >= limit) break;
const raw = await c.env.EXEC_CONTEXT.get(key.name);
if (!raw) continue;
try {
const state = JSON.parse(raw) as {
run_id: string;
paused_node_id: string;
api_key?: string;
expires_at?: number;
graph?: { name?: string };
};
if (state.api_key !== apiKey) continue; // 隔離租戶
const task_id = key.name.replace(/^paused_run:/, '');
paused.push({
task_id,
run_id: state.run_id,
paused_node_id: state.paused_node_id,
workflow_name: state.graph?.name,
expires_at: state.expires_at,
});
} catch {
// 損毀 state 跳過
}
}
const paused = await listPausedRunsByApiKey(c.env.EXEC_CONTEXT, apiKey, limit);
return c.json({
ok: true,