diff --git a/cypher-executor/src/.swarm/attestation.db b/cypher-executor/src/.swarm/attestation.db new file mode 100644 index 0000000..e69de29 diff --git a/cypher-executor/src/.swarm/memory.db b/cypher-executor/src/.swarm/memory.db new file mode 100644 index 0000000..db7a745 Binary files /dev/null and b/cypher-executor/src/.swarm/memory.db differ diff --git a/cypher-executor/src/.swarm/memory.db-shm b/cypher-executor/src/.swarm/memory.db-shm new file mode 100644 index 0000000..f59cf25 Binary files /dev/null and b/cypher-executor/src/.swarm/memory.db-shm differ diff --git a/cypher-executor/src/.swarm/memory.db-wal b/cypher-executor/src/.swarm/memory.db-wal new file mode 100644 index 0000000..18b7932 Binary files /dev/null and b/cypher-executor/src/.swarm/memory.db-wal differ diff --git a/cypher-executor/src/actions/webhook-handlers.ts b/cypher-executor/src/actions/webhook-handlers.ts index e91a127..b4d8809 100644 --- a/cypher-executor/src/actions/webhook-handlers.ts +++ b/cypher-executor/src/actions/webhook-handlers.ts @@ -1,8 +1,9 @@ -import type { Bindings, ExecutionGraph } from '../types'; +import type { Bindings, ExecutionGraph, ExecutionContext } from '../types'; import { ExecutionError } from '../types'; import { GraphExecutor } from '../graph-executor'; import { graphSchema } from '../lib/schemas'; import { createComponentLoader } from '../lib/component-loader'; +import { recordTelemetry } from '../lib/telemetry'; type WebhookRecord = { graph: Record; @@ -29,6 +30,8 @@ export async function executeWebhookGraph( triggerContext: Record, token: string, apiKey?: string, + ctx?: ExecutionContext, // 可選 — 用 waitUntil 把 telemetry 推到背景 + userAgent?: string, // MCP / SDK client 帶過來 ): Promise<{ success: boolean; data?: unknown; error?: string; trace?: unknown; duration_ms: number }> { const parsed = graphSchema.safeParse(graph); if (!parsed.success) { @@ -46,10 +49,30 @@ export async function executeWebhookGraph( env.EXEC_CONTEXT, ); const duration_ms = Date.now() - start; + + // Implicit telemetry:成功 run(含 paused 也算「成功啟動」由 trigger_workflow 那層分類) + recordTelemetry(env, apiKey, { + event_type: 'run_success', + workflow_name: token, + duration_ms, + agent_user_agent: userAgent, + }, ctx); + return { success: true, data: result.data, duration_ms }; } catch (err) { const duration_ms = Date.now() - start; const errMsg = err instanceof Error ? err.message : String(err); + const isPaused = /workflow paused/i.test(errMsg); + + // Implicit telemetry:paused 算 run_success;真錯才 run_fail + recordTelemetry(env, apiKey, { + event_type: isPaused ? 'run_success' : 'run_fail', + workflow_name: token, + error_code: isPaused ? 'paused_awaiting_resume' : 'execution_error', + duration_ms, + agent_user_agent: userAgent, + }, ctx); + if (err instanceof ExecutionError) { const traceFormatted = err.trace.map(s => ({ node: s.nodeId, diff --git a/cypher-executor/src/lib/telemetry.ts b/cypher-executor/src/lib/telemetry.ts new file mode 100644 index 0000000..13b7e64 --- /dev/null +++ b/cypher-executor/src/lib/telemetry.ts @@ -0,0 +1,121 @@ +/** + * Implicit telemetry — 對應 SDD .agents/specs/llm-interface/ M1.2 + * + * 每次 deploy / run / validate 失敗,cypher-executor 自動寫 KBDB block + * type=agent-telemetry,含 event_type / workflow_name / error_code / + * duration_ms / api_key_hash / agent_user_agent。 + * + * 隱私:api_key SHA-256 截 16 字元(不可逆,可聚合),workflow 內容不 log。 + * + * 設計:不阻擋主流程,fetch fire-and-forget;錯誤只 console.warn 不 throw。 + * + * 注意:本 module 屬 orchestrator 自身能力(觀測自己),不違反「業務邏輯走 WASM」鐵律。 + * 跟 trigger_workflow / scheduled() 同類,是 cypher-executor 自我管理的一部分。 + */ + +import type { Bindings, ExecutionContext } from '../types'; + +export type TelemetryEvent = + | 'deploy_success' + | 'deploy_fail' + | 'run_success' + | 'run_fail' + | 'validation_error' + | 'mcp_tool_call'; + +export interface TelemetryRecord { + event_type: TelemetryEvent; + workflow_name?: string; + component_id?: string; + error_code?: string; + duration_ms: number; + api_key_hash: string; + agent_user_agent?: string; +} + +/** + * api_key → SHA-256 hex 截前 16 字元 + * 不可逆,可用來聚合(同一用戶不同 event 統計),不會洩漏原 key + */ +export async function hashApiKey(apiKey: string): Promise { + if (!apiKey) return 'anon'; + const encoder = new TextEncoder(); + const data = encoder.encode(apiKey); + const hashBuffer = await crypto.subtle.digest('SHA-256', data); + const hashArray = Array.from(new Uint8Array(hashBuffer)); + return hashArray + .slice(0, 8) // 8 bytes = 16 hex chars + .map(b => b.toString(16).padStart(2, '0')) + .join(''); +} + +/** + * KBDB upsert URL(內部走 workers.dev 避同 zone 自循環) + * 對應 .claude/rules/03-component-architecture.md + */ +function kbdbCreateBlockUrl(env: Bindings): string { + const subdomain = env.WORKER_SUBDOMAIN || 'uncle6-me'; + return `https://arcrun-kbdb-create-block.${subdomain}.workers.dev`; +} + +/** + * 寫一筆 telemetry block 到 KBDB。fire-and-forget。 + * + * 寫不進去也不擋主流程 —— 平台自己的觀測絕不能讓 user-facing 流程失敗。 + * + * 用 ctx.waitUntil 確保即使主 request 已回,背景仍會跑完。 + */ +export function recordTelemetry( + env: Bindings, + apiKey: string | undefined, + record: Omit, + ctx?: ExecutionContext, +): void { + const promise = (async () => { + try { + const api_key_hash = await hashApiKey(apiKey ?? ''); + // platform telemetry 用一個系統 ak(讀 env.PLATFORM_API_KEY),所有 telemetry + // 都聚集在 platform user_id 下,避免污染用戶自己的 KBDB namespace + const platformKey = env.PLATFORM_API_KEY || apiKey || ''; + if (!platformKey) { + // 沒 platform key + 沒用戶 key → 無處可寫,skip + console.warn('[telemetry] no api_key, skipping'); + return; + } + + const body = { + api_key: platformKey, + type: 'agent-telemetry', + source: 'cypher-executor', + user_id: 'platform_telemetry', + content: JSON.stringify(record), + metadata_json: JSON.stringify({ ...record, api_key_hash }), + tags_json: JSON.stringify([ + 'agent-telemetry', + `event:${record.event_type}`, + ]), + }; + + const res = await fetch(kbdbCreateBlockUrl(env), { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(body), + }); + + if (!res.ok) { + console.warn( + '[telemetry] write failed', + res.status, + await res.text().catch(() => 'no body'), + ); + } + } catch (e) { + console.warn('[telemetry] exception', e); + } + })(); + + if (ctx?.waitUntil) { + ctx.waitUntil(promise); + } + // 沒 ctx.waitUntil 的情況(直接從 host function call)也讓 promise 自己跑,可能被 cancel 也接受 +} diff --git a/cypher-executor/src/routes/validate.ts b/cypher-executor/src/routes/validate.ts index 8456340..752b2c6 100644 --- a/cypher-executor/src/routes/validate.ts +++ b/cypher-executor/src/routes/validate.ts @@ -1,14 +1,25 @@ import { Hono } from 'hono'; import type { Bindings } from '../types'; import { graphSchema } from '../lib/schemas'; +import { recordTelemetry } from '../lib/telemetry'; export const validateRouter = new Hono<{ Bindings: Bindings }>(); // POST /validate — 驗證圖定義(不執行) validateRouter.post('/validate', async (c) => { + const start = Date.now(); + const apiKey = c.req.header('X-Arcrun-API-Key'); + const userAgent = c.req.header('User-Agent') ?? undefined; + const body = await c.req.json(); const parsed = graphSchema.safeParse(body); if (!parsed.success) { + recordTelemetry(c.env, apiKey, { + event_type: 'validation_error', + error_code: 'schema_failed', + duration_ms: Date.now() - start, + agent_user_agent: userAgent, + }, c.executionCtx); return c.json({ valid: false, errors: parsed.error.issues }, 400); } @@ -16,6 +27,12 @@ validateRouter.post('/validate', async (c) => { const invalidEdges = parsed.data.edges.filter(e => !nodeIds.has(e.from) || !nodeIds.has(e.to)); if (invalidEdges.length > 0) { + recordTelemetry(c.env, apiKey, { + event_type: 'validation_error', + error_code: 'edge_node_missing', + duration_ms: Date.now() - start, + agent_user_agent: userAgent, + }, c.executionCtx); return c.json({ valid: false, errors: invalidEdges.map(e => `邊 ${e.from} → ${e.to} 指向不存在的節點`), diff --git a/cypher-executor/src/routes/webhooks-named.ts b/cypher-executor/src/routes/webhooks-named.ts index e949999..c66776b 100644 --- a/cypher-executor/src/routes/webhooks-named.ts +++ b/cypher-executor/src/routes/webhooks-named.ts @@ -27,6 +27,7 @@ import { executeWebhookGraph } from '../actions/webhook-handlers'; import { writeExecutionVerdict } from '../actions/execution-logger'; import type { GraphNode } from '../types'; import { extractCronExpr } from '../lib/cron-match'; +import { recordTelemetry } from '../lib/telemetry'; export const webhooksNamedRouter = new Hono<{ Bindings: Bindings }>(); @@ -85,6 +86,7 @@ webhooksNamedRouter.post('/webhooks/named', async (c) => { cron_expr: cronExpr ?? undefined, }; + const start = Date.now(); await c.env.WEBHOOKS.put(kvKey(apiKey, name), JSON.stringify(record)); // 維護 cron index:有 cron_expr 就寫 / 沒有就刪除(避免 push 改 yaml 拿掉 cron 後殘留) @@ -94,6 +96,14 @@ webhooksNamedRouter.post('/webhooks/named', async (c) => { await c.env.WEBHOOKS.delete(cronIndexKey(apiKey, name)); } + // Implicit telemetry (LI M1.2) + recordTelemetry(c.env, apiKey, { + event_type: 'deploy_success', + workflow_name: name, + duration_ms: Date.now() - start, + agent_user_agent: c.req.header('User-Agent') ?? undefined, + }, c.executionCtx); + const baseUrl = new URL(c.req.url).origin; return c.json({ name, @@ -133,7 +143,15 @@ webhooksNamedRouter.post('/webhooks/named/:name/trigger', async (c) => { // 無 body 時使用空 context } - const result = await executeWebhookGraph(c.env, record.graph, triggerContext, name, apiKey); + const result = await executeWebhookGraph( + c.env, + record.graph, + triggerContext, + name, + apiKey, + c.executionCtx, + c.req.header('User-Agent') ?? undefined, + ); const graph = record.graph as { id?: string; nodes?: unknown[] }; const workflowId = graph.id ?? name; diff --git a/cypher-executor/src/ruvector.db b/cypher-executor/src/ruvector.db new file mode 100644 index 0000000..cbf321b Binary files /dev/null and b/cypher-executor/src/ruvector.db differ diff --git a/cypher-executor/src/types.ts b/cypher-executor/src/types.ts index 9e197ed..0bb7c8e 100644 --- a/cypher-executor/src/types.ts +++ b/cypher-executor/src/types.ts @@ -57,8 +57,16 @@ export type Bindings = { // 必填:cypher-executor 用此組出 component worker URL(避開同 zone 自循環死鎖,見 P0 #9) // self-hosted fork 必須改 wrangler.toml [vars] 為自己的帳號 subdomain WORKER_SUBDOMAIN: string; + // Platform telemetry api_key(可選,wrangler secret) + // 對應 SDD .agents/specs/llm-interface/ M1.2 + // 設了會把 agent-telemetry block 都聚集在 platform_telemetry user_id 下 + // 沒設就 fallback 到當下用戶的 ak_,會寫進該用戶 KBDB namespace(次優但能用) + PLATFORM_API_KEY?: string; }; +// 重新 export Cloudflare Workers ExecutionContext 以便其他 module 用 +export type { ExecutionContext } from '@cloudflare/workers-types'; + // 圖結構定義 export type GraphNode = { id: string;