fix(cypher-executor): /executions/paused 走 per-user index (強 consistent)
對應 LI SDD M2.1 修補。
問題:原 /executions/paused 走 EXEC_CONTEXT.list({prefix:'paused_run:'})。
CF KV list 是 eventually consistent(最終一致,實測約 30-60s 延遲),剛 paused 的 workflow
不會立刻出現在 list 結果。本機 wrangler kv list 同樣回 [],但用同一 task_id 做 KV.get
能即時拿到 — 證實 list 與 get 的一致性層級不同。
修補:persistPausedRun 額外維護 per-user index `paused_idx:{api_key}`
(JSON Array),是單一 KV.get 拿全列表,**強 consistent 無延遲**。
consumePausedRun 同步從 index 移除。
新 helper:
- listPausedRunsByApiKey(kv, apiKey, limit) — 走 index
- PausedIndexEntry type — task_id / run_id / paused_node_id / workflow_name /
expires_at / persisted_at
實測:trigger 後立刻 list 即時看到 paused (commit 前是 0)
副作用:consume path 的 index 更新(移除 entry 後寫回)為 best-effort,寫入失敗
會被忽略、不擋主流程。Index entries 上限 100 筆,防止無限增長。每次 read 都會過濾已過期項目。
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -29,20 +29,75 @@ export interface PausedRunState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** Key prefix for the full state of a single paused run: `paused_run:{task_id}`. */
const KEY_PREFIX = 'paused_run:';

/**
 * Key prefix for the per-user paused-run index.
 *
 * Listing the paused tasks of one api_key is served from this index with a
 * single KV get, instead of a KV list call (the original author measured a
 * 30-60 s delay before newly written keys showed up in list results).
 *
 * Key:   `paused_idx:{api_key}`
 * Value: JSON Array<{task_id, paused_node_id, run_id, workflow_name?, expires_at, persisted_at}>
 *
 * Introduced for LI SDD M2.1 — freshness of the /executions/paused endpoint.
 */
const IDX_PREFIX = 'paused_idx:';

/** Value passed as `expirationTtl` on every KV write in this module: 24 hours, in seconds. */
const TTL_SECONDS = 24 * 60 * 60;
|
||||||
|
|
||||||
|
/**
 * One element of the per-user paused-run index stored under `paused_idx:{api_key}`.
 * It is a compact summary of a paused run; the full state is stored separately
 * under `paused_run:{task_id}`.
 */
export type PausedIndexEntry = {
  /** Task id; also the suffix of the `paused_run:` key holding the full state. */
  task_id: string;
  /** Copied from the paused run state's `run_id`. */
  run_id: string;
  /** Copied from the paused run state's `paused_node_id`. */
  paused_node_id: string;
  /** Copied from the paused run state's `graph.name`, when present. */
  workflow_name?: string;
  /** Expiry instant; compared against `Date.now()`, i.e. epoch milliseconds. */
  expires_at: number;
  /** `Date.now()` at the moment the entry was added to the index. */
  persisted_at: number;
};
|
||||||
|
|
||||||
/** Options accepted by {@link KvBinding.put}. */
type KvPutOptions = {
  /** Lifetime of the written key, in seconds. */
  expirationTtl?: number;
};

/**
 * The minimal key-value store surface this module relies on: string get,
 * string put with an optional TTL, and delete. Any object with these three
 * methods (for example an in-memory fake in tests) can be passed in.
 */
type KvBinding = {
  /** Resolves to the stored string, or `null` when the key is absent. */
  get: (key: string) => Promise<string | null>;
  /** Stores `value` under `key`. */
  put: (key: string, value: string, options?: KvPutOptions) => Promise<void>;
  /** Removes `key`. */
  delete: (key: string) => Promise<void>;
};
|
||||||
|
|
||||||
|
/**
 * Runtime check that a parsed JSON value has the shape of a PausedIndexEntry.
 * `workflow_name` is optional; `null` is tolerated as "absent" because JSON
 * cannot represent `undefined`.
 */
function isPausedIndexEntry(value: unknown): value is PausedIndexEntry {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate.task_id === 'string' &&
    typeof candidate.run_id === 'string' &&
    typeof candidate.paused_node_id === 'string' &&
    (candidate.workflow_name == null || typeof candidate.workflow_name === 'string') &&
    typeof candidate.expires_at === 'number' &&
    typeof candidate.persisted_at === 'number'
  );
}

/**
 * Load the paused-run index stored for `apiKey`.
 *
 * @param kv     Key-value store holding the index.
 * @param apiKey Owner of the index; the key read is `paused_idx:{apiKey}`.
 * @returns The well-formed entries of the stored array. Returns `[]` when the
 *          key is missing or empty, the value is not valid JSON, or the JSON
 *          is not an array. Array elements that do not look like a
 *          PausedIndexEntry are dropped, so callers can safely read
 *          `task_id` / `expires_at` on every element.
 */
async function readIndex(kv: KvBinding, apiKey: string): Promise<PausedIndexEntry[]> {
  const raw = await kv.get(`${IDX_PREFIX}${apiKey}`);
  if (!raw) return [];

  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    // A corrupted index is treated as empty rather than failing the caller.
    return [];
  }

  return Array.isArray(parsed) ? parsed.filter(isPausedIndexEntry) : [];
}
|
||||||
|
|
||||||
|
/**
 * Store the paused-run index for `apiKey`.
 *
 * Entries whose `expires_at` is not strictly greater than the current time are
 * left out before writing, which keeps the index from accumulating stale rows.
 * The value is written under `paused_idx:{apiKey}` with the module-wide TTL.
 *
 * @param kv      Key-value store holding the index.
 * @param apiKey  Owner of the index.
 * @param entries Candidate entries; the array itself is not modified.
 */
async function writeIndex(kv: KvBinding, apiKey: string, entries: PausedIndexEntry[]): Promise<void> {
  const cutoff = Date.now();
  const live: PausedIndexEntry[] = [];
  for (const entry of entries) {
    if (entry.expires_at > cutoff) live.push(entry);
  }

  const indexKey = `${IDX_PREFIX}${apiKey}`;
  await kv.put(indexKey, JSON.stringify(live), { expirationTtl: TTL_SECONDS });
}
|
||||||
|
|
||||||
/**
 * Persist a paused run and register it in its owner's paused-run index.
 *
 * The full state is written first under `paused_run:{taskId}`. If the state
 * carries an `api_key`, the owner's index is then read, any existing entry for
 * the same `taskId` is replaced, the new entry is placed at the front, and the
 * index is written back truncated to its 100 newest entries.
 *
 * Errors from either KV write (or from the index read) propagate to the caller.
 *
 * @param kv     Key-value store for both the state and the index.
 * @param taskId Task id that identifies the paused run.
 * @param state  Full paused-run state to store.
 */
export async function persistPausedRun(
  kv: KvBinding,
  taskId: string,
  state: PausedRunState,
): Promise<void> {
  // Step 1: store the full state under the task's own key.
  const stateKey = `${KEY_PREFIX}${taskId}`;
  await kv.put(stateKey, JSON.stringify(state), { expirationTtl: TTL_SECONDS });

  // Step 2: keep the per-user index in step so /executions/paused can list
  // this run without relying on a KV list call.
  const owner = state.api_key;
  if (!owner) return;

  const existing = await readIndex(kv, owner);

  const newest: PausedIndexEntry = {
    task_id: taskId,
    run_id: state.run_id,
    paused_node_id: state.paused_node_id,
    workflow_name: state.graph.name,
    expires_at: state.expires_at,
    persisted_at: Date.now(),
  };

  // Replace rather than duplicate when the same task is paused again.
  const others = existing.filter((entry) => entry.task_id !== taskId);

  // Newest first; anything beyond 100 entries (the oldest) is discarded.
  const next = [newest, ...others].slice(0, 100);
  await writeIndex(kv, owner, next);
}
|
||||||
|
|
||||||
export async function loadPausedRun(
|
export async function loadPausedRun(
|
||||||
@@ -58,6 +113,19 @@ export async function loadPausedRun(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * List the currently paused tasks of one api_key, newest first, using the
 * per-user index (a single KV get) rather than a KV list call.
 *
 * NOTE(review): the original comment describes this read as strongly
 * consistent; Cloudflare documents KV reads as eventually consistent across
 * locations, so confirm the freshness guarantee actually required here.
 *
 * @param kv     Key-value store holding the index.
 * @param apiKey Owner whose paused tasks are listed.
 * @param limit  Maximum number of entries to return (default 20). `NaN` falls
 *               back to the default instead of silently yielding an empty
 *               list; negative values are treated as 0; fractions are
 *               truncated.
 * @returns Index entries whose `expires_at` is still in the future, capped at `limit`.
 */
export async function listPausedRunsByApiKey(
  kv: KvBinding,
  apiKey: string,
  limit = 20,
): Promise<PausedIndexEntry[]> {
  // `slice(0, NaN)` returns [], so an unparsable limit from the HTTP layer
  // would otherwise look like "no paused runs".
  const cap = Number.isNaN(limit) ? 20 : Math.max(0, Math.trunc(limit));

  const idx = await readIndex(kv, apiKey);
  const now = Date.now();
  return idx.filter((e) => e.expires_at > now).slice(0, cap);
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 原子讀+刪:避免同 task_id 重複 callback 重複執行下游
|
* 原子讀+刪:避免同 task_id 重複 callback 重複執行下游
|
||||||
* (CF KV 沒真原子操作,但 delete 失敗不影響 load 已成功)
|
* (CF KV 沒真原子操作,但 delete 失敗不影響 load 已成功)
|
||||||
@@ -71,6 +139,12 @@ export async function consumePausedRun(
|
|||||||
await kv.delete(`${KEY_PREFIX}${taskId}`).catch(() => {
|
await kv.delete(`${KEY_PREFIX}${taskId}`).catch(() => {
|
||||||
// delete 失敗不擋,最多就重複執行一次(接受)
|
// delete 失敗不擋,最多就重複執行一次(接受)
|
||||||
});
|
});
|
||||||
|
// 同步從 per-user index 移除
|
||||||
|
if (state.api_key) {
|
||||||
|
const idx = await readIndex(kv, state.api_key);
|
||||||
|
const filtered = idx.filter((e) => e.task_id !== taskId);
|
||||||
|
await writeIndex(kv, state.api_key, filtered).catch(() => {});
|
||||||
|
}
|
||||||
return state;
|
return state;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -12,18 +12,15 @@
|
|||||||
|
|
||||||
import { Hono } from 'hono';
|
import { Hono } from 'hono';
|
||||||
import type { Bindings } from '../types';
|
import type { Bindings } from '../types';
|
||||||
|
import { listPausedRunsByApiKey } from '../lib/paused-runs';
|
||||||
|
|
||||||
export const executionsRouter = new Hono<{ Bindings: Bindings }>();
|
export const executionsRouter = new Hono<{ Bindings: Bindings }>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* GET /executions/paused — 列當前 api_key 下所有 paused workflow
|
* GET /executions/paused — 列當前 api_key 下所有 paused workflow
|
||||||
*
|
*
|
||||||
* 走 EXEC_CONTEXT KV `paused_run:*` prefix scan,過濾 state.api_key === 當前用戶。
|
* 走 per-user index `paused_idx:{api_key}`(單 KV get,強 consistent,無 KV list 延遲)
|
||||||
*
|
* 取代舊的 `paused_run:*` prefix scan(CF KV list 30-60 秒 eventual consistent)
|
||||||
* 回傳:[{ task_id, run_id, paused_node_id, workflow_name?, expires_at }]
|
|
||||||
*
|
|
||||||
* 注意:KV list 預設只回 keys,要 GET 每個值才知道 api_key — 上限 limit 設低
|
|
||||||
* 避免 N+1 query 過爆。實際生產要做 per-user index 取代 prefix scan。
|
|
||||||
*/
|
*/
|
||||||
executionsRouter.get('/executions/paused', async (c) => {
|
executionsRouter.get('/executions/paused', async (c) => {
|
||||||
const apiKey = c.req.header('X-Arcrun-API-Key');
|
const apiKey = c.req.header('X-Arcrun-API-Key');
|
||||||
@@ -39,40 +36,7 @@ executionsRouter.get('/executions/paused', async (c) => {
|
|||||||
const limitParam = c.req.query('limit');
|
const limitParam = c.req.query('limit');
|
||||||
const limit = Math.min(Math.max(parseInt(limitParam || '20', 10), 1), 100);
|
const limit = Math.min(Math.max(parseInt(limitParam || '20', 10), 1), 100);
|
||||||
|
|
||||||
const list = await c.env.EXEC_CONTEXT.list({ prefix: 'paused_run:', limit: 200 });
|
const paused = await listPausedRunsByApiKey(c.env.EXEC_CONTEXT, apiKey, limit);
|
||||||
const paused: Array<{
|
|
||||||
task_id: string;
|
|
||||||
run_id: string;
|
|
||||||
paused_node_id: string;
|
|
||||||
workflow_name?: string;
|
|
||||||
expires_at?: number;
|
|
||||||
}> = [];
|
|
||||||
|
|
||||||
for (const key of list.keys) {
|
|
||||||
if (paused.length >= limit) break;
|
|
||||||
const raw = await c.env.EXEC_CONTEXT.get(key.name);
|
|
||||||
if (!raw) continue;
|
|
||||||
try {
|
|
||||||
const state = JSON.parse(raw) as {
|
|
||||||
run_id: string;
|
|
||||||
paused_node_id: string;
|
|
||||||
api_key?: string;
|
|
||||||
expires_at?: number;
|
|
||||||
graph?: { name?: string };
|
|
||||||
};
|
|
||||||
if (state.api_key !== apiKey) continue; // 隔離租戶
|
|
||||||
const task_id = key.name.replace(/^paused_run:/, '');
|
|
||||||
paused.push({
|
|
||||||
task_id,
|
|
||||||
run_id: state.run_id,
|
|
||||||
paused_node_id: state.paused_node_id,
|
|
||||||
workflow_name: state.graph?.name,
|
|
||||||
expires_at: state.expires_at,
|
|
||||||
});
|
|
||||||
} catch {
|
|
||||||
// 損毀 state 跳過
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return c.json({
|
return c.json({
|
||||||
ok: true,
|
ok: true,
|
||||||
|
|||||||
Reference in New Issue
Block a user