feat(cypher-executor): implicit telemetry (LI SDD M1.2)

對應 .agents/specs/llm-interface/ Milestone 1.2。

新增 lib/telemetry.ts:
- hashApiKey(): SHA-256 截 16 hex 字元(不可逆,可聚合)
- recordTelemetry(): fetch fire-and-forget 寫 KBDB type=agent-telemetry block
- 設計:不阻擋主流程,錯誤 console.warn 不 throw
- 用 ctx.waitUntil 確保即使主 request 已回,背景仍會跑完

寫入點 3 處:
1. routes/webhooks-named.ts POST /webhooks/named (deploy) → deploy_success
2. routes/webhooks-named.ts POST /webhooks/named/:name/trigger →
   executeWebhookGraph 帶 ctx + userAgent,內部記 run_success / run_fail
3. routes/validate.ts POST /validate → validation_error (含 schema_failed / edge_node_missing)

executeWebhookGraph 簽名擴張:可選 ctx + userAgent,舊 caller (scheduled /
trigger_workflow / anonymous webhook) 不傳也 OK(telemetry 仍寫但無 ctx 加持)。

paused (workflow 因 claude_api 等等 callback resume) 算 run_success,
不污染 fail metric。

types.ts: 加 PLATFORM_API_KEY env (可選) + re-export ExecutionContext

不違反「業務邏輯走 WASM」鐵律:telemetry 是 orchestrator 觀測自身的能力,
跟 trigger_workflow / scheduled() 同類。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-16 15:33:35 +08:00
parent 56973558e2
commit d04e34ea35
10 changed files with 189 additions and 2 deletions
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,8 +1,9 @@
import type { Bindings, ExecutionGraph } from '../types'; import type { Bindings, ExecutionGraph, ExecutionContext } from '../types';
import { ExecutionError } from '../types'; import { ExecutionError } from '../types';
import { GraphExecutor } from '../graph-executor'; import { GraphExecutor } from '../graph-executor';
import { graphSchema } from '../lib/schemas'; import { graphSchema } from '../lib/schemas';
import { createComponentLoader } from '../lib/component-loader'; import { createComponentLoader } from '../lib/component-loader';
import { recordTelemetry } from '../lib/telemetry';
type WebhookRecord = { type WebhookRecord = {
graph: Record<string, unknown>; graph: Record<string, unknown>;
@@ -29,6 +30,8 @@ export async function executeWebhookGraph(
triggerContext: Record<string, unknown>, triggerContext: Record<string, unknown>,
token: string, token: string,
apiKey?: string, apiKey?: string,
ctx?: ExecutionContext, // 可選 — 用 waitUntil 把 telemetry 推到背景
userAgent?: string, // MCP / SDK client 帶過來
): Promise<{ success: boolean; data?: unknown; error?: string; trace?: unknown; duration_ms: number }> { ): Promise<{ success: boolean; data?: unknown; error?: string; trace?: unknown; duration_ms: number }> {
const parsed = graphSchema.safeParse(graph); const parsed = graphSchema.safeParse(graph);
if (!parsed.success) { if (!parsed.success) {
@@ -46,10 +49,30 @@ export async function executeWebhookGraph(
env.EXEC_CONTEXT, env.EXEC_CONTEXT,
); );
const duration_ms = Date.now() - start; const duration_ms = Date.now() - start;
// Implicit telemetry:成功 run(含 paused 也算「成功啟動」由 trigger_workflow 那層分類)
recordTelemetry(env, apiKey, {
event_type: 'run_success',
workflow_name: token,
duration_ms,
agent_user_agent: userAgent,
}, ctx);
return { success: true, data: result.data, duration_ms }; return { success: true, data: result.data, duration_ms };
} catch (err) { } catch (err) {
const duration_ms = Date.now() - start; const duration_ms = Date.now() - start;
const errMsg = err instanceof Error ? err.message : String(err); const errMsg = err instanceof Error ? err.message : String(err);
const isPaused = /workflow paused/i.test(errMsg);
// Implicit telemetrypaused 算 run_success;真錯才 run_fail
recordTelemetry(env, apiKey, {
event_type: isPaused ? 'run_success' : 'run_fail',
workflow_name: token,
error_code: isPaused ? 'paused_awaiting_resume' : 'execution_error',
duration_ms,
agent_user_agent: userAgent,
}, ctx);
if (err instanceof ExecutionError) { if (err instanceof ExecutionError) {
const traceFormatted = err.trace.map(s => ({ const traceFormatted = err.trace.map(s => ({
node: s.nodeId, node: s.nodeId,
+121
View File
@@ -0,0 +1,121 @@
/**
* Implicit telemetry — 對應 SDD .agents/specs/llm-interface/ M1.2
*
* 每次 deploy / run / validate 失敗,cypher-executor 自動寫 KBDB block
* type=agent-telemetry,含 event_type / workflow_name / error_code /
* duration_ms / api_key_hash / agent_user_agent。
*
* 隱私:api_key SHA-256 截 16 字元(不可逆,可聚合),workflow 內容不 log。
*
* 設計:不阻擋主流程,fetch fire-and-forget;錯誤只 console.warn 不 throw。
*
* 注意:本 module 屬 orchestrator 自身能力(觀測自己),不違反「業務邏輯走 WASM」鐵律。
* 跟 trigger_workflow / scheduled() 同類,是 cypher-executor 自我管理的一部分。
*/
import type { Bindings, ExecutionContext } from '../types';
export type TelemetryEvent =
| 'deploy_success'
| 'deploy_fail'
| 'run_success'
| 'run_fail'
| 'validation_error'
| 'mcp_tool_call';
export interface TelemetryRecord {
event_type: TelemetryEvent;
workflow_name?: string;
component_id?: string;
error_code?: string;
duration_ms: number;
api_key_hash: string;
agent_user_agent?: string;
}
/**
* api_key → SHA-256 hex 截前 16 字元
* 不可逆,可用來聚合(同一用戶不同 event 統計),不會洩漏原 key
*/
export async function hashApiKey(apiKey: string): Promise<string> {
if (!apiKey) return 'anon';
const encoder = new TextEncoder();
const data = encoder.encode(apiKey);
const hashBuffer = await crypto.subtle.digest('SHA-256', data);
const hashArray = Array.from(new Uint8Array(hashBuffer));
return hashArray
.slice(0, 8) // 8 bytes = 16 hex chars
.map(b => b.toString(16).padStart(2, '0'))
.join('');
}
/**
* KBDB upsert URL(內部走 workers.dev 避同 zone 自循環)
* 對應 .claude/rules/03-component-architecture.md
*/
function kbdbCreateBlockUrl(env: Bindings): string {
const subdomain = env.WORKER_SUBDOMAIN || 'uncle6-me';
return `https://arcrun-kbdb-create-block.${subdomain}.workers.dev`;
}
/**
* 寫一筆 telemetry block 到 KBDB。fire-and-forget。
*
* 寫不進去也不擋主流程 —— 平台自己的觀測絕不能讓 user-facing 流程失敗。
*
* 用 ctx.waitUntil 確保即使主 request 已回,背景仍會跑完。
*/
export function recordTelemetry(
env: Bindings,
apiKey: string | undefined,
record: Omit<TelemetryRecord, 'api_key_hash'>,
ctx?: ExecutionContext,
): void {
const promise = (async () => {
try {
const api_key_hash = await hashApiKey(apiKey ?? '');
// platform telemetry 用一個系統 ak(讀 env.PLATFORM_API_KEY),所有 telemetry
// 都聚集在 platform user_id 下,避免污染用戶自己的 KBDB namespace
const platformKey = env.PLATFORM_API_KEY || apiKey || '';
if (!platformKey) {
// 沒 platform key + 沒用戶 key → 無處可寫,skip
console.warn('[telemetry] no api_key, skipping');
return;
}
const body = {
api_key: platformKey,
type: 'agent-telemetry',
source: 'cypher-executor',
user_id: 'platform_telemetry',
content: JSON.stringify(record),
metadata_json: JSON.stringify({ ...record, api_key_hash }),
tags_json: JSON.stringify([
'agent-telemetry',
`event:${record.event_type}`,
]),
};
const res = await fetch(kbdbCreateBlockUrl(env), {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(body),
});
if (!res.ok) {
console.warn(
'[telemetry] write failed',
res.status,
await res.text().catch(() => 'no body'),
);
}
} catch (e) {
console.warn('[telemetry] exception', e);
}
})();
if (ctx?.waitUntil) {
ctx.waitUntil(promise);
}
// 沒 ctx.waitUntil 的情況(直接從 host function call)也讓 promise 自己跑,可能被 cancel 也接受
}
+17
View File
@@ -1,14 +1,25 @@
import { Hono } from 'hono'; import { Hono } from 'hono';
import type { Bindings } from '../types'; import type { Bindings } from '../types';
import { graphSchema } from '../lib/schemas'; import { graphSchema } from '../lib/schemas';
import { recordTelemetry } from '../lib/telemetry';
export const validateRouter = new Hono<{ Bindings: Bindings }>(); export const validateRouter = new Hono<{ Bindings: Bindings }>();
// POST /validate — 驗證圖定義(不執行) // POST /validate — 驗證圖定義(不執行)
validateRouter.post('/validate', async (c) => { validateRouter.post('/validate', async (c) => {
const start = Date.now();
const apiKey = c.req.header('X-Arcrun-API-Key');
const userAgent = c.req.header('User-Agent') ?? undefined;
const body = await c.req.json(); const body = await c.req.json();
const parsed = graphSchema.safeParse(body); const parsed = graphSchema.safeParse(body);
if (!parsed.success) { if (!parsed.success) {
recordTelemetry(c.env, apiKey, {
event_type: 'validation_error',
error_code: 'schema_failed',
duration_ms: Date.now() - start,
agent_user_agent: userAgent,
}, c.executionCtx);
return c.json({ valid: false, errors: parsed.error.issues }, 400); return c.json({ valid: false, errors: parsed.error.issues }, 400);
} }
@@ -16,6 +27,12 @@ validateRouter.post('/validate', async (c) => {
const invalidEdges = parsed.data.edges.filter(e => !nodeIds.has(e.from) || !nodeIds.has(e.to)); const invalidEdges = parsed.data.edges.filter(e => !nodeIds.has(e.from) || !nodeIds.has(e.to));
if (invalidEdges.length > 0) { if (invalidEdges.length > 0) {
recordTelemetry(c.env, apiKey, {
event_type: 'validation_error',
error_code: 'edge_node_missing',
duration_ms: Date.now() - start,
agent_user_agent: userAgent,
}, c.executionCtx);
return c.json({ return c.json({
valid: false, valid: false,
errors: invalidEdges.map(e => `${e.from}${e.to} 指向不存在的節點`), errors: invalidEdges.map(e => `${e.from}${e.to} 指向不存在的節點`),
+19 -1
View File
@@ -27,6 +27,7 @@ import { executeWebhookGraph } from '../actions/webhook-handlers';
import { writeExecutionVerdict } from '../actions/execution-logger'; import { writeExecutionVerdict } from '../actions/execution-logger';
import type { GraphNode } from '../types'; import type { GraphNode } from '../types';
import { extractCronExpr } from '../lib/cron-match'; import { extractCronExpr } from '../lib/cron-match';
import { recordTelemetry } from '../lib/telemetry';
export const webhooksNamedRouter = new Hono<{ Bindings: Bindings }>(); export const webhooksNamedRouter = new Hono<{ Bindings: Bindings }>();
@@ -85,6 +86,7 @@ webhooksNamedRouter.post('/webhooks/named', async (c) => {
cron_expr: cronExpr ?? undefined, cron_expr: cronExpr ?? undefined,
}; };
const start = Date.now();
await c.env.WEBHOOKS.put(kvKey(apiKey, name), JSON.stringify(record)); await c.env.WEBHOOKS.put(kvKey(apiKey, name), JSON.stringify(record));
// 維護 cron index:有 cron_expr 就寫 / 沒有就刪除(避免 push 改 yaml 拿掉 cron 後殘留) // 維護 cron index:有 cron_expr 就寫 / 沒有就刪除(避免 push 改 yaml 拿掉 cron 後殘留)
@@ -94,6 +96,14 @@ webhooksNamedRouter.post('/webhooks/named', async (c) => {
await c.env.WEBHOOKS.delete(cronIndexKey(apiKey, name)); await c.env.WEBHOOKS.delete(cronIndexKey(apiKey, name));
} }
// Implicit telemetry (LI M1.2)
recordTelemetry(c.env, apiKey, {
event_type: 'deploy_success',
workflow_name: name,
duration_ms: Date.now() - start,
agent_user_agent: c.req.header('User-Agent') ?? undefined,
}, c.executionCtx);
const baseUrl = new URL(c.req.url).origin; const baseUrl = new URL(c.req.url).origin;
return c.json({ return c.json({
name, name,
@@ -133,7 +143,15 @@ webhooksNamedRouter.post('/webhooks/named/:name/trigger', async (c) => {
// 無 body 時使用空 context // 無 body 時使用空 context
} }
const result = await executeWebhookGraph(c.env, record.graph, triggerContext, name, apiKey); const result = await executeWebhookGraph(
c.env,
record.graph,
triggerContext,
name,
apiKey,
c.executionCtx,
c.req.header('User-Agent') ?? undefined,
);
const graph = record.graph as { id?: string; nodes?: unknown[] }; const graph = record.graph as { id?: string; nodes?: unknown[] };
const workflowId = graph.id ?? name; const workflowId = graph.id ?? name;
Binary file not shown.
+8
View File
@@ -57,8 +57,16 @@ export type Bindings = {
// 必填:cypher-executor 用此組出 component worker URL(避開同 zone 自循環死鎖,見 P0 #9) // 必填:cypher-executor 用此組出 component worker URL(避開同 zone 自循環死鎖,見 P0 #9)
// self-hosted fork 必須改 wrangler.toml [vars] 為自己的帳號 subdomain // self-hosted fork 必須改 wrangler.toml [vars] 為自己的帳號 subdomain
WORKER_SUBDOMAIN: string; WORKER_SUBDOMAIN: string;
// Platform telemetry api_key(可選,wrangler secret
// 對應 SDD .agents/specs/llm-interface/ M1.2
// 設了會把 agent-telemetry block 都聚集在 platform_telemetry user_id 下
// 沒設就 fallback 到當下用戶的 ak_,會寫進該用戶 KBDB namespace(次優但能用)
PLATFORM_API_KEY?: string;
}; };
// 重新 export Cloudflare Workers ExecutionContext 以便其他 module 用
export type { ExecutionContext } from '@cloudflare/workers-types';
// 圖結構定義 // 圖結構定義
export type GraphNode = { export type GraphNode = {
id: string; id: string;