feat(arcrun): implement arcrun MVP — open-source AI workflow engine

Phase 1-5 complete per .agents/specs/u6u-core-mvp/:

**Phase 1 — Cherry-pick & cleanup**
- Create arcrun/ from cypher-executor, credentials, builtins, registry
- Remove 9 InkStone Service Bindings (KBDB, REGISTRY, CLINIC_*, AICEO, MINI_ME)
- Rewrite component-loader: 3-layer (builtin → WASM_BUCKET R2 → error)
- Remove autoPublishMissing.ts, proxy.ts (AICEO), execution-logger.ts (KBDB)
- Clean all KV namespace IDs and InkStone internal URLs from config files

**Phase 2 — contract.yaml completeness**
- Add credentials_required to gmail, google_sheets, telegram, line_notify
- Add config_example to all 21 components with annotated field descriptions

**Phase 3 — Credential injection**
- Add credential-injector.ts: AES-GCM decrypt from CREDENTIALS_KV
- Integrate into GraphExecutor before WASM execution
- Structured errors with repair instructions when credential missing

**Phase 4 — CLI (acr)**
- cli/package.json: arcrun package, bin: acr, deps: commander/js-yaml/chalk/ora
- 8 commands: init, creds push, push, run, validate, parts, list, logs
- Standard mode: writes directly to user's CF KV via CF REST API
- acr init: interactive setup with arcrun.dev API Key registration

**Phase 5 — Open source release prep**
- README.md: 5-minute quickstart, component table, workflow YAML syntax
- CONTRIBUTING.md: TinyGo dev env, component scaffolding, submission flow
- Security audit: no InkStone internal URLs/IDs in committed files
- .gitignore: exclude credentials.yaml, .wrangler, *.wasm

https://claude.ai/code/session_01BnCdSLVH8tUed9VrrPavgT
This commit is contained in:
Claude
2026-04-16 04:06:25 +00:00
commit 2707fca32b
155 changed files with 17413 additions and 0 deletions
+21
View File
@@ -0,0 +1,21 @@
{
"name": "arcrun-cypher-executor",
"version": "0.1.0",
"private": true,
"scripts": {
"dev": "wrangler dev",
"deploy": "wrangler deploy",
"test": "vitest run"
},
"dependencies": {
"@hono/zod-openapi": "^1.2.4",
"hono": "^4.7.0",
"zod": "~3.23.8"
},
"devDependencies": {
"@cloudflare/vitest-pool-workers": "^0.8.0",
"@cloudflare/workers-types": "^4.20250219.0",
"typescript": "^5.7.0",
"vitest": "^3.1.0"
}
}
+2007
View File
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,160 @@
/**
* Credential Injector
*
* 在 WASM 零件執行前,從 CREDENTIALS_KV 讀取加密 credential
* AES-GCM 解密後注入到 input 的對應欄位(inject_as)。
*
* 用戶的 workflow.yaml config 中不需要也不應該包含明文 token。
*
* 設計原則:
* - contract.yaml 的 credentials_required 宣告需要哪個 credential
* - CREDENTIALS_KV 存放 AES-GCM 加密後的 credentialkey = cred:{name}
* - 注入發生在 WASM 執行前,不修改 WEBHOOKS KV 中儲存的 workflow 定義
*/
import type { Bindings } from '../types';
export interface CredentialRequirement {
key: string; // CREDENTIALS_KV 的 key(如 gmail_token
type: string; // token 類型(如 google_oauth
description: string; // 說明
inject_as: string; // 注入到 input 的欄位名稱(如 access_token
}
/**
* 讀取並解析零件的 contract.yaml(從 WASM_BUCKET
* 回傳 credentials_required 陣列,若不存在則回傳空陣列
*/
async function loadCredentialsRequired(
componentId: string,
wasmBucket: R2Bucket,
): Promise<CredentialRequirement[]> {
const contractKey = `${componentId}/component.contract.yaml`;
const obj = await wasmBucket.get(contractKey);
if (!obj) return [];
const yamlText = await obj.text();
return parseCredentialsRequired(yamlText);
}
/**
* 從 YAML 文字解析 credentials_required 欄位
* 使用簡單的正規表達式解析(避免引入 js-yaml 依賴)
*/
function parseCredentialsRequired(yaml: string): CredentialRequirement[] {
const credsSection = yaml.match(/credentials_required:\s*([\s\S]*?)(?=\n\w|\n#|$)/);
if (!credsSection) return [];
const items: CredentialRequirement[] = [];
const blockText = credsSection[1];
// 解析 " - key: xxx" 開頭的項目
const itemMatches = blockText.split(/\n - /).slice(1);
for (const item of itemMatches) {
const key = item.match(/key:\s*["']?([^"'\n]+)["']?/)?.[1]?.trim();
const type = item.match(/type:\s*["']?([^"'\n]+)["']?/)?.[1]?.trim();
const description = item.match(/description:\s*["']?([^"'\n]+)["']?/)?.[1]?.trim() ?? '';
const inject_as = item.match(/inject_as:\s*["']?([^"'\n]+)["']?/)?.[1]?.trim();
if (key && type && inject_as) {
items.push({ key, type, description, inject_as });
}
}
return items;
}
/**
* AES-GCM 解密(與 credentials Worker 的加密邏輯對應)
* CREDENTIALS_KV 儲存格式:{ encrypted: base64, iv: base64 }
*/
async function decryptCredential(encryptedJson: string, encryptionKey: string): Promise<string> {
const { encrypted, iv } = JSON.parse(encryptedJson) as { encrypted: string; iv: string };
// 將 hex-encoded 256-bit key 轉為 CryptoKey
const keyBytes = hexToUint8Array(encryptionKey);
const cryptoKey = await crypto.subtle.importKey(
'raw',
keyBytes,
{ name: 'AES-GCM' },
false,
['decrypt'],
);
const ivBytes = base64ToUint8Array(iv);
const cipherBytes = base64ToUint8Array(encrypted);
const decrypted = await crypto.subtle.decrypt(
{ name: 'AES-GCM', iv: ivBytes },
cryptoKey,
cipherBytes,
);
return new TextDecoder().decode(decrypted);
}
function hexToUint8Array(hex: string): Uint8Array {
const bytes = new Uint8Array(hex.length / 2);
for (let i = 0; i < hex.length; i += 2) {
bytes[i / 2] = parseInt(hex.slice(i, i + 2), 16);
}
return bytes;
}
function base64ToUint8Array(b64: string): Uint8Array {
const binary = atob(b64);
const bytes = new Uint8Array(binary.length);
for (let i = 0; i < binary.length; i++) {
bytes[i] = binary.charCodeAt(i);
}
return bytes;
}
/**
* 執行 credential 注入
*
* @param componentId - 零件 canonical_id
* @param input - 節點的原始 input(來自 workflow config
* @param env - Cloudflare Worker Bindings
* @returns 注入 credential 後的 input
*
* @throws 若 credential 不存在,拋出結構化錯誤(含 key 名稱與修復步驟)
*/
export async function injectCredentials(
componentId: string,
input: Record<string, unknown>,
env: Bindings,
): Promise<Record<string, unknown>> {
// 讀取 contract.yaml 中的 credentials_required
const required = await loadCredentialsRequired(componentId, env.WASM_BUCKET);
if (required.length === 0) return input;
const enriched = { ...input };
for (const cred of required) {
const kvKey = `cred:${cred.key}`;
const record = await env.CREDENTIALS_KV.get(kvKey);
if (!record) {
throw new Error(
`缺少 credential${cred.key}${cred.description}\n` +
`修復步驟:\n` +
` 1. 在 credentials.yaml 中加入:\n` +
` ${cred.key}: "your-${cred.type}-token"\n` +
` 2. 執行:acr creds push`
);
}
try {
const decrypted = await decryptCredential(record, env.ENCRYPTION_KEY);
enriched[cred.inject_as] = decrypted;
} catch (e) {
throw new Error(
`credential "${cred.key}" 解密失敗:${e instanceof Error ? e.message : String(e)}\n` +
`修復步驟:重新執行 acr creds push 上傳正確的 credential。`
);
}
}
return enriched;
}
@@ -0,0 +1,112 @@
import type { Bindings, ExecutionGraph } from '../types';
import { ExecutionError } from '../types';
import { GraphExecutor } from '../graph-executor';
import { graphSchema } from '../lib/schemas';
import { createComponentLoader } from '../lib/component-loader';
import { writeEvaluation, updateComponentStats } from './execution-evaluator';
import { parseTriplets } from './triplet-parser';
import { searchNodes } from './search-nodes';
import { buildExecutionGraph } from './graph-builder';
export async function handleCypherSearch(
triplets: unknown[],
env: Bindings,
): Promise<{ nodes: Record<string, unknown>; cypher: unknown; missing: string[] }> {
const parsed = parseTriplets(triplets);
if (!parsed) {
throw new Error('無法解析任何節點');
}
const { nodeResults, missingNodes } = await searchNodes(parsed, env.WASM_BUCKET);
if (missingNodes.length > 0) {
return { nodes: nodeResults, cypher: null, missing: missingNodes };
}
const graph = buildExecutionGraph(parsed, nodeResults, 'cypher-search-result', 'Cypher Search Result');
return { nodes: nodeResults, cypher: { nodes: graph.nodes, edges: graph.edges }, missing: [] };
}
export async function handleCypherExecute(
triplets: unknown[],
context: Record<string, unknown> | undefined,
graphId: string,
graphName: string,
env: Bindings,
waitUntil: (promise: Promise<void>) => void,
): Promise<{ success: boolean; data?: unknown; error?: string; trace?: unknown; duration_ms: number; graph?: ExecutionGraph }> {
const parsed = parseTriplets(triplets as unknown[]);
if (!parsed) {
throw new Error('無法解析任何節點');
}
const { nodeResults, missingNodes } = await searchNodes(parsed, env.WASM_BUCKET);
if (missingNodes.length > 0) {
throw new Error(
`以下零件不存在於 WASM_BUCKET${missingNodes.join(', ')}\n` +
`修復:執行 acr parts 查看可用零件清單,或執行 acr validate <workflow.yaml> 進行完整驗證。`
);
}
const graph = buildExecutionGraph(parsed, nodeResults, graphId, graphName);
const parseResult = graphSchema.safeParse(graph);
if (!parseResult.success) {
throw new Error('圖定義產生失敗');
}
const loader = createComponentLoader(env);
const executor = new GraphExecutor(loader, undefined, env);
const start = Date.now();
try {
const result = await executor.execute(parseResult.data as ExecutionGraph, context ?? {}, env.EXEC_CONTEXT);
const duration_ms = Date.now() - start;
// 非同步記錄統計(Phase 7 補充 analytics,目前為 no-op
const componentId = graph.nodes.find(n => n.componentId)?.componentId ?? graphId;
const runId = `${graphId}-${Date.now()}`;
waitUntil(writeEvaluation(env, {
run_id: runId,
workflow_id: graphId,
component_id: componentId,
verdict: 'success',
duration_ms,
evaluated_at: Date.now(),
}));
waitUntil(updateComponentStats(env, componentId, 'success', duration_ms));
return { success: true, data: result.data, trace: result.trace, duration_ms, graph };
} catch (err) {
const duration_ms = Date.now() - start;
const errMsg = err instanceof Error ? err.message : String(err);
const componentId = graph.nodes.find(n => n.componentId)?.componentId ?? graphId;
const runId = `${graphId}-${Date.now()}`;
waitUntil(writeEvaluation(env, {
run_id: runId,
workflow_id: graphId,
component_id: componentId,
verdict: 'failed',
duration_ms,
error_message: errMsg.slice(0, 200),
evaluated_at: Date.now(),
}));
waitUntil(updateComponentStats(env, componentId, 'failed', duration_ms));
if (err instanceof ExecutionError) {
const traceFormatted = err.trace.map(s => ({
node: s.nodeId,
status: s.error ? 'failed' : 'success',
...(s.error ? { error: s.error } : {}),
}));
throw new Error(JSON.stringify({
success: false,
error: errMsg,
failed_node: err.failed_node,
failed_input: err.failed_input,
trace: traceFormatted,
duration_ms,
}));
}
throw err;
}
}
@@ -0,0 +1,36 @@
/**
* Execution Analytics — 零件執行後的統計記錄
*
* Phase 1 MVPstub(不寫入任何外部服務)
* Phase 7 補充:fire-and-forget POST 至 registry.arcrun.dev/analytics/record
*/
import type { Bindings } from '../types';
export interface EvaluationRecord {
run_id: string;
workflow_id: string;
component_id: string;
verdict: 'success' | 'failed' | 'timeout';
duration_ms: number;
error_message?: string;
evaluated_at: number;
}
/** 記錄執行結果(MVPno-opPhase 7 補充 analytics*/
export async function writeEvaluation(
_env: Bindings,
_record: EvaluationRecord,
): Promise<void> {
// Phase 7: POST to registry.arcrun.dev/analytics/record
}
/** 更新零件統計(MVPno-opPhase 7 補充)*/
export async function updateComponentStats(
_env: Bindings,
_componentId: string,
_verdict: 'success' | 'failed' | 'timeout',
_durationMs: number,
): Promise<void> {
// Phase 7: update ANALYTICS_KV via registry worker
}
@@ -0,0 +1,30 @@
import type { ParsedTriplets } from './triplet-parser';
import { toEdgeType } from './triplet-parser';
import type { SearchResult } from './search-nodes';
/** 從 nodeResults + parsed 組成可直接送入 /execute 的 ExecutionGraph */
export function buildExecutionGraph(
parsed: ParsedTriplets,
nodeResults: SearchResult['nodeResults'],
graphId: string,
graphName: string,
) {
const nodes = [...parsed.nodeNames].map(name => {
const nr = nodeResults[name]!;
const id = name.toLowerCase().replace(/\s+/g, '-');
return {
id,
type: nr.type,
componentId: nr.componentId,
label: name,
};
});
const edges = parsed.edges.map(e => ({
from: e.from.toLowerCase().replace(/\s+/g, '-'),
to: e.to.toLowerCase().replace(/\s+/g, '-'),
type: toEdgeType(e.label),
}));
return { id: graphId, name: graphName, nodes, edges };
}
@@ -0,0 +1,61 @@
import { BUILTIN_IDS } from '../lib/constants';
import type { ParsedTriplets, NodeRole } from './triplet-parser';
import { resolveNodeRole } from './triplet-parser';
import type { Bindings } from '../types';
export type SearchResult = {
nodeResults: Record<string, { status: 'found' | 'missing'; componentId?: string; type: NodeRole }>;
missingNodes: string[];
};
/**
* 對所有節點進行解析,確認每個節點對應的零件是否存在。
*
* 優先序:
* 1. Input/Output 角色:自動標記,不需查找
* 2. 內建零件(BUILTIN_IDS):直接標記 found
* 3. WASM_BUCKET 查找:確認 {componentId}/{componentId}.wasm 是否存在
*/
export async function searchNodes(
parsed: ParsedTriplets,
wasmBucket: R2Bucket,
): Promise<SearchResult> {
const nodeResults: Record<string, { status: 'found' | 'missing'; componentId?: string; type: NodeRole }> = {};
const missingNodes: string[] = [];
for (const nodeName of parsed.nodeNames) {
const role = resolveNodeRole(nodeName, parsed);
// 事件源節點(起始點):自動標記 Input,不查 WASM_BUCKET
if (role === 'Input') {
nodeResults[nodeName] = { status: 'found', componentId: nodeName.toLowerCase(), type: role };
continue;
}
// 輸出節點
if (role === 'Output') {
nodeResults[nodeName] = { status: 'found', componentId: nodeName.toLowerCase(), type: role };
continue;
}
// 內建零件:直接標記 found
if (BUILTIN_IDS.has(nodeName)) {
nodeResults[nodeName] = { status: 'found', componentId: nodeName, type: role };
continue;
}
// WASM_BUCKET 查找:確認 {nodeName}/{nodeName}.wasm 是否存在
// 節點名稱即零件 canonical_id(如 "gmail"、"telegram"
const wasmKey = `${nodeName}/${nodeName}.wasm`;
const obj = await wasmBucket.head(wasmKey);
if (obj) {
nodeResults[nodeName] = { status: 'found', componentId: nodeName, type: role };
} else {
nodeResults[nodeName] = { status: 'missing', type: role };
missingNodes.push(nodeName);
}
}
return { nodeResults, missingNodes };
}
@@ -0,0 +1,117 @@
import { SEMANTIC_EDGE_MAP, VALID_EDGE_TYPES } from '../lib/constants';
import type { EdgeType } from '../types';
export type ParsedTriplets = {
edges: Array<{ from: string; to: string; label: string }>;
nodeNames: Set<string>;
/** 出現在 from 但不出現在任何 to 的節點(事件源 / 起始點) */
sourceNodes: Set<string>;
/** 出現在 to 但不出現在任何 from 的節點(終點)*/
sinkNodes: Set<string>;
};
export type NodeRole = 'Input' | 'Component' | 'Output';
/**
* 解析後的零件 URI
* 支援格式:
* component://validate_json
* component://validate_json@stable
* component://validate_json@pinned:v1
* workflow://wf_save_to_db
* ui://u6u-btn
* style://glow-effect
*/
export interface ResolvedComponentId {
type: 'component' | 'workflow' | 'ui' | 'style';
canonicalId: string;
stability: 'floating' | 'stable' | 'pinned';
pinnedVersion?: string;
/** 原始 URI 字串 */
raw: string;
}
/** 解析零件 URI 協議 */
export function resolveComponentId(uri: string): ResolvedComponentId {
const raw = uri.trim();
// 解析協議前綴
let type: ResolvedComponentId['type'] = 'component';
let rest = raw;
if (raw.startsWith('component://')) {
type = 'component';
rest = raw.slice('component://'.length);
} else if (raw.startsWith('workflow://')) {
type = 'workflow';
rest = raw.slice('workflow://'.length);
} else if (raw.startsWith('ui://')) {
type = 'ui';
rest = raw.slice('ui://'.length);
} else if (raw.startsWith('style://')) {
type = 'style';
rest = raw.slice('style://'.length);
}
// 解析穩定性標籤
// component://id@stable
// component://id@pinned:v1
let canonicalId = rest;
let stability: ResolvedComponentId['stability'] = 'floating';
let pinnedVersion: string | undefined;
const atIdx = rest.indexOf('@');
if (atIdx > 0) {
canonicalId = rest.slice(0, atIdx);
const tag = rest.slice(atIdx + 1);
if (tag === 'stable') {
stability = 'stable';
} else if (tag.startsWith('pinned:')) {
stability = 'pinned';
pinnedVersion = tag.slice('pinned:'.length);
}
}
return { type, canonicalId, stability, pinnedVersion, raw };
}
/** 解析 triplets 字串陣列,回傳節點與邊的結構 */
export function parseTriplets(rawTriplets: unknown[]): ParsedTriplets | null {
const edges: Array<{ from: string; to: string; label: string }> = [];
const nodeNames = new Set<string>();
const fromSet = new Set<string>();
const toSet = new Set<string>();
for (const line of rawTriplets) {
if (typeof line !== 'string') continue;
const parts = line.split('>>').map((s: string) => s.trim());
if (parts.length !== 3) continue;
const [from, action, to] = parts;
edges.push({ from, to, label: action });
nodeNames.add(from);
nodeNames.add(to);
fromSet.add(from);
toSet.add(to);
}
if (nodeNames.size === 0) return null;
const sourceNodes = new Set([...fromSet].filter(n => !toSet.has(n)));
const sinkNodes = new Set([...toSet].filter(n => !fromSet.has(n)));
return { edges, nodeNames, sourceNodes, sinkNodes };
}
/** 根據節點在圖中的位置決定其 type */
export function resolveNodeRole(name: string, parsed: ParsedTriplets): NodeRole {
if (parsed.sourceNodes.has(name)) return 'Input';
if (parsed.sinkNodes.has(name)) return 'Output';
return 'Component';
}
/** 將 edge label 轉換為合法 EdgeType
* 優先序:VALID_EDGE_TYPES(完整匹配)→ SEMANTIC_EDGE_MAP(語意別名)→ 預設 PIPE */
export function toEdgeType(label: string): EdgeType {
const upper = label.toUpperCase();
if (VALID_EDGE_TYPES.has(upper)) return upper as EdgeType;
return (SEMANTIC_EDGE_MAP[label] ?? SEMANTIC_EDGE_MAP[upper] ?? 'PIPE') as EdgeType;
}
@@ -0,0 +1,63 @@
import type { Bindings } from '../types';
import { graphSchema } from '../lib/schemas';
import { parseTriplets } from './triplet-parser';
import { searchNodes } from './search-nodes';
import { buildExecutionGraph } from './graph-builder';
export async function resolveWebhookGraph(
body: Record<string, unknown>,
description: string,
env: Bindings,
): Promise<{ resolvedGraph: Record<string, unknown>; error?: string; missingNodes?: string[] }> {
// 路徑 Atriplets 格式
if (Array.isArray(body.triplets) && body.triplets.length > 0) {
const parsed = parseTriplets(body.triplets as unknown[]);
if (!parsed) return { resolvedGraph: {}, error: '無法解析 triplets' };
const { nodeResults, missingNodes } = await searchNodes(parsed, env.WASM_BUCKET);
if (missingNodes.length > 0) {
return { resolvedGraph: {}, error: `以下零件不存在:${missingNodes.join(', ')}。請執行 acr validate 確認所有零件已上傳。`, missingNodes };
}
const graphId = `webhook-${Date.now()}`;
const graphName = description || `Webhook ${new Date().toISOString()}`;
const graph = buildExecutionGraph(parsed, nodeResults, graphId, graphName) as Record<string, unknown>;
const parseResult = graphSchema.safeParse(graph);
if (!parseResult.success) {
return { resolvedGraph: {}, error: '圖定義產生失敗' };
}
return { resolvedGraph: graph };
}
// 路徑 Bgraph 格式
if (body.graph && typeof body.graph === 'object') {
const graphWithDefaults = {
id: `webhook-${Date.now()}`,
name: description || `Webhook ${new Date().toISOString()}`,
...(body.graph as Record<string, unknown>),
};
const parsed = graphSchema.safeParse(graphWithDefaults);
if (!parsed.success) {
return { resolvedGraph: {}, error: '圖定義驗證失敗' };
}
return { resolvedGraph: graphWithDefaults };
}
// 路徑 Cbody 直接就是 graph
if (body.nodes && body.edges) {
const graphWithDefaults = {
id: `webhook-${Date.now()}`,
name: description || `Webhook ${new Date().toISOString()}`,
...body,
};
const parsed = graphSchema.safeParse(graphWithDefaults);
if (!parsed.success) {
return { resolvedGraph: {}, error: '圖定義驗證失敗' };
}
return { resolvedGraph: graphWithDefaults };
}
return { resolvedGraph: {}, error: '需提供 graph 物件或 triplets 陣列' };
}
@@ -0,0 +1,67 @@
import type { Bindings, ExecutionGraph } from '../types';
import { ExecutionError } from '../types';
import { GraphExecutor } from '../graph-executor';
import { graphSchema } from '../lib/schemas';
import { createComponentLoader } from '../lib/component-loader';
type WebhookRecord = {
graph: Record<string, unknown>;
description: string;
created_at: string;
};
export function generateToken(): string {
const tokenBytes = crypto.getRandomValues(new Uint8Array(16));
return Array.from(tokenBytes).map(b => b.toString(16).padStart(2, '0')).join('');
}
export async function validateAndParseWebhook(raw: string): Promise<WebhookRecord | null> {
try {
return JSON.parse(raw) as WebhookRecord;
} catch {
return null;
}
}
export async function executeWebhookGraph(
env: Bindings,
graph: Record<string, unknown>,
triggerContext: Record<string, unknown>,
token: string,
): Promise<{ success: boolean; data?: unknown; error?: string; trace?: unknown; duration_ms: number }> {
const parsed = graphSchema.safeParse(graph);
if (!parsed.success) {
return { success: false, error: '圖定義已失效', duration_ms: 0 };
}
const loader = createComponentLoader(env);
const executor = new GraphExecutor(loader, undefined, env);
const start = Date.now();
try {
const result = await executor.execute(
parsed.data as ExecutionGraph,
{ ...triggerContext, _webhook_token: token },
env.EXEC_CONTEXT,
);
const duration_ms = Date.now() - start;
return { success: true, data: result.data, duration_ms };
} catch (err) {
const duration_ms = Date.now() - start;
const errMsg = err instanceof Error ? err.message : String(err);
if (err instanceof ExecutionError) {
const traceFormatted = err.trace.map(s => ({
node: s.nodeId,
status: s.error ? 'failed' : 'success',
...(s.error ? { error: s.error } : {}),
}));
return {
success: false,
error: errMsg,
trace: traceFormatted,
duration_ms,
};
}
return { success: false, error: errMsg, duration_ms };
}
}
+337
View File
@@ -0,0 +1,337 @@
// arcrun 圖遍歷引擎 — 支援完整 Cypher 語意關係
import type { ExecutionGraph, GraphNode, TraceStep, ComponentRunner, KVContextStore, EdgeType, Bindings } from './types';
import { kvSetNodeOutput, kvGetNodeOutput, ExecutionError } from './types';
import { injectCredentials } from './actions/credential-injector';
export type ComponentLoader = (componentId: string) => Promise<ComponentRunner>;
export type WorkflowLoader = (workflowId: string) => Promise<ExecutionGraph>;
// Fan-in 狀態:入度 > 1 的節點需要等所有上游完成後才執行
type FanInState = Map<string, { ctx: Record<string, unknown>; remaining: number }>;
export class GraphExecutor {
private loader: ComponentLoader;
private workflowLoader?: WorkflowLoader;
private env?: Bindings;
public recordComponentReference?: (componentId: string, workflowId: string) => Promise<void>;
constructor(loader: ComponentLoader, workflowLoader?: WorkflowLoader, env?: Bindings) {
this.loader = loader;
this.workflowLoader = workflowLoader;
this.env = env;
}
async execute(
graph: ExecutionGraph,
initialContext: Record<string, unknown>,
kvNamespace?: KVNamespace | undefined,
): Promise<{
data: unknown;
trace: TraceStep[];
}> {
const trace: TraceStep[] = [];
// 建立 KV Context StoreBUILD-006
// run_id = graphId + timestamp,確保每次執行獨立
const kvStore: KVContextStore | undefined = kvNamespace
? { runId: `${graph.id}-${Date.now()}`, kv: kvNamespace }
: undefined;
// 找出所有起點(沒有任何邊指向的節點)
const hasIncoming = new Set(graph.edges.map(e => e.to));
const startNodes = graph.nodes.filter(n => !hasIncoming.has(n.id));
if (startNodes.length === 0) {
return { data: initialContext, trace };
}
// 建立 fan-in 狀態(入度 > 1 的節點需要等所有上游)
const fanIn: FanInState = new Map();
for (const node of graph.nodes) {
const inDeg = graph.edges.filter(e => e.to === node.id).length;
if (inDeg > 1) {
fanIn.set(node.id, { ctx: { ...initialContext }, remaining: inDeg });
}
}
// 並行執行所有起點
const results = await Promise.all(
startNodes.map(node =>
this.executeNode(node, graph, initialContext, new Set(), trace, fanIn, kvStore)
)
);
// 合併所有起點的輸出
// 注意:若結果是 string(如 HTML),不可直接展開 — 展開 string 會產生字元索引物件
let mergedResult: unknown;
if (results.length === 1) {
mergedResult = results[0];
} else {
mergedResult = results.reduce(
(acc: Record<string, unknown>, r: unknown) => ({
...acc,
...(typeof r === 'object' && r !== null ? (r as Record<string, unknown>) : {}),
}),
{} as Record<string, unknown>
);
}
return { data: mergedResult, trace };
}
private async executeNode(
node: GraphNode,
graph: ExecutionGraph,
context: unknown,
visited: Set<string>,
trace: TraceStep[],
fanIn: FanInState,
kvStore?: KVContextStore,
): Promise<unknown> {
const nodeKey = `${node.id}:${JSON.stringify(context).slice(0, 50)}`;
if (visited.has(nodeKey)) return context;
visited.add(nodeKey);
const start = Date.now();
let result: unknown = context;
let nodeInput: unknown = context;
try {
switch (node.type) {
case 'Input':
result = node.data ?? context;
nodeInput = result;
break;
case 'Component': {
if (!node.componentId) throw new Error(`節點 ${node.id} 缺少 componentId`);
const runner = await this.loader(node.componentId);
// 優先順序:node.data(靜態參數,如 pattern/sheet> context(全局參數)
let mergedContext: Record<string, unknown> = {
...(context as Record<string, unknown>),
...(node.data ?? {}),
};
// Credential 注入:在 WASM 執行前自動注入 credentials_required 中宣告的 token
if (this.env) {
mergedContext = await injectCredentials(node.componentId, mergedContext, this.env);
}
nodeInput = mergedContext;
result = await runner(mergedContext);
// BUILD-006:將節點 output 存入 KVkey = {run_id}:node:{node_id}
// 這讓下游節點可以透過 KV 讀取上游的具名 output,解決同名欄位衝突
if (kvStore && result !== null && result !== undefined) {
await kvSetNodeOutput(kvStore, node.id, result);
}
// Phase 2:記錄 component 被引用(追蹤生命週期)
// 由 component-registry 追蹤使用狀態,決定是否保留
// 在後台執行,不阻擋主流程
void this.recordComponentReference?.(node.componentId, graph.id).catch(() => {
// 記錄失敗不應該中止執行
});
break;
}
case 'Output':
result = context;
break;
}
} catch (e: any) {
const errMsg = e.message || String(e);
trace.push({
nodeId: node.id,
type: node.type,
input: nodeInput,
output: null,
error: errMsg,
duration_ms: Date.now() - start,
});
// 若已是 ExecutionError(上游節點拋出),保留原始 trace 繼續往上傳
if (e instanceof ExecutionError) throw e;
throw new ExecutionError(
`Node ${node.id} failed: ${errMsg}`,
node.id,
nodeInput,
trace,
);
}
trace.push({
nodeId: node.id,
type: node.type,
input: nodeInput,
output: result,
duration_ms: Date.now() - start,
});
// 處理出邊
const outEdges = graph.edges.filter(e => e.from === node.id);
for (const edge of outEdges) {
const nextNode = graph.nodes.find(n => n.id === edge.to);
if (!nextNode) continue;
switch (edge.type as EdgeType) {
case 'PIPE': {
const baseResult = (typeof result === 'object' && result !== null)
? (result as Record<string, unknown>)
: {};
const pipeContext: Record<string, unknown> = {
...(context as Record<string, unknown>),
...baseResult,
};
if (kvStore) {
const kvOutput = await kvGetNodeOutput(kvStore, node.id);
if (kvOutput !== undefined) {
if (!pipeContext._kv_outputs) pipeContext._kv_outputs = {};
(pipeContext._kv_outputs as Record<string, unknown>)[node.id] = kvOutput;
}
}
const fanInState = fanIn.get(nextNode.id);
if (fanInState) {
Object.assign(fanInState.ctx, pipeContext);
fanInState.remaining--;
if (fanInState.remaining === 0) {
result = await this.executeNode(nextNode, graph, fanInState.ctx, visited, trace, fanIn, kvStore);
}
} else {
result = await this.executeNode(nextNode, graph, pipeContext, visited, trace, fanIn, kvStore);
}
break;
}
case 'ON_SUCCESS': {
// 只在上游節點成功(無 error)時執行
const hasError = result && typeof result === 'object' && 'error' in (result as object);
if (!hasError) {
result = await this.executeNode(nextNode, graph, result, visited, trace, fanIn, kvStore);
}
break;
}
case 'ON_FAIL': {
// 只在上游節點失敗(有 error)時執行,傳遞 error context
const hasError = result && typeof result === 'object' && 'error' in (result as object);
if (hasError) {
result = await this.executeNode(nextNode, graph, result, visited, trace, fanIn, kvStore);
}
break;
}
case 'IF': {
const passes = evaluateCondition(edge.condition ?? 'true', result);
if (passes) {
result = await this.executeNode(nextNode, graph, result, visited, trace, fanIn, kvStore);
}
break;
}
case 'FOREACH': {
const iteratorKey = edge.iterator ?? 'item';
const items = getIterableFromContext(result, iteratorKey);
const iterResults: unknown[] = [];
for (const item of items) {
const itemContext = { ...(result as Record<string, unknown>), [iteratorKey]: item };
const itemResult = await this.executeNode(nextNode, graph, itemContext, new Set(), trace, fanIn, kvStore);
iterResults.push(itemResult);
}
result = { ...(result as Record<string, unknown>), results: iterResults };
break;
}
case 'CALLS_SUBFLOW': {
// 從 workflowLoader 載入子 Workflow,以當前 context 執行,輸出合併回主流程
const subWorkflowId = nextNode.componentId?.replace('workflow://', '') ?? nextNode.id;
if (this.workflowLoader) {
const subGraph = await this.workflowLoader(subWorkflowId);
const subExecutor = new GraphExecutor(this.loader, this.workflowLoader);
const subResult = await subExecutor.execute(
subGraph,
result as Record<string, unknown>,
kvStore?.kv,
);
result = {
...(result as Record<string, unknown>),
...(subResult.data as Record<string, unknown>),
};
}
break;
}
case 'ON_CLICK': {
// 前端觸發:payload 已在 context 中,直接執行下游節點
result = await this.executeNode(nextNode, graph, result, visited, trace, fanIn, kvStore);
break;
}
case 'IS_A': {
// 節點類型宣告:記錄 componentId,不執行
// IS_A 邊的 to 是零件 URI(如 component://validate_json
// 這個資訊已在 graph-builder 階段處理,執行時不需要額外動作
break;
}
case 'CONTAINS':
case 'HAS_STYLE':
case 'HAS_BEHAVIOR': {
// 結構語意:只記錄圖結構,不執行
break;
}
case 'CONTINUE':
break;
}
}
return result;
}
}
/**
* 安全條件評估(不使用 new Function
* 支援格式:ctx.key === value, ctx.key > value, ctx.keytruthy
*/
function evaluateCondition(condition: string, context: unknown): boolean {
if (!context || typeof context !== 'object') return false;
const ctx = context as Record<string, unknown>;
// 正規化:把 result. 替換為空(直接存取 key)
const expr = condition.replace(/result\./g, '').replace(/ctx\./g, '');
// 簡單 === 比較
const eqMatch = expr.match(/^(\w+)\s*===?\s*(.+)$/);
if (eqMatch) {
const key = eqMatch[1];
const rawVal = eqMatch[2].trim();
const expected = rawVal === 'true' ? true : rawVal === 'false' ? false : rawVal.replace(/['"]/g, '');
return ctx[key] === expected;
}
// 簡單 > 比較
const gtMatch = expr.match(/^(\w+)\s*>\s*(\d+)$/);
if (gtMatch) {
return Number(ctx[gtMatch[1]]) > Number(gtMatch[2]);
}
// truthy check
const key = expr.trim();
if (key && key in ctx) return !!ctx[key];
return true;
}
function getIterableFromContext(context: unknown, key: string): unknown[] {
if (!context || typeof context !== 'object') return [];
const plural = key + 's';
const obj = context as Record<string, unknown>;
const items = obj[plural] ?? obj[key];
return Array.isArray(items) ? items : [];
}
+30
View File
@@ -0,0 +1,30 @@
// arcrun cypher-executor Worker — AI 工作流執行引擎
import { Hono } from 'hono';
import { cors } from 'hono/cors';
import type { Bindings } from './types';
import { healthRouter } from './routes/health';
import { executeRouter } from './routes/execute';
import { cypherRouter } from './routes/cypher';
import { validateRouter } from './routes/validate';
import { docsRouter } from './routes/docs';
import { webhooksRouter } from './routes/webhooks';
import { webhooksCrudRouter } from './routes/webhooks-crud';
import { webhooksListRouter } from './routes/webhooks-list';
const app = new Hono<{ Bindings: Bindings }>();
// 全域 CORS
app.use('*', cors());
// 掛載所有路由器
app.route('/', docsRouter);
app.route('/', healthRouter);
app.route('/', executeRouter);
app.route('/', cypherRouter);
app.route('/', validateRouter);
app.route('/', webhooksRouter);
app.route('/', webhooksCrudRouter);
app.route('/', webhooksListRouter);
// Worker 導出
export default app;
@@ -0,0 +1,49 @@
import { BUILTIN_COMPONENTS } from './constants';
import type { Bindings, ComponentRunner } from '../types';
/**
* 建立零件載入器
*
* 三層優先序:
* 1. 內建零件(BUILTIN_COMPONENTS,純本地轉換,不需 R2
* 2. WASM_BUCKET R2 直讀 → {componentId}/{componentId}.wasm
* 3. 找不到 → 結構化錯誤(含 R2 key 與修復說明)
*/
export function createComponentLoader(env: Bindings) {
return async (componentId: string): Promise<ComponentRunner> => {
// 層 1:內建零件(無需 R2
const builtin = BUILTIN_COMPONENTS.get(componentId);
if (builtin) return builtin;
// 層 2:從 WASM_BUCKET R2 讀取
const wasmKey = `${componentId}/${componentId}.wasm`;
const wasmObj = await env.WASM_BUCKET.get(wasmKey);
if (wasmObj) {
const wasmBuffer = await wasmObj.arrayBuffer();
return createWasmRunner(componentId, wasmBuffer, env);
}
// 層 3:找不到
throw new Error(
`零件 ${componentId} 不存在。\n` +
`請確認 ${wasmKey} 已上傳至 WASM_BUCKET。\n` +
`修復:執行 acr parts 查看可用零件清單。`
);
};
}
/**
* 建立 WASM 零件執行器
* 使用 WASI preview1 stdin/stdout JSON I/O 模型
*/
function createWasmRunner(
componentId: string,
wasmBuffer: ArrayBuffer,
_env: Bindings,
): ComponentRunner {
return async (ctx: unknown): Promise<unknown> => {
// 動態 import wasm-executor(避免頂層 import 造成 Worker 啟動問題)
const { executeWasm } = await import('./wasm-executor');
return executeWasm(componentId, wasmBuffer, ctx);
};
}
+54
View File
@@ -0,0 +1,54 @@
import type { ComponentRunner, EdgeType } from '../types';
export const VALID_EDGE_TYPES = new Set([
// 現有
'PIPE', 'IF', 'FOREACH', 'CONTINUE',
// 新增:執行語意
'IS_A', 'ON_SUCCESS', 'ON_FAIL',
// 新增:觸發語意
'ON_CLICK', 'CALLS_SUBFLOW',
// 新增:結構語意(記錄圖結構,不執行)
'CONTAINS', 'HAS_STYLE', 'HAS_BEHAVIOR',
]);
/** 內建零件 ID 集合(不需要查 WASM_BUCKETWorker 記憶體中已有實作)*/
export const BUILTIN_IDS = new Set([
'webhook', 'comp_passthrough', 'comp_uppercase', 'comp_counter',
]);
/** 語意邊 → EdgeType 映射(ADR-057 u6u L1:支援中文語意關係詞)
* 完成後 → PIPE(成功後觸發下一個)
* 失敗時 → CONTINUE(失敗後繼續)
* 對每個 → FOREACH(迭代執行)
* 條件滿足時 → IF(條件分支)
*/
export const SEMANTIC_EDGE_MAP: Record<string, EdgeType> = {
// 中文語意詞
'完成後': 'PIPE',
'失敗時': 'ON_FAIL',
'對每個': 'FOREACH',
'條件滿足時': 'IF',
// 英文別名
'SUCCESS': 'ON_SUCCESS',
'FAIL': 'ON_FAIL',
'CLICK': 'ON_CLICK',
'SUBFLOW': 'CALLS_SUBFLOW',
};
/**
* 內建零件表(靜態函數,不需要 R2)
* WASM 零件從 WASM_BUCKET R2 直接讀取
*/
export const BUILTIN_COMPONENTS = new Map<string, ComponentRunner>([
['comp_passthrough', (ctx) => ctx],
['comp_uppercase', (ctx) => {
const c = ctx as Record<string, unknown>;
return { ...c, text: String(c.text || '').toUpperCase() };
}],
['comp_counter', (ctx) => {
const c = ctx as Record<string, unknown>;
return { ...c, count: (Number(c.count) || 0) + 1 };
}],
]);
export const SCORE_THRESHOLD = 0.5;
+306
View File
@@ -0,0 +1,306 @@
export const OPENAPI_SPEC = {
openapi: '3.0.3',
info: {
title: 'arcrun cypher-executor API',
description: 'AI Workflow Execution Engine — 透過三元組 Triplet 或圖 Graph 定義工作流,系統執行並回傳結果',
version: '1.0.0',
contact: {
name: 'arcrun',
url: 'https://github.com/arcrun/arcrun',
},
},
servers: [
{ url: 'https://cypher.arcrun.dev', description: 'arcrun.dev Hosted' },
{ url: 'http://localhost:8787', description: 'Local Development' },
],
paths: {
'/': {
get: {
summary: 'Health Check',
tags: ['Health'],
responses: {
'200': {
description: 'Service is running',
content: {
'application/json': {
schema: {
type: 'object',
properties: {
service: { type: 'string' },
version: { type: 'string' },
status: { type: 'string' },
},
},
},
},
},
},
},
},
'/cypher/search': {
post: {
summary: '搜尋工作流需要的零件',
tags: ['Cypher'],
description: '用三元組描述工作流,系統解析並從 Registry 查詢對應零件',
requestBody: {
required: true,
content: {
'application/json': {
schema: {
type: 'object',
properties: {
triplets: {
type: 'array',
items: { type: 'string' },
example: ['start >> 完成後 >> get-data', 'get-data >> 完成後 >> done'],
description: '三元組陣列,格式:\"FROM >> ACTION >> TO\"',
},
auto_publish: {
type: 'boolean',
default: true,
description: '缺失的零件是否自動產生發佈',
},
},
required: ['triplets'],
},
},
},
},
responses: {
'200': {
description: '零件搜尋成功(含版本號和時戳,適合 Markdown 文檔追蹤)',
content: {
'application/json': {
schema: {
type: 'object',
properties: {
version: { type: 'string', example: 'search-v1-20260327-143022', description: '版本號(endpoint-v{major}-{timestamp}' },
timestamp: { type: 'string', format: 'date-time', description: 'ISO 8601 時戳' },
triplets: { type: 'array', items: { type: 'string' }, description: '回送的三元組列表' },
nodes: { type: 'object', description: '搜尋到的零件及其狀態' },
cypher: { type: 'object', description: '工作流圖(null 若有缺失零件)' },
missing: { type: 'array', items: { type: 'string' }, description: '缺失零件列表' },
auto_published: { type: 'object', description: '自動發佈的零件(若 auto_publish=true' },
},
},
},
},
},
'400': { description: '無法解析三元組' },
},
},
},
'/cypher/execute': {
post: {
summary: '執行工作流',
tags: ['Cypher'],
description: '直接執行 triplets,回傳完整執行結果。支援自動發佈缺失零件。',
requestBody: {
required: true,
content: {
'application/json': {
schema: {
type: 'object',
properties: {
triplets: {
type: 'array',
items: { type: 'string' },
description: '三元組陣列,格式:"FROM >> ACTION >> TO"',
},
context: {
type: 'object',
description: '執行上下文,傳入各節點作為初始參數',
},
auto_publish: {
type: 'boolean',
default: true,
description: '缺失的零件是否自動產生臨時實作',
},
},
required: ['triplets'],
},
},
},
},
responses: {
'200': {
description: '執行成功(含版本號和時戳)',
content: {
'application/json': {
schema: {
type: 'object',
properties: {
version: { type: 'string', example: 'execute-v1-20260327-143022', description: '版本號(endpoint-v{major}-{timestamp}' },
timestamp: { type: 'string', format: 'date-time', description: 'ISO 8601 時戳' },
success: { type: 'boolean', enum: [true] },
data: { type: 'object', description: '執行結果' },
trace: { type: 'array', description: '執行跟蹤' },
duration_ms: { type: 'number' },
},
},
},
},
},
'500': {
description: '執行失敗或部份零件缺失(含版本號和時戳)',
content: {
'application/json': {
schema: {
type: 'object',
properties: {
version: { type: 'string', example: 'execute-v1-20260327-143022', description: '版本號(endpoint-v{major}-{timestamp}' },
timestamp: { type: 'string', format: 'date-time', description: 'ISO 8601 時戳' },
success: { type: 'boolean', enum: [false] },
error: { type: 'string' },
missing: { type: 'array', items: { type: 'string' }, description: '無法自動發佈的缺失零件' },
auto_published: {
type: 'object',
description: '自動發佈的零件資訊',
additionalProperties: {
type: 'object',
properties: {
ok: { type: 'boolean' },
componentId: { type: 'string' },
temporary_endpoint: { type: 'string', format: 'uri', description: '臨時實作的 URL' },
implement_by: { type: 'string', format: 'date-time', description: '實作截止時間' },
},
},
},
duration_ms: { type: 'number' },
},
},
},
},
},
},
},
},
'/webhooks': {
post: {
summary: '建立 Webhook',
tags: ['Webhooks'],
description: '將工作流註冊成 Webhook,得到公開 URL',
requestBody: {
required: true,
content: {
'application/json': {
schema: {
type: 'object',
properties: {
triplets: {
type: 'array',
items: { type: 'string' },
},
description: { type: 'string' },
},
},
},
},
},
responses: {
'201': {
description: 'Webhook 建立成功',
content: {
'application/json': {
schema: {
type: 'object',
properties: {
token: { type: 'string' },
webhook_url: { type: 'string', format: 'uri' },
description: { type: 'string' },
created_at: { type: 'string', format: 'date-time' },
},
},
},
},
},
},
},
get: {
summary: '列出所有 Webhooks',
tags: ['Webhooks'],
parameters: [
{
name: 'Authorization',
in: 'header',
required: true,
schema: { type: 'string', example: 'Bearer u6u_xxxxx' },
description: 'API Key 認證',
},
],
responses: {
'200': {
description: 'Webhooks 列表',
content: {
'application/json': {
schema: {
type: 'object',
properties: {
webhooks: {
type: 'array',
items: {
type: 'object',
properties: {
token: { type: 'string' },
description: { type: 'string' },
created_at: { type: 'string', format: 'date-time' },
},
},
},
total: { type: 'number' },
},
},
},
},
},
'401': { description: '未授權' },
},
},
},
'/webhooks/{token}': {
get: {
summary: '查詢單個 Webhook',
tags: ['Webhooks'],
parameters: [
{
name: 'token',
in: 'path',
required: true,
schema: { type: 'string' },
},
],
responses: {
'200': {
description: 'Webhook 資訊',
},
'404': { description: 'Webhook 不存在' },
},
},
delete: {
summary: '刪除 Webhook',
tags: ['Webhooks'],
parameters: [
{
name: 'token',
in: 'path',
required: true,
schema: { type: 'string' },
},
],
responses: {
'200': { description: 'Webhook 已刪除' },
'404': { description: 'Webhook 不存在' },
},
},
},
},
components: {
securitySchemes: {
ApiKeyAuth: {
type: 'apiKey',
in: 'header',
name: 'Authorization',
},
},
},
};
+25
View File
@@ -0,0 +1,25 @@
import { z } from 'zod';
// 圖定義的 Zod Schema
export const graphSchema = z.object({
id: z.string().min(1),
name: z.string().min(1),
nodes: z.array(z.object({
id: z.string(),
type: z.enum(['Input', 'Component', 'Output']),
componentId: z.string().optional(),
data: z.record(z.unknown()).optional(),
})),
edges: z.array(z.object({
from: z.string(),
to: z.string(),
type: z.enum(['PIPE', 'IF', 'FOREACH', 'CONTINUE']),
condition: z.string().optional(),
iterator: z.string().optional(),
})),
});
export const executeSchema = z.object({
graph: graphSchema,
context: z.record(z.unknown()).default({}),
});
+243
View File
@@ -0,0 +1,243 @@
/**
* WASI preview1 輕量 shim
* 只實作 stdin/stdout/stderr 所需的最小 syscall 集合。
* 其餘 syscall 一律回傳 ENOSYS(76),確保零件無法呼叫網路或檔案系統。
*
* 不依賴任何外部套件(不使用 @cloudflare/workers-wasi)。
* Requirements: 3.1, 3.3
*/
const WASI_ESUCCESS = 0;
const WASI_ENOSYS = 76;
// fd 常數
const FD_STDIN = 0;
const FD_STDOUT = 1;
const FD_STDERR = 2;
export interface WasiShim {
/** WebAssembly.Imports 物件,傳入 WebAssembly.instantiate */
imports: WebAssembly.Imports;
/** 取得 stdout 的完整輸出(合併所有 chunks) */
getStdout(): string;
/** 取得 stderr 的完整輸出 */
getStderr(): string;
/** 注入 WebAssembly.Memoryinstantiate 後呼叫) */
setMemory(memory: WebAssembly.Memory): void;
}
/**
* Host function 注入介面
* 讓 .wasm 零件能透過 host function 呼叫外部服務,而不需要網路 syscall
*/
export interface WasiHostFunctions {
/** HTTP 請求 host function.wasm 呼叫此函數發出 HTTP 請求 */
http_request?: (url: string, method: string, headers: string, body: string) => Promise<string>;
}
/**
* 建立 WASI shim 實例
* @param stdinData - 要寫入 stdin 的 UTF-8 字串(通常是 JSON.stringify(input)
* @param hostFunctions - 可選的 host function 注入(讓 .wasm 呼叫外部服務)
*/
export function createWasiShim(stdinData: string, hostFunctions?: WasiHostFunctions): WasiShim {
const stdinBytes = new TextEncoder().encode(stdinData);
let stdinOffset = 0;
const stdoutChunks: Uint8Array[] = [];
const stderrChunks: Uint8Array[] = [];
let memory: WebAssembly.Memory | null = null;
function getMemoryView(): DataView {
if (!memory) throw new Error('WASI memory not set — call setMemory() after instantiate');
return new DataView(memory.buffer);
}
/**
* fd_write: 將 iovec 陣列的資料寫入 fdstdout=1 或 stderr=2
* iovec 結構:{ buf: i32, buf_len: i32 }(各 4 byteslittle-endian
*/
function fd_write(fd: number, iovs: number, iovs_len: number, nwritten_ptr: number): number {
if (fd !== FD_STDOUT && fd !== FD_STDERR) return WASI_ENOSYS;
const view = getMemoryView();
const buf = memory!.buffer;
let totalWritten = 0;
for (let i = 0; i < iovs_len; i++) {
const iov_base = view.getUint32(iovs + i * 8, true);
const iov_len = view.getUint32(iovs + i * 8 + 4, true);
if (iov_len === 0) continue;
const chunk = new Uint8Array(buf, iov_base, iov_len);
const copy = new Uint8Array(iov_len);
copy.set(chunk);
if (fd === FD_STDOUT) stdoutChunks.push(copy);
else stderrChunks.push(copy);
totalWritten += iov_len;
}
view.setUint32(nwritten_ptr, totalWritten, true);
return WASI_ESUCCESS;
}
/**
* fd_read: 從 stdin 讀取資料到 iovec 陣列
*/
function fd_read(fd: number, iovs: number, iovs_len: number, nread_ptr: number): number {
if (fd !== FD_STDIN) return WASI_ENOSYS;
const view = getMemoryView();
const buf = memory!.buffer;
let totalRead = 0;
for (let i = 0; i < iovs_len; i++) {
const iov_base = view.getUint32(iovs + i * 8, true);
const iov_len = view.getUint32(iovs + i * 8 + 4, true);
if (iov_len === 0) continue;
const remaining = stdinBytes.length - stdinOffset;
if (remaining <= 0) break;
const toCopy = Math.min(iov_len, remaining);
const dest = new Uint8Array(buf, iov_base, toCopy);
dest.set(stdinBytes.subarray(stdinOffset, stdinOffset + toCopy));
stdinOffset += toCopy;
totalRead += toCopy;
}
view.setUint32(nread_ptr, totalRead, true);
return WASI_ESUCCESS;
}
/**
* proc_exit: 零件呼叫 exit(),拋出 Error 中止執行
*/
function proc_exit(code: number): never {
throw new Error(`wasm exit: ${code}`);
}
/**
* random_get: 填充隨機 bytes(使用 Web Crypto API
*/
function random_get(buf_ptr: number, buf_len: number): number {
const view = new Uint8Array(memory!.buffer, buf_ptr, buf_len);
crypto.getRandomValues(view);
return WASI_ESUCCESS;
}
const shim: WasiShim = {
imports: {
wasi_snapshot_preview1: { fd_write,
fd_read,
proc_exit,
random_get,
// 其餘 syscall 回傳 ENOSYS(不允許網路/檔案系統操作)
fd_seek: () => WASI_ENOSYS,
fd_close: () => WASI_ESUCCESS,
fd_fdstat_get: () => WASI_ENOSYS,
fd_prestat_get: () => WASI_ENOSYS,
fd_prestat_dir_name: () => WASI_ENOSYS,
environ_get: () => WASI_ESUCCESS,
environ_sizes_get: (count_ptr: number, size_ptr: number) => {
if (memory) {
const view = getMemoryView();
view.setUint32(count_ptr, 0, true);
view.setUint32(size_ptr, 0, true);
}
return WASI_ESUCCESS;
},
args_get: () => WASI_ESUCCESS,
args_sizes_get: (argc_ptr: number, argv_buf_size_ptr: number) => {
if (memory) {
const view = getMemoryView();
view.setUint32(argc_ptr, 0, true);
view.setUint32(argv_buf_size_ptr, 0, true);
}
return WASI_ESUCCESS;
},
clock_time_get: (id: number, precision: bigint, time_ptr: number) => {
if (memory) {
const view = getMemoryView();
const now = BigInt(Date.now()) * 1_000_000n;
view.setBigUint64(time_ptr, now, true);
}
return WASI_ESUCCESS;
},
clock_res_get: () => WASI_ENOSYS,
poll_oneoff: () => WASI_ENOSYS,
sched_yield: () => WASI_ESUCCESS,
proc_raise: () => WASI_ENOSYS,
sock_accept: () => WASI_ENOSYS,
sock_recv: () => WASI_ENOSYS,
sock_send: () => WASI_ENOSYS,
sock_shutdown: () => WASI_ENOSYS,
path_open: () => WASI_ENOSYS,
path_create_directory: () => WASI_ENOSYS,
path_remove_directory: () => WASI_ENOSYS,
path_rename: () => WASI_ENOSYS,
path_unlink_file: () => WASI_ENOSYS,
path_filestat_get: () => WASI_ENOSYS,
path_readlink: () => WASI_ENOSYS,
path_symlink: () => WASI_ENOSYS,
path_link: () => WASI_ENOSYS,
},
// u6u host functions:讓 .wasm 零件透過 host function 呼叫外部服務
// .wasm 零件用 //go:wasmimport u6u http_request 宣告
u6u: {
http_request: hostFunctions?.http_request
? async (urlPtr: number, urlLen: number, methodPtr: number, methodLen: number,
headersPtr: number, headersLen: number, bodyPtr: number, bodyLen: number,
outPtr: number, outLenPtr: number): Promise<number> => {
if (!memory) return 1;
const buf = memory.buffer;
const dec = new TextDecoder();
const url = dec.decode(new Uint8Array(buf, urlPtr, urlLen));
const method = dec.decode(new Uint8Array(buf, methodPtr, methodLen));
const headers = dec.decode(new Uint8Array(buf, headersPtr, headersLen));
const body = dec.decode(new Uint8Array(buf, bodyPtr, bodyLen));
try {
const result = await hostFunctions!.http_request!(url, method, headers, body);
const encoded = new TextEncoder().encode(result);
// 寫入結果到 outPtr 指向的 buffer
const view = new DataView(buf);
new Uint8Array(buf, outPtr, encoded.length).set(encoded);
view.setUint32(outLenPtr, encoded.length, true);
return 0; // success
} catch {
return 1; // error
}
}
: () => 1, // host function 未注入時回傳錯誤
},
},
setMemory(mem: WebAssembly.Memory) {
memory = mem;
},
getStdout(): string {
if (stdoutChunks.length === 0) return '';
const total = stdoutChunks.reduce((n, c) => n + c.length, 0);
const merged = new Uint8Array(total);
let offset = 0;
for (const chunk of stdoutChunks) {
merged.set(chunk, offset);
offset += chunk.length;
}
return new TextDecoder().decode(merged);
},
getStderr(): string {
if (stderrChunks.length === 0) return '';
const total = stderrChunks.reduce((n, c) => n + c.length, 0);
const merged = new Uint8Array(total);
let offset = 0;
for (const chunk of stderrChunks) {
merged.set(chunk, offset);
offset += chunk.length;
}
return new TextDecoder().decode(merged);
},
};
return shim;
}
+119
View File
@@ -0,0 +1,119 @@
/**
* Tier 1 WASM 執行器
* 從 R2 載入 .wasm,透過 WASI preview1 shim 執行,stdin/stdout JSON I/O。
*
* 快取策略:WebAssembly.Module 快取於 Worker 記憶體(跨請求共享),
* 避免重複編譯。每次執行只重新 instantiate。
*
* Requirements: 3.1, 3.3, 6.6
*/
import { createWasiShim, type WasiHostFunctions } from './wasi-shim';
// Worker 記憶體快取:r2Key → WebAssembly.Module
const moduleCache = new Map<string, WebAssembly.Module>();
export interface WasmExecutorOptions {
/** R2 Bucket binding */
bucket: R2Bucket;
/** R2 物件鍵(例:components/validate_json/v1.wasm */
r2Key: string;
/** 逾時上限(ms),對應 contract.constraints.max_cold_start_ms */
timeoutMs?: number;
/** 可選的 host function 注入(讓 .wasm 呼叫外部服務) */
hostFunctions?: WasiHostFunctions;
}
export interface WasmExecuteResult {
output: unknown;
stdout: string;
stderr: string;
duration_ms: number;
}
/**
* 執行 WASM 零件
* @param input - 傳入零件的 JSON 物件(寫入 stdin
* @param options - 執行選項
*/
export async function executeWasm(
input: unknown,
options: WasmExecutorOptions,
): Promise<WasmExecuteResult> {
const { bucket, r2Key, timeoutMs = 50, hostFunctions } = options;
// ...(其餘不變)
const start = Date.now();
// 1. 取得或編譯 WebAssembly.Module(快取)
let wasmModule = moduleCache.get(r2Key);
if (!wasmModule) {
const obj = await bucket.get(r2Key);
if (!obj) throw new Error(`WASM 零件不存在於 R2${r2Key}`);
const arrayBuffer = await obj.arrayBuffer();
wasmModule = await WebAssembly.compile(arrayBuffer);
moduleCache.set(r2Key, wasmModule);
}
// 2. 建立 WASI shim,注入 stdin 與可選的 host functions
const stdinJson = JSON.stringify(input);
const shim = createWasiShim(stdinJson, hostFunctions);
// 3. instantiate(每次執行都重新 instantiate,shim 狀態是獨立的)
const instance = await WebAssembly.instantiate(wasmModule, shim.imports);
// 4. 注入 memoryWASI fd_read/fd_write 需要存取 memory
const memory = instance.exports.memory as WebAssembly.Memory | undefined;
if (memory) shim.setMemory(memory);
// 5. 執行(帶逾時)
const exports = instance.exports as Record<string, unknown>;
const entryFn = (exports._start ?? exports.main) as (() => void) | undefined;
if (typeof entryFn !== 'function') {
throw new Error(`WASM 零件缺少 _start 或 main exportr2Key: ${r2Key}`);
}
const runWithTimeout = new Promise<void>((resolve, reject) => {
const timer = setTimeout(() => {
reject(new Error(`WASM 執行逾時(>${timeoutMs}ms):${r2Key}`));
}, timeoutMs);
try {
entryFn();
clearTimeout(timer);
resolve();
} catch (e) {
clearTimeout(timer);
// proc_exit(0) 拋出 "wasm exit: 0",視為正常結束
if (e instanceof Error && e.message === 'wasm exit: 0') {
resolve();
} else {
reject(e);
}
}
});
await runWithTimeout;
// 6. 讀取 stdoutJSON.parse
const stdout = shim.getStdout().trim();
const stderr = shim.getStderr().trim();
const duration_ms = Date.now() - start;
if (!stdout) {
throw new Error(`WASM 零件沒有輸出(stdout 為空):${r2Key}`);
}
let output: unknown;
try {
output = JSON.parse(stdout);
} catch {
throw new Error(`WASM 零件輸出不是合法 JSON${stdout.slice(0, 200)}`);
}
return { output, stdout, stderr, duration_ms };
}
/** 清除 Module 快取(測試用) */
export function clearModuleCache(): void {
moduleCache.clear();
}
+84
View File
@@ -0,0 +1,84 @@
import { Hono } from 'hono';
import type { Bindings } from '../types';
import { handleCypherSearch, handleCypherExecute } from '../actions/cypher-handlers';
export const cypherRouter = new Hono<{ Bindings: Bindings }>();
// POST /cypher/search — 三元組 → 解析節點 → 語意搜尋零件 → 回傳 Cypher JSON (開發友善格式)
cypherRouter.post('/cypher/search', async (c) => {
const body = await c.req.json() as { triplets?: unknown };
const rawTriplets = body?.triplets;
if (!Array.isArray(rawTriplets) || rawTriplets.length === 0) {
return c.json({ error: 'triplets 必須為非空字串陣列' }, 400);
}
try {
const now = new Date();
const timestamp = now.toISOString();
const versionId = `search-v1-${now.getFullYear()}${String(now.getMonth() + 1).padStart(2, '0')}${String(now.getDate()).padStart(2, '0')}-${String(now.getHours()).padStart(2, '0')}${String(now.getMinutes()).padStart(2, '0')}${String(now.getSeconds()).padStart(2, '0')}`;
const result = await handleCypherSearch(rawTriplets, c.env);
const response = {
version: versionId,
timestamp,
triplets: rawTriplets,
nodes: result.nodes,
cypher: result.cypher,
missing: result.missing,
};
return c.json(response);
} catch (err) {
const errMsg = err instanceof Error ? err.message : String(err);
return c.json({ error: errMsg }, 400);
}
});
// POST /cypher/execute — 三元組 → 一步執行(search + execute 合一)
cypherRouter.post('/cypher/execute', async (c) => {
const body = await c.req.json() as { triplets?: unknown; context?: Record<string, unknown>; graph_id?: string; graph_name?: string };
if (!Array.isArray(body?.triplets) || body.triplets.length === 0) {
return c.json({ error: 'triplets 必須為非空字串陣列' }, 400);
}
const graphId = typeof body.graph_id === 'string' ? body.graph_id : `triplet-exec-${Date.now()}`;
const graphName = typeof body.graph_name === 'string' ? body.graph_name : 'Triplet Execution';
const now = new Date();
const timestamp = now.toISOString();
// 版本號格式:execute-v1-20260327-143022
const versionId = `execute-v1-${now.getFullYear()}${String(now.getMonth() + 1).padStart(2, '0')}${String(now.getDate()).padStart(2, '0')}-${String(now.getHours()).padStart(2, '0')}${String(now.getMinutes()).padStart(2, '0')}${String(now.getSeconds()).padStart(2, '0')}`;
try {
const result = await handleCypherExecute(
body.triplets as unknown[],
body.context,
graphId,
graphName,
c.env,
(p) => c.executionCtx.waitUntil(p),
);
// 包裝成開發友善格式(execute 成功時)
const response = {
version: versionId,
timestamp,
...result,
};
return c.json(response);
} catch (err) {
const errMsg = err instanceof Error ? err.message : String(err);
try {
const parsed = JSON.parse(errMsg);
const response = {
version: versionId,
timestamp,
...parsed,
};
return c.json(response, 500);
} catch {
return c.json({ version: versionId, timestamp, success: false, error: errMsg, duration_ms: 0 }, 500);
}
}
});
+49
View File
@@ -0,0 +1,49 @@
import { Hono } from 'hono';
import type { Bindings } from '../types';
import { OPENAPI_SPEC } from '../lib/openapi';
export const docsRouter = new Hono<{ Bindings: Bindings }>();
// GET /openapi.json
docsRouter.get('/openapi.json', (c) => {
return c.json(OPENAPI_SPEC);
});
// GET /docs — Swagger UI
docsRouter.get('/docs', (c) => {
const specStr = JSON.stringify(OPENAPI_SPEC);
const htmlStr = `<!doctype html>
<html>
<head>
<title>Cypher Executor API Docs</title>
<meta charset="utf-8"/>
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="https://unpkg.com/swagger-ui-dist@4/swagger-ui.css">
<style>html { box-sizing: border-box; overflow: -moz-scrollbars-vertical; overflow-y: scroll; } *, *:before, *:after { box-sizing: inherit; } body { margin:0; padding:0; }</style>
</head>
<body>
<div id="swagger-ui"></div>
<script src="https://unpkg.com/swagger-ui-dist@4/swagger-ui-bundle.js"> </script>
<script src="https://unpkg.com/swagger-ui-dist@4/swagger-ui-standalone-preset.js"> </script>
<script>
window.onload = () => {
window.ui = SwaggerUIBundle({
spec: ${specStr},
dom_id: '#swagger-ui',
deepLinking: true,
presets: [
SwaggerUIBundle.presets.apis,
SwaggerUIStandalonePreset
],
plugins: [
SwaggerUIBundle.plugins.DownloadUrl
],
layout: "BaseLayout"
})
}
</script>
</body>
</html>
`;
return c.html(htmlStr);
});
+55
View File
@@ -0,0 +1,55 @@
import { Hono } from 'hono';
import type { Bindings, ExecutionGraph } from '../types';
import { ExecutionError } from '../types';
import { GraphExecutor } from '../graph-executor';
import { executeSchema } from '../lib/schemas';
import { createComponentLoader } from '../lib/component-loader';
import { writeExecutionVerdict } from '../actions/execution-logger';
export const executeRouter = new Hono<{ Bindings: Bindings }>();
// POST /execute — 執行一個完整的圖
executeRouter.post('/execute', async (c) => {
const body = await c.req.json();
const parsed = executeSchema.safeParse(body);
if (!parsed.success) {
return c.json({ error: '圖定義驗證失敗', details: parsed.error.issues }, 400);
}
const { graph, context } = parsed.data;
const loader = createComponentLoader(c.env);
const executor = new GraphExecutor(loader);
const start = Date.now();
try {
// BUILD-006:傳入 KV namespace(若不存在則 fallback 到記憶體 merge
const result = await executor.execute(graph as ExecutionGraph, context, c.env.EXEC_CONTEXT);
const duration_ms = Date.now() - start;
c.executionCtx.waitUntil(
writeExecutionVerdict(c.env, graph.id, graph.nodes, 'success', duration_ms, '執行完成')
);
return c.json({ success: true, data: result.data, trace: result.trace, duration_ms });
} catch (err) {
const duration_ms = Date.now() - start;
const errMsg = err instanceof Error ? err.message : String(err);
c.executionCtx.waitUntil(
writeExecutionVerdict(c.env, graph.id, graph.nodes, 'failed', duration_ms, errMsg.slice(0, 100))
);
if (err instanceof ExecutionError) {
const traceFormatted = err.trace.map(s => ({
node: s.nodeId,
status: s.error ? 'failed' : 'success',
...(s.error ? { error: s.error } : {}),
}));
return c.json({
success: false,
error: errMsg,
failed_node: err.failed_node,
failed_input: err.failed_input,
trace: traceFormatted,
duration_ms,
}, 500);
}
return c.json({ success: false, error: errMsg, failed_node: null, trace: [], duration_ms }, 500);
}
});
+16
View File
@@ -0,0 +1,16 @@
import { Hono } from 'hono';
import type { Bindings } from '../types';
export const healthRouter = new Hono<{ Bindings: Bindings }>();
healthRouter.get('/health', (c) =>
c.json({ ok: true })
);
healthRouter.get('/', (c) =>
c.json({
service: 'arcrun-cypher-executor',
version: '1.0.0',
status: 'ok',
})
);
+26
View File
@@ -0,0 +1,26 @@
import { Hono } from 'hono';
import type { Bindings } from '../types';
import { graphSchema } from '../lib/schemas';
export const validateRouter = new Hono<{ Bindings: Bindings }>();
// POST /validate — 驗證圖定義(不執行)
validateRouter.post('/validate', async (c) => {
const body = await c.req.json();
const parsed = graphSchema.safeParse(body);
if (!parsed.success) {
return c.json({ valid: false, errors: parsed.error.issues }, 400);
}
const nodeIds = new Set(parsed.data.nodes.map(n => n.id));
const invalidEdges = parsed.data.edges.filter(e => !nodeIds.has(e.from) || !nodeIds.has(e.to));
if (invalidEdges.length > 0) {
return c.json({
valid: false,
errors: invalidEdges.map(e => `${e.from}${e.to} 指向不存在的節點`),
}, 400);
}
return c.json({ valid: true, nodeCount: parsed.data.nodes.length, edgeCount: parsed.data.edges.length });
});
@@ -0,0 +1,83 @@
import { Hono } from 'hono';
import type { Bindings } from '../types';
import { validateAndParseWebhook } from '../actions/webhook-handlers';
export const webhooksCrudRouter = new Hono<{ Bindings: Bindings }>();
type WebhookRecord = {
graph: Record<string, unknown>;
description: string;
created_at: string;
};
// GET /webhooks/:token — 查詢 Webhook 基本資訊
webhooksCrudRouter.get('/webhooks/:token', async (c) => {
const token = c.req.param('token');
const raw = await c.env.WEBHOOKS.get(token, 'text');
if (!raw) return c.json({ error: 'not found' }, 404);
const record = await validateAndParseWebhook(raw);
if (!record) return c.json({ error: '資料損毀' }, 500);
return c.json({
token,
description: record.description,
created_at: record.created_at,
});
});
// PUT /webhooks/:token — 更新 Webhook 定義
webhooksCrudRouter.put('/webhooks/:token', async (c) => {
const token = c.req.param('token');
if (!token || token.length < 16) {
return c.json({ error: 'invalid token' }, 400);
}
const raw = await c.env.WEBHOOKS.get(token, 'text');
if (!raw) return c.json({ error: 'webhook not found' }, 404);
const existing = await validateAndParseWebhook(raw);
if (!existing) return c.json({ error: 'webhook 定義損毀' }, 500);
const body = await c.req.json().catch(() => null);
if (!body) return c.json({ error: 'invalid json' }, 400);
const updatedRecord: WebhookRecord = {
graph: existing.graph,
description: existing.description,
created_at: existing.created_at,
};
if (body.description !== undefined) {
updatedRecord.description = typeof body.description === 'string' ? body.description : existing.description;
}
if (body.graph !== undefined) {
updatedRecord.graph = body.graph;
}
await c.env.WEBHOOKS.put(token, JSON.stringify(updatedRecord));
const baseUrl = new URL(c.req.url).origin;
return c.json({
token,
webhook_url: `${baseUrl}/webhooks/${token}/trigger`,
description: updatedRecord.description,
created_at: updatedRecord.created_at,
updated: true,
});
});
// DELETE /webhooks/:token — 刪除 Webhook
webhooksCrudRouter.delete('/webhooks/:token', async (c) => {
const token = c.req.param('token');
if (!token || token.length < 16) {
return c.json({ error: 'invalid token' }, 400);
}
const existing = await c.env.WEBHOOKS.get(token, 'text');
if (!existing) return c.json({ error: 'webhook not found' }, 404);
await c.env.WEBHOOKS.delete(token);
return c.json({ deleted: true, token });
});
@@ -0,0 +1,32 @@
import { Hono } from 'hono';
import type { Bindings } from '../types';
import { validateAndParseWebhook } from '../actions/webhook-handlers';
export const webhooksListRouter = new Hono<{ Bindings: Bindings }>();
// GET /webhooks — 列出所有 Webhooks(需要授權標頭)
webhooksListRouter.get('/webhooks', async (c) => {
const authHeader = c.req.header('Authorization');
if (!authHeader) {
return c.json({ error: 'unauthorized: missing Authorization header' }, 401);
}
const list = await c.env.WEBHOOKS.list();
const webhooks = [];
for (const key of list.keys) {
const raw = await c.env.WEBHOOKS.get(key.name, 'text');
if (!raw) continue;
const record = await validateAndParseWebhook(raw);
if (!record) continue;
webhooks.push({
token: key.name,
description: record.description,
created_at: record.created_at,
});
}
return c.json({ webhooks, total: webhooks.length });
});
+73
View File
@@ -0,0 +1,73 @@
import { Hono } from 'hono';
import type { Bindings } from '../types';
import { generateToken, validateAndParseWebhook, executeWebhookGraph } from '../actions/webhook-handlers';
import { resolveWebhookGraph } from '../actions/webhook-graph-resolver';
export const webhooksRouter = new Hono<{ Bindings: Bindings }>();
type WebhookRecord = {
graph: Record<string, unknown>;
description: string;
created_at: string;
};
// POST /webhooks — 接受 graph、triplets 或直接 nodes/edges
webhooksRouter.post('/webhooks', async (c) => {
const body = await c.req.json().catch(() => null);
if (!body) return c.json({ error: 'invalid json' }, 400);
const description = typeof body.description === 'string' ? body.description : '';
const resolved = await resolveWebhookGraph(body as Record<string, unknown>, description, c.env);
if (resolved.error) {
const statusCode = resolved.missingNodes ? 422 : 400;
return c.json(
{ error: resolved.error, ...(resolved.missingNodes && { missing: resolved.missingNodes }) },
statusCode,
);
}
const token = generateToken();
const record: WebhookRecord = {
graph: resolved.resolvedGraph,
description,
created_at: new Date().toISOString(),
};
await c.env.WEBHOOKS.put(token, JSON.stringify(record));
const baseUrl = new URL(c.req.url).origin;
return c.json({
token,
webhook_url: `${baseUrl}/webhooks/${token}/trigger`,
description: record.description,
created_at: record.created_at,
}, 201);
});
// POST /webhooks/:token/trigger — 觸發執行
webhooksRouter.post('/webhooks/:token/trigger', async (c) => {
const token = c.req.param('token');
if (!token || token.length < 16) {
return c.json({ error: 'invalid token' }, 400);
}
const raw = await c.env.WEBHOOKS.get(token, 'text');
if (!raw) return c.json({ error: 'webhook not found' }, 404);
const record = await validateAndParseWebhook(raw);
if (!record) return c.json({ error: 'webhook 定義損毀' }, 500);
let triggerContext: Record<string, unknown> = {};
try {
const body = await c.req.json().catch(() => null);
if (body && typeof body === 'object') {
triggerContext = body as Record<string, unknown>;
}
} catch {
// 無 body 時使用空 context
}
const result = await executeWebhookGraph(c.env, record.graph, triggerContext, token);
return c.json(result, result.success ? 200 : 500);
});
+118
View File
@@ -0,0 +1,118 @@
// arcrun cypher-executor 型別定義
export type Bindings = {
// KV Context Store:節點 output 透過 KV 傳遞,解決同名欄位衝突
EXEC_CONTEXT: KVNamespace;
// Webhook Storekey = workflow namevalue = Workflow JSON
WEBHOOKS: KVNamespace;
// Credential StoreAES-GCM 加密存放用戶 API token
CREDENTIALS_KV: KVNamespace;
// R2 BucketWASM 零件二進位
WASM_BUCKET: R2Bucket;
// Workers AI
AI: Ai;
// 環境變數
ENVIRONMENT: string;
ENCRYPTION_KEY: string; // hex-encoded 256-bit AES keywrangler secret
MULTI_TENANT?: string; // "false" = Self-hosted 單租戶模式,預設 "true"
};
// 圖結構定義
export type GraphNode = {
id: string;
type: 'Input' | 'Component' | 'Output';
componentId?: string;
data?: Record<string, unknown>;
};
export type EdgeType =
| 'PIPE' | 'IF' | 'FOREACH' | 'CONTINUE' // 現有
| 'IS_A' | 'ON_SUCCESS' | 'ON_FAIL' // 執行語意
| 'ON_CLICK' | 'CALLS_SUBFLOW' // 觸發語意
| 'CONTAINS' | 'HAS_STYLE' | 'HAS_BEHAVIOR'; // 結構語意(記錄圖結構,不執行)
export type GraphEdge = {
from: string;
to: string;
type: EdgeType;
condition?: string; // IF 的條件表達式
iterator?: string; // FOREACH 的迭代變數名
};
export type ExecutionGraph = {
id: string;
name: string;
nodes: GraphNode[];
edges: GraphEdge[];
};
// 執行結果
export type ExecutionResult = {
success: boolean;
data: unknown;
trace: TraceStep[];
duration_ms: number;
};
export type TraceStep = {
nodeId: string;
type: string;
input: unknown;
output: unknown;
duration_ms: number;
error?: string;
};
// 零件執行器介面(直接可執行函數,不用動態 eval)
export type ComponentRunner = (context: unknown) => unknown | Promise<unknown>;
// KV Context StoreBUILD-006):節點 output 命名空間前綴
// KV key 格式:{run_id}:node:{node_id} value 是節點 output 的 JSON 字串
// TTL = 3600 秒(1 小時),執行後自動清除
export type KVContextStore = {
runId: string;
kv: KVNamespace;
};
/** 從 KV 讀取節點 output(不存在時回傳 undefined*/
export async function kvGetNodeOutput(store: KVContextStore, nodeId: string): Promise<Record<string, unknown> | undefined> {
try {
const val = await store.kv.get(`${store.runId}:node:${nodeId}`, 'json');
return val as Record<string, unknown> | undefined;
} catch {
return undefined;
}
}
/** 執行失敗時拋出的自訂 Error,攜帶完整 trace 與失敗節點資訊 */
export class ExecutionError extends Error {
readonly failed_node: string;
readonly failed_input: unknown;
readonly trace: TraceStep[];
constructor(
message: string,
failed_node: string,
failed_input: unknown,
trace: TraceStep[],
) {
super(message);
this.name = 'ExecutionError';
this.failed_node = failed_node;
this.failed_input = failed_input;
this.trace = trace;
}
}
/** 將節點 output 寫入 KVTTL 1 小時)*/
export async function kvSetNodeOutput(store: KVContextStore, nodeId: string, output: unknown): Promise<void> {
try {
await store.kv.put(
`${store.runId}:node:${nodeId}`,
JSON.stringify(output),
{ expirationTtl: 3600 },
);
} catch {
// KV 寫入失敗不影響執行(fallback 到記憶體 merge
}
}
+194
View File
@@ -0,0 +1,194 @@
// Cypher Executor 端到端測試
import { SELF } from 'cloudflare:test';
import { describe, it, expect } from 'vitest';
describe('GET /', () => {
it('回傳服務狀態', async () => {
const res = await SELF.fetch('http://localhost/');
const data = await res.json() as Record<string, unknown>;
expect(res.status).toBe(200);
expect(data.service).toBe('arcrun-cypher-executor');
});
});
describe('POST /validate', () => {
it('驗證合法的圖定義', async () => {
const res = await SELF.fetch('http://localhost/validate', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
id: 'test-graph',
name: '測試圖',
nodes: [
{ id: 'n1', type: 'Input' },
{ id: 'n2', type: 'Output' },
],
edges: [
{ from: 'n1', to: 'n2', type: 'PIPE' },
],
}),
});
const data = await res.json() as { valid: boolean };
expect(res.status).toBe(200);
expect(data.valid).toBe(true);
});
it('偵測無效邊', async () => {
const res = await SELF.fetch('http://localhost/validate', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
id: 'bad-graph',
name: '壞圖',
nodes: [{ id: 'n1', type: 'Input' }],
edges: [{ from: 'n1', to: 'n999', type: 'PIPE' }],
}),
});
expect(res.status).toBe(400);
});
});
describe('POST /execute', () => {
it('PIPE 鏈: Input → passthrough → Output', async () => {
const res = await SELF.fetch('http://localhost/execute', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
graph: {
id: 'g1',
name: 'PIPE 測試',
nodes: [
{ id: 'input', type: 'Input', data: { message: 'hello' } },
{ id: 'pass', type: 'Component', componentId: 'comp_passthrough' },
{ id: 'output', type: 'Output' },
],
edges: [
{ from: 'input', to: 'pass', type: 'PIPE' },
{ from: 'pass', to: 'output', type: 'PIPE' },
],
},
context: {},
}),
});
const data = await res.json() as { success: boolean; data: { message: string }; trace: unknown[] };
expect(res.status).toBe(200);
expect(data.success).toBe(true);
expect(data.data.message).toBe('hello');
expect(data.trace.length).toBeGreaterThanOrEqual(3);
});
it('Component 執行: uppercase 轉換', async () => {
const res = await SELF.fetch('http://localhost/execute', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
graph: {
id: 'g2',
name: 'uppercase 測試',
nodes: [
{ id: 'input', type: 'Input', data: { text: 'hello world' } },
{ id: 'upper', type: 'Component', componentId: 'comp_uppercase' },
{ id: 'output', type: 'Output' },
],
edges: [
{ from: 'input', to: 'upper', type: 'PIPE' },
{ from: 'upper', to: 'output', type: 'PIPE' },
],
},
context: {},
}),
});
const data = await res.json() as { success: boolean; data: { text: string } };
expect(data.success).toBe(true);
expect(data.data.text).toBe('HELLO WORLD');
});
it('PIPE 鏈: 多層 counter 累加', async () => {
const res = await SELF.fetch('http://localhost/execute', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
graph: {
id: 'g3',
name: 'counter 測試',
nodes: [
{ id: 'input', type: 'Input', data: { count: 0 } },
{ id: 'c1', type: 'Component', componentId: 'comp_counter' },
{ id: 'c2', type: 'Component', componentId: 'comp_counter' },
{ id: 'c3', type: 'Component', componentId: 'comp_counter' },
{ id: 'output', type: 'Output' },
],
edges: [
{ from: 'input', to: 'c1', type: 'PIPE' },
{ from: 'c1', to: 'c2', type: 'PIPE' },
{ from: 'c2', to: 'c3', type: 'PIPE' },
{ from: 'c3', to: 'output', type: 'PIPE' },
],
},
context: {},
}),
});
const data = await res.json() as { success: boolean; data: { count: number } };
expect(data.success).toBe(true);
expect(data.data.count).toBe(3);
});
it('IF 條件分支', async () => {
const res = await SELF.fetch('http://localhost/execute', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
graph: {
id: 'g4',
name: 'IF 測試',
nodes: [
{ id: 'input', type: 'Input', data: { valid: true, text: 'go' } },
{ id: 'upper', type: 'Component', componentId: 'comp_uppercase' },
{ id: 'output', type: 'Output' },
],
edges: [
{ from: 'input', to: 'upper', type: 'IF', condition: 'result.valid === true' },
{ from: 'upper', to: 'output', type: 'PIPE' },
],
},
context: {},
}),
});
const data = await res.json() as { success: boolean; data: { text: string } };
expect(data.success).toBe(true);
expect(data.data.text).toBe('GO');
});
it('不存在的零件回傳失敗', async () => {
const res = await SELF.fetch('http://localhost/execute', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
graph: {
id: 'g5',
name: '失敗測試',
nodes: [
{ id: 'input', type: 'Input', data: {} },
{ id: 'bad', type: 'Component', componentId: 'comp_not_exist' },
],
edges: [
{ from: 'input', to: 'bad', type: 'PIPE' },
],
},
context: {},
}),
});
const data = await res.json() as { success: boolean; error: string };
expect(data.success).toBe(false);
expect(data.error).toContain('不存在');
});
it('缺少必填欄位回傳 400', async () => {
const res = await SELF.fetch('http://localhost/execute', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ graph: { id: 'x' } }),
});
expect(res.status).toBe(400);
});
});
+215
View File
@@ -0,0 +1,215 @@
/**
* WASI shim 單元測試
* Task 2.2 — Requirements: 3.1, 3.3
*/
import { describe, it, expect } from 'vitest';
import { createWasiShim } from '../src/lib/wasi-shim';
// 建立一個最小的 fake WebAssembly.Memory(用 ArrayBuffer 模擬)
function makeFakeMemory(size = 65536): WebAssembly.Memory {
// 用真實的 WebAssembly.MemoryVitest 環境支援)
return new WebAssembly.Memory({ initial: 1 });
}
/** 在 memory 中寫入 iovec 陣列,回傳 iovs 指標 */
function writeIovecs(
view: DataView,
iovecs: Array<{ buf: number; buf_len: number }>,
startPtr: number,
): number {
for (let i = 0; i < iovecs.length; i++) {
view.setUint32(startPtr + i * 8, iovecs[i].buf, true);
view.setUint32(startPtr + i * 8 + 4, iovecs[i].buf_len, true);
}
return startPtr;
}
describe('createWasiShim', () => {
describe('fd_readstdin', () => {
it('一次讀取完整 stdin', () => {
const input = '{"key":"value"}';
const shim = createWasiShim(input);
const mem = makeFakeMemory();
shim.setMemory(mem);
const view = new DataView(mem.buffer);
const inputBytes = new TextEncoder().encode(input);
// 配置 buffer 區域(offset 100)和 iovecoffset 0
const bufPtr = 100;
const iovsPtr = 0;
const nreadPtr = 50;
writeIovecs(view, [{ buf: bufPtr, buf_len: inputBytes.length }], iovsPtr);
const fd_read = (shim.imports.wasi_snapshot_preview1 as Record<string, Function>).fd_read;
const result = fd_read(0, iovsPtr, 1, nreadPtr);
expect(result).toBe(0); // ESUCCESS
const nread = view.getUint32(nreadPtr, true);
expect(nread).toBe(inputBytes.length);
// 驗證讀取的內容
const readBytes = new Uint8Array(mem.buffer, bufPtr, nread);
expect(new TextDecoder().decode(readBytes)).toBe(input);
});
it('分多次讀取 stdin', () => {
const input = 'hello';
const shim = createWasiShim(input);
const mem = makeFakeMemory();
shim.setMemory(mem);
const view = new DataView(mem.buffer);
const fd_read = (shim.imports.wasi_snapshot_preview1 as Record<string, Function>).fd_read;
// 第一次讀 3 bytes
writeIovecs(view, [{ buf: 200, buf_len: 3 }], 0);
fd_read(0, 0, 1, 50);
expect(view.getUint32(50, true)).toBe(3);
expect(new TextDecoder().decode(new Uint8Array(mem.buffer, 200, 3))).toBe('hel');
// 第二次讀剩餘 2 bytes
writeIovecs(view, [{ buf: 300, buf_len: 10 }], 0);
fd_read(0, 0, 1, 50);
expect(view.getUint32(50, true)).toBe(2);
expect(new TextDecoder().decode(new Uint8Array(mem.buffer, 300, 2))).toBe('lo');
// 第三次讀:stdin 已耗盡,nread = 0
writeIovecs(view, [{ buf: 400, buf_len: 10 }], 0);
fd_read(0, 0, 1, 50);
expect(view.getUint32(50, true)).toBe(0);
});
it('非 stdin fd 回傳 ENOSYS', () => {
const shim = createWasiShim('');
const mem = makeFakeMemory();
shim.setMemory(mem);
const view = new DataView(mem.buffer);
writeIovecs(view, [{ buf: 100, buf_len: 10 }], 0);
const fd_read = (shim.imports.wasi_snapshot_preview1 as Record<string, Function>).fd_read;
expect(fd_read(1, 0, 1, 50)).toBe(76); // ENOSYS
expect(fd_read(2, 0, 1, 50)).toBe(76);
});
});
describe('fd_writestdout/stderr', () => {
it('寫入 stdoutfd=1)並可透過 getStdout 讀取', () => {
const shim = createWasiShim('');
const mem = makeFakeMemory();
shim.setMemory(mem);
const view = new DataView(mem.buffer);
const data = new TextEncoder().encode('{"valid":true}');
const bufPtr = 100;
new Uint8Array(mem.buffer).set(data, bufPtr);
writeIovecs(view, [{ buf: bufPtr, buf_len: data.length }], 0);
const fd_write = (shim.imports.wasi_snapshot_preview1 as Record<string, Function>).fd_write;
const result = fd_write(1, 0, 1, 50);
expect(result).toBe(0);
expect(view.getUint32(50, true)).toBe(data.length);
expect(shim.getStdout()).toBe('{"valid":true}');
});
it('寫入 stderrfd=2)並可透過 getStderr 讀取', () => {
const shim = createWasiShim('');
const mem = makeFakeMemory();
shim.setMemory(mem);
const view = new DataView(mem.buffer);
const data = new TextEncoder().encode('error message');
const bufPtr = 100;
new Uint8Array(mem.buffer).set(data, bufPtr);
writeIovecs(view, [{ buf: bufPtr, buf_len: data.length }], 0);
const fd_write = (shim.imports.wasi_snapshot_preview1 as Record<string, Function>).fd_write;
fd_write(2, 0, 1, 50);
expect(shim.getStderr()).toBe('error message');
expect(shim.getStdout()).toBe(''); // stdout 不受影響
});
it('多次寫入 stdout 會合併', () => {
const shim = createWasiShim('');
const mem = makeFakeMemory();
shim.setMemory(mem);
const view = new DataView(mem.buffer);
const fd_write = (shim.imports.wasi_snapshot_preview1 as Record<string, Function>).fd_write;
const write = (text: string, bufPtr: number) => {
const data = new TextEncoder().encode(text);
new Uint8Array(mem.buffer).set(data, bufPtr);
writeIovecs(view, [{ buf: bufPtr, buf_len: data.length }], 0);
fd_write(1, 0, 1, 50);
};
write('{"valid":', 100);
write('true}', 200);
expect(shim.getStdout()).toBe('{"valid":true}');
});
it('非 stdout/stderr fd 回傳 ENOSYS', () => {
const shim = createWasiShim('');
const mem = makeFakeMemory();
shim.setMemory(mem);
const view = new DataView(mem.buffer);
writeIovecs(view, [{ buf: 100, buf_len: 5 }], 0);
const fd_write = (shim.imports.wasi_snapshot_preview1 as Record<string, Function>).fd_write;
expect(fd_write(0, 0, 1, 50)).toBe(76); // stdin 不能寫
expect(fd_write(3, 0, 1, 50)).toBe(76); // 其他 fd
});
});
describe('proc_exit', () => {
it('proc_exit(0) 拋出 Error(正常結束)', () => {
const shim = createWasiShim('');
const proc_exit = (shim.imports.wasi_snapshot_preview1 as Record<string, Function>).proc_exit;
expect(() => proc_exit(0)).toThrow('wasm exit: 0');
});
it('proc_exit(1) 拋出 Error(錯誤結束)', () => {
const shim = createWasiShim('');
const proc_exit = (shim.imports.wasi_snapshot_preview1 as Record<string, Function>).proc_exit;
expect(() => proc_exit(1)).toThrow('wasm exit: 1');
});
});
describe('其餘 syscall 回傳 ENOSYS', () => {
it('fd_seek 回傳 ENOSYS', () => {
const shim = createWasiShim('');
const fd_seek = (shim.imports.wasi_snapshot_preview1 as Record<string, Function>).fd_seek;
expect(fd_seek(1, 0, 0, 0)).toBe(76);
});
it('sock_connect 相關 syscall 回傳 ENOSYS', () => {
const shim = createWasiShim('');
const imports = shim.imports.wasi_snapshot_preview1 as Record<string, Function>;
expect(imports.sock_recv()).toBe(76);
expect(imports.sock_send()).toBe(76);
expect(imports.sock_shutdown()).toBe(76);
});
it('path_open 回傳 ENOSYS', () => {
const shim = createWasiShim('');
const imports = shim.imports.wasi_snapshot_preview1 as Record<string, Function>;
expect(imports.path_open()).toBe(76);
expect(imports.path_create_directory()).toBe(76);
});
});
describe('setMemory 未呼叫時', () => {
it('fd_write 在 memory 未設定時拋出錯誤', () => {
const shim = createWasiShim('');
// 不呼叫 setMemory
const fd_write = (shim.imports.wasi_snapshot_preview1 as Record<string, Function>).fd_write;
expect(() => fd_write(1, 0, 1, 50)).toThrow('WASI memory not set');
});
});
});
+16
View File
@@ -0,0 +1,16 @@
{
"compilerOptions": {
"target": "ESNext",
"module": "ESNext",
"moduleResolution": "Bundler",
"lib": ["ESNext"],
"types": ["@cloudflare/workers-types/2023-07-01", "@cloudflare/vitest-pool-workers"],
"strict": true,
"skipLibCheck": true,
"noEmit": true,
"isolatedModules": true,
"resolveJsonModule": true,
"esModuleInterop": true
},
"include": ["src/**/*.ts", "tests/**/*.ts"]
}
+11
View File
@@ -0,0 +1,11 @@
import { defineWorkersConfig } from '@cloudflare/vitest-pool-workers/config';
export default defineWorkersConfig({
test: {
poolOptions: {
workers: {
wrangler: { configPath: './wrangler.test.toml' },
},
},
},
});
+23
View File
@@ -0,0 +1,23 @@
name = "arcrun-cypher-executor"
main = "src/index.ts"
compatibility_date = "2025-02-19"
compatibility_flags = ["nodejs_compat"]
# 測試環境不啟用 Service BindingMiniflare 無法解析外部服務)
# R2 mockWASM 執行器測試用)
[[r2_buckets]]
binding = "WASM_BUCKET"
bucket_name = "arcrun-wasm"
# KV mockBUILD-006
[[kv_namespaces]]
binding = "EXEC_CONTEXT"
id = "test-exec-context"
[[kv_namespaces]]
binding = "WEBHOOKS"
id = "test-webhooks"
[vars]
ENVIRONMENT = "test"
+35
View File
@@ -0,0 +1,35 @@
name = "arcrun-cypher-executor"
main = "src/index.ts"
compatibility_date = "2025-02-19"
compatibility_flags = ["nodejs_compat"]
# KV Context Store:節點 output 透過 KV 傳遞,解決同名欄位衝突
# TTL 設為 1 小時,執行完後自動清除
[[kv_namespaces]]
binding = "EXEC_CONTEXT"
id = "" # 填入你的 KV Namespace ID
# Webhook Store:儲存 Workflow 定義,key = workflow name
[[kv_namespaces]]
binding = "WEBHOOKS"
id = "" # 填入你的 KV Namespace ID
# Credential StoreAES-GCM 加密存放用戶 API token
# Standard 模式:供 credential-injector 讀取加密 token
[[kv_namespaces]]
binding = "CREDENTIALS_KV"
id = "" # 填入你的 Credentials KV Namespace ID
# R2 BucketWASM 零件二進位(arcrun.dev 公眾零件庫,或自架時填入自己的 bucket)
[[r2_buckets]]
binding = "WASM_BUCKET"
bucket_name = "arcrun-wasm"
# Workers AI
[ai]
binding = "AI"
[vars]
ENVIRONMENT = "production"
# MULTI_TENANT = "true" # Standard 模式(預設);設 "false" 啟用 Self-hosted 單租戶模式
# ENCRYPTION_KEY 透過 wrangler secret 設定(hex-encoded 256-bit AES key