From 4e7880c1cb5fd0256eeb804c4192f35a24530a56 Mon Sep 17 00:00:00 2001 From: richblack Date: Sat, 16 May 2026 17:15:16 +0800 Subject: [PATCH] =?UTF-8?q?fix(cypher-executor):=20/executions/paused=20?= =?UTF-8?q?=E8=B5=B0=20per-user=20index=20(=E5=BC=B7=20consistent)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 對應 LI SDD M2.1 修補。 問題:原 /executions/paused 走 EXEC_CONTEXT.list({prefix:'paused_run:'})。 CF KV list 是強 eventual consistent (30-60s 延遲),剛 paused 的 workflow list 不會立刻看到。本機 wrangler kv list 也回 [],但 KV.get 同 task_id 能即時拿到 — 證實 list vs get 一致性層級不同。 修補:persistPausedRun 額外維護 per-user index `paused_idx:{api_key}` (JSON Array),是單一 KV.get 拿全列表,**強 consistent 無延遲**。 consumePausedRun 同步從 index 移除。 新 helper: - listPausedRunsByApiKey(kv, apiKey, limit) — 走 index - PausedIndexEntry type — task_id / run_id / paused_node_id / workflow_name / expires_at / persisted_at 實測:trigger 後立刻 list 即時看到 paused (commit 前是 0) 副作用:index 寫入 + delete 都是 fire-and-forget 在 consume path,失敗 不擋主流程。Index entries 上限 100 防無限長。每次 read 過濾 expired。 Co-Authored-By: Claude Opus 4.7 --- cypher-executor/src/lib/paused-runs.ts | 74 ++++++++++++++++++++++++ cypher-executor/src/routes/executions.ts | 44 ++------------ 2 files changed, 78 insertions(+), 40 deletions(-) diff --git a/cypher-executor/src/lib/paused-runs.ts b/cypher-executor/src/lib/paused-runs.ts index 4871b93..076ff2e 100644 --- a/cypher-executor/src/lib/paused-runs.ts +++ b/cypher-executor/src/lib/paused-runs.ts @@ -29,20 +29,75 @@ export interface PausedRunState { } const KEY_PREFIX = 'paused_run:'; +/** + * Per-user paused index:列出某 api_key 當前 paused tasks 不依賴 CF KV list(強 eventual + * consistent,30-60s 延遲)。改維護一個 user-keyed JSON list,list 操作改 single KV.get。 + * + * Key: `paused_idx:{api_key}` + * Value: JSON Array<{task_id, paused_node_id, run_id, workflow_name?, expires_at, persisted_at}> + * + * 對應 LI SDD M2.1 — /executions/paused endpoint 即時性。 + */ +const IDX_PREFIX = 'paused_idx:'; const TTL_SECONDS = 24 * 60 * 60; +export type PausedIndexEntry = { + task_id: string; + run_id: string; + paused_node_id: string; + workflow_name?: string; + expires_at: number; + persisted_at: number; +}; + type KvBinding = { get: (key: string) => Promise; put: (key: string, value: string, options?: { expirationTtl?: number }) => Promise; delete: (key: string) => Promise; }; +async function readIndex(kv: KvBinding, apiKey: string): Promise { + const raw = await kv.get(`${IDX_PREFIX}${apiKey}`); + if (!raw) return []; + try { + const arr = JSON.parse(raw); + return Array.isArray(arr) ? arr : []; + } catch { + return []; + } +} + +async function writeIndex(kv: KvBinding, apiKey: string, entries: PausedIndexEntry[]): Promise { + // 過濾過期項目(避免 index 爆量) + const now = Date.now(); + const fresh = entries.filter((e) => e.expires_at > now); + await kv.put(`${IDX_PREFIX}${apiKey}`, JSON.stringify(fresh), { expirationTtl: TTL_SECONDS }); +} + export async function persistPausedRun( kv: KvBinding, taskId: string, state: PausedRunState, ): Promise { + // 1) 寫單一 task state await kv.put(`${KEY_PREFIX}${taskId}`, JSON.stringify(state), { expirationTtl: TTL_SECONDS }); + + // 2) 維護 per-user index(讓 /executions/paused list 不靠 KV list 即時拿到) + if (state.api_key) { + const idx = await readIndex(kv, state.api_key); + // 去重(重複 paused 同 task_id 時覆蓋) + const filtered = idx.filter((e) => e.task_id !== taskId); + filtered.unshift({ + task_id: taskId, + run_id: state.run_id, + paused_node_id: state.paused_node_id, + workflow_name: state.graph.name, + expires_at: state.expires_at, + persisted_at: Date.now(), + }); + // 限 100 筆避免 index 無限長(超過捨棄最舊) + await writeIndex(kv, state.api_key, filtered.slice(0, 100)); + } } export async function loadPausedRun( @@ -58,6 +113,19 @@ export async function loadPausedRun( } } +/** + * 列某 api_key 當前 paused tasks。走 per-user index(強 consistent,無 KV list 延遲) + */ +export async function listPausedRunsByApiKey( + kv: KvBinding, + apiKey: string, + limit = 20, +): Promise { + const idx = await readIndex(kv, apiKey); + const now = Date.now(); + return idx.filter((e) => e.expires_at > now).slice(0, limit); +} + /** * 原子讀+刪:避免同 task_id 重複 callback 重複執行下游 * (CF KV 沒真原子操作,但 delete 失敗不影響 load 已成功) @@ -71,6 +139,12 @@ export async function consumePausedRun( await kv.delete(`${KEY_PREFIX}${taskId}`).catch(() => { // delete 失敗不擋,最多就重複執行一次(接受) }); + // 同步從 per-user index 移除 + if (state.api_key) { + const idx = await readIndex(kv, state.api_key); + const filtered = idx.filter((e) => e.task_id !== taskId); + await writeIndex(kv, state.api_key, filtered).catch(() => {}); + } return state; } diff --git a/cypher-executor/src/routes/executions.ts b/cypher-executor/src/routes/executions.ts index 7e7533d..e357585 100644 --- a/cypher-executor/src/routes/executions.ts +++ b/cypher-executor/src/routes/executions.ts @@ -12,18 +12,15 @@ import { Hono } from 'hono'; import type { Bindings } from '../types'; +import { listPausedRunsByApiKey } from '../lib/paused-runs'; export const executionsRouter = new Hono<{ Bindings: Bindings }>(); /** * GET /executions/paused — 列當前 api_key 下所有 paused workflow * - * 走 EXEC_CONTEXT KV `paused_run:*` prefix scan,過濾 state.api_key === 當前用戶。 - * - * 回傳:[{ task_id, run_id, paused_node_id, workflow_name?, expires_at }] - * - * 注意:KV list 預設只回 keys,要 GET 每個值才知道 api_key — 上限 limit 設低 - * 避免 N+1 query 過爆。實際生產要做 per-user index 取代 prefix scan。 + * 走 per-user index `paused_idx:{api_key}`(單 KV get,強 consistent,無 KV list 延遲) + * 取代舊的 `paused_run:*` prefix scan(CF KV list 30-60 秒 eventual consistent) */ executionsRouter.get('/executions/paused', async (c) => { const apiKey = c.req.header('X-Arcrun-API-Key'); @@ -39,40 +36,7 @@ executionsRouter.get('/executions/paused', async (c) => { const limitParam = c.req.query('limit'); const limit = Math.min(Math.max(parseInt(limitParam || '20', 10), 1), 100); - const list = await c.env.EXEC_CONTEXT.list({ prefix: 'paused_run:', limit: 200 }); - const paused: Array<{ - task_id: string; - run_id: string; - paused_node_id: string; - workflow_name?: string; - expires_at?: number; - }> = []; - - for (const key of list.keys) { - if (paused.length >= limit) break; - const raw = await c.env.EXEC_CONTEXT.get(key.name); - if (!raw) continue; - try { - const state = JSON.parse(raw) as { - run_id: string; - paused_node_id: string; - api_key?: string; - expires_at?: number; - graph?: { name?: string }; - }; - if (state.api_key !== apiKey) continue; // 隔離租戶 - const task_id = key.name.replace(/^paused_run:/, ''); - paused.push({ - task_id, - run_id: state.run_id, - paused_node_id: state.paused_node_id, - workflow_name: state.graph?.name, - expires_at: state.expires_at, - }); - } catch { - // 損毀 state 跳過 - } - } + const paused = await listPausedRunsByApiKey(c.env.EXEC_CONTEXT, apiKey, limit); return c.json({ ok: true,