feat: component execution via Worker fetch + API recipes

- Logic components (15): each deployed as Worker at {name}.arcrun.dev,
  cypher-executor fetches them via HTTP POST
- API components (6): gmail, telegram, line_notify, google_sheets,
  http_request, cron executed inline via fetch recipes in component-loader
- External URL support: any https:// componentId is fetched directly
  (n8n webhooks, MCP endpoints, etc.)
- Add deploy-logic-components.sh script for building/deploying WASM Workers
- Add component-worker-template with inline WASI shim

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-16 16:59:12 +08:00
parent 5534d60b60
commit 2b89ea8825
5 changed files with 451 additions and 59 deletions
+177 -59
View File
@@ -1,75 +1,193 @@
/**
* arcrun component loader
*
* 三種執行模式:
*
* 1. 邏輯零件(category=logic
* → fetch POST https://{name-with-dashes}.arcrun.dev
* 每個邏輯零件是獨立 CF Worker,有 WASM 靜態 bundle
*
* 2. API recipe 零件(category=api
* → 從 CREDENTIALS_KV 讀取 recipefetch 外部 API
* 不需要獨立 Worker,整個執行在 cypher-executor 裡完成
*
* 3. 外部 URL 零件(componentId 以 http:// 或 https:// 開頭)
* → 直接 fetch,可以是 n8n webhook、MCP endpoint 等任何 HTTP 服務
*
* 4. 內建零件(BUILTIN_COMPONENTS
* → 純 JS 函數,不需要網路呼叫
*/
import { BUILTIN_COMPONENTS } from './constants';
import { createWasiShim } from './wasi-shim';
import type { Bindings, ComponentRunner } from '../types';
// Worker 記憶體快取:componentId → WebAssembly.Module(跨請求共享,避免重複編譯)
const moduleCache = new Map<string, WebAssembly.Module>();
/** 邏輯零件 canonical_id → Worker URL */
const LOGIC_COMPONENT_URLS: Record<string, string> = {
if_control: 'https://if-control.arcrun.dev',
switch: 'https://switch.arcrun.dev',
foreach_control: 'https://foreach-control.arcrun.dev',
filter: 'https://filter.arcrun.dev',
merge: 'https://merge.arcrun.dev',
try_catch: 'https://try-catch.arcrun.dev',
wait: 'https://wait.arcrun.dev',
set: 'https://set.arcrun.dev',
array_ops: 'https://array-ops.arcrun.dev',
string_ops: 'https://string-ops.arcrun.dev',
number_ops: 'https://number-ops.arcrun.dev',
date_ops: 'https://date-ops.arcrun.dev',
validate_json: 'https://validate-json.arcrun.dev',
ai_transform_compile:'https://ai-transform-compile.arcrun.dev',
ai_transform_run: 'https://ai-transform-run.arcrun.dev',
};
/** API 零件 canonical_id → recipeendpoint + 組裝邏輯)*/
const API_RECIPES: Record<string, (ctx: Record<string, unknown>) => Promise<unknown>> = {
http_request: async (ctx) => {
const url = ctx.url as string;
const method = (ctx.method as string ?? 'GET').toUpperCase();
const headers = (ctx.headers as Record<string, string>) ?? {};
const body = ctx.body !== undefined ? JSON.stringify(ctx.body) : undefined;
if (!url) return { success: false, error: 'url 必填' };
const res = await fetch(url, { method, headers, body });
const text = await res.text();
let data: unknown = text;
try { data = JSON.parse(text); } catch { /* keep as text */ }
return { success: res.ok, status: res.status, data };
},
gmail: async (ctx) => {
const { to, subject, body, access_token } = ctx as Record<string, string>;
if (!access_token) return { success: false, error: 'access_token 必填(由 credentials 注入)' };
if (!to || !subject || !body) return { success: false, error: 'to, subject, body 必填' };
// Build RFC 2822 message + base64url encode
const message = `To: ${to}\r\nSubject: ${subject}\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n${body}`;
const encoded = btoa(unescape(encodeURIComponent(message)))
.replace(/\+/g, '-').replace(/\//g, '_').replace(/=+$/, '');
const res = await fetch('https://gmail.googleapis.com/gmail/v1/users/me/messages/send', {
method: 'POST',
headers: { 'Authorization': `Bearer ${access_token}`, 'Content-Type': 'application/json' },
body: JSON.stringify({ raw: encoded }),
});
const data = await res.json();
return { success: res.ok, data };
},
telegram: async (ctx) => {
const { bot_token, chat_id, text } = ctx as Record<string, string>;
if (!bot_token) return { success: false, error: 'bot_token 必填(由 credentials 注入)' };
const res = await fetch(`https://api.telegram.org/bot${bot_token}/sendMessage`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ chat_id, text }),
});
const data = await res.json();
return { success: res.ok, data };
},
line_notify: async (ctx) => {
const { token, message } = ctx as Record<string, string>;
if (!token) return { success: false, error: 'token 必填(由 credentials 注入)' };
const form = new URLSearchParams({ message });
const res = await fetch('https://notify-api.line.me/api/notify', {
method: 'POST',
headers: { 'Authorization': `Bearer ${token}`, 'Content-Type': 'application/x-www-form-urlencoded' },
body: form.toString(),
});
const data = await res.json();
return { success: res.ok, data };
},
google_sheets: async (ctx) => {
const { access_token, spreadsheet_id, range, values, operation } = ctx as Record<string, unknown>;
if (!access_token) return { success: false, error: 'access_token 必填(由 credentials 注入)' };
const headers = { 'Authorization': `Bearer ${access_token}`, 'Content-Type': 'application/json' };
const op = (operation as string) ?? 'read';
if (op === 'read') {
const res = await fetch(
`https://sheets.googleapis.com/v4/spreadsheets/${spreadsheet_id}/values/${range}`,
{ headers }
);
const data = await res.json();
return { success: res.ok, data };
} else {
const res = await fetch(
`https://sheets.googleapis.com/v4/spreadsheets/${spreadsheet_id}/values/${range}:append?valueInputOption=USER_ENTERED`,
{ method: 'POST', headers, body: JSON.stringify({ values }) }
);
const data = await res.json();
return { success: res.ok, data };
}
},
cron: async (ctx) => {
// cron 是觸發源,在 workflow 執行時已被觸發,直接 passthrough
return { success: true, data: ctx };
},
ai_transform_compile: async (ctx) => {
// fallback — 通常由 logic Worker 處理,這裡是保險
return { success: true, data: ctx };
},
ai_transform_run: async (ctx) => {
return { success: true, data: ctx };
},
};
/**
* 建立零件載入器
*
* 三層優先序:
* 1. 內建零件(BUILTIN_COMPONENTS,純本地轉換,不需 R2
* 2. WASM_BUCKET R2 直讀 → {componentId}/{componentId}.wasm
* 3. 找不到 → 結構化錯誤(含 R2 key 與修復說明)
*/
export function createComponentLoader(env: Bindings) {
return async (componentId: string): Promise<ComponentRunner> => {
// 層 1:內建零件(無需 R2
// 1. 內建零件(純 JS,最優先)
const builtin = BUILTIN_COMPONENTS.get(componentId);
if (builtin) return builtin;
// 層 2:從 WASM_BUCKET R2 讀取(快取 Module 避免重複編譯
const wasmKey = `${componentId}/${componentId}.wasm`;
let wasmModule = moduleCache.get(componentId);
if (!wasmModule) {
const wasmObj = await env.WASM_BUCKET.get(wasmKey);
if (!wasmObj) {
throw new Error(
`零件 ${componentId} 不存在。\n` +
`請確認 ${wasmKey} 已上傳至 WASM_BUCKET。\n` +
`修復:執行 acr parts 查看可用零件清單。`
);
}
const buffer = await wasmObj.arrayBuffer();
wasmModule = await WebAssembly.compile(buffer);
moduleCache.set(componentId, wasmModule);
// 2. 外部 URLcomponentId 直接是 http/https URL
if (componentId.startsWith('http://') || componentId.startsWith('https://')) {
return async (ctx: unknown) => {
const res = await fetch(componentId, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(ctx),
});
if (!res.ok) {
const text = await res.text();
return { success: false, status: res.status, error: text };
}
try { return await res.json(); }
catch { return { success: true, data: await res.text() }; }
};
}
const compiledModule = wasmModule;
return async (ctx: unknown): Promise<unknown> => {
const stdinJson = JSON.stringify(ctx);
const shim = createWasiShim(stdinJson);
const instance = await WebAssembly.instantiate(compiledModule, shim.imports);
const memory = instance.exports.memory as WebAssembly.Memory | undefined;
if (memory) shim.setMemory(memory);
const exports = instance.exports as Record<string, unknown>;
const entryFn = (exports._start ?? exports.main) as (() => void) | undefined;
if (typeof entryFn !== 'function') {
throw new Error(`WASM 零件缺少 _start 或 main export${componentId}`);
}
try {
entryFn();
} catch (e) {
// proc_exit(0) 拋出 "wasm exit: 0",視為正常結束
if (!(e instanceof Error && e.message === 'wasm exit: 0')) {
throw e;
// 3. 邏輯零件 Worker
const logicUrl = LOGIC_COMPONENT_URLS[componentId];
if (logicUrl) {
return async (ctx: unknown) => {
const res = await fetch(logicUrl, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(ctx),
});
if (!res.ok) {
const text = await res.text();
return { success: false, error: `${componentId} Worker 回傳 ${res.status}: ${text.slice(0, 200)}` };
}
}
try { return await res.json(); }
catch { return { success: false, error: `${componentId} Worker 回傳非 JSON` }; }
};
}
const stdout = shim.getStdout().trim();
if (!stdout) throw new Error(`WASM 零件沒有輸出(stdout 為空):${componentId}`);
// 4. API recipe 零件
const recipe = API_RECIPES[componentId];
if (recipe) {
return async (ctx: unknown) => recipe(ctx as Record<string, unknown>);
}
try {
return JSON.parse(stdout);
} catch {
throw new Error(`WASM 零件輸出不是合法 JSON${stdout.slice(0, 200)}`);
}
};
// 5. 找不到
throw new Error(
`找不到零件 "${componentId}"。\n` +
`可用邏輯零件:${Object.keys(LOGIC_COMPONENT_URLS).join(', ')}\n` +
`可用 API 零件:${Object.keys(API_RECIPES).join(', ')}\n` +
`也可傳入外部 URLhttps://...)作為零件。`
);
};
}