7b18387113
- /cypher/execute now accepts separate `config` field:
{node_name: {component: "cmp_19e62efd", ...staticParams}}
- graph-builder reads config[node].component to override componentId
(supports cmp_ hash, rec_ hash, or canonical_id)
- config[node] other fields become node.data (static params merged at runtime)
- acr run now sends workflow.config as separate `config` (not flattened into context)
- context is now only --input dynamic params
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
114 lines
4.1 KiB
TypeScript
114 lines
4.1 KiB
TypeScript
import type { Bindings, ExecutionGraph } from '../types';
|
||
import { ExecutionError } from '../types';
|
||
import { GraphExecutor } from '../graph-executor';
|
||
import { graphSchema } from '../lib/schemas';
|
||
import { createComponentLoader } from '../lib/component-loader';
|
||
import { writeEvaluation, updateComponentStats } from './execution-evaluator';
|
||
import { parseTriplets } from './triplet-parser';
|
||
import { searchNodes } from './search-nodes';
|
||
import { buildExecutionGraph } from './graph-builder';
|
||
|
||
export async function handleCypherSearch(
|
||
triplets: unknown[],
|
||
env: Bindings,
|
||
): Promise<{ nodes: Record<string, unknown>; cypher: unknown; missing: string[] }> {
|
||
const parsed = parseTriplets(triplets);
|
||
if (!parsed) {
|
||
throw new Error('無法解析任何節點');
|
||
}
|
||
|
||
const { nodeResults, missingNodes } = await searchNodes(parsed, env.WASM_BUCKET);
|
||
|
||
if (missingNodes.length > 0) {
|
||
return { nodes: nodeResults, cypher: null, missing: missingNodes };
|
||
}
|
||
|
||
const graph = buildExecutionGraph(parsed, nodeResults, 'cypher-search-result', 'Cypher Search Result');
|
||
return { nodes: nodeResults, cypher: { nodes: graph.nodes, edges: graph.edges }, missing: [] };
|
||
}
|
||
|
||
export async function handleCypherExecute(
|
||
triplets: unknown[],
|
||
context: Record<string, unknown> | undefined,
|
||
graphId: string,
|
||
graphName: string,
|
||
config: Record<string, Record<string, unknown>> | undefined,
|
||
env: Bindings,
|
||
waitUntil: (promise: Promise<void>) => void,
|
||
): Promise<{ success: boolean; data?: unknown; error?: string; trace?: unknown; duration_ms: number; graph?: ExecutionGraph }> {
|
||
const parsed = parseTriplets(triplets as unknown[]);
|
||
if (!parsed) {
|
||
throw new Error('無法解析任何節點');
|
||
}
|
||
|
||
const { nodeResults, missingNodes } = await searchNodes(parsed, env.WASM_BUCKET);
|
||
|
||
if (missingNodes.length > 0) {
|
||
throw new Error(
|
||
`以下零件不存在於 WASM_BUCKET:${missingNodes.join(', ')}\n` +
|
||
`修復:執行 acr parts 查看可用零件清單,或執行 acr validate <workflow.yaml> 進行完整驗證。`
|
||
);
|
||
}
|
||
|
||
const graph = buildExecutionGraph(parsed, nodeResults, graphId, graphName, config);
|
||
const parseResult = graphSchema.safeParse(graph);
|
||
if (!parseResult.success) {
|
||
throw new Error('圖定義產生失敗');
|
||
}
|
||
|
||
const loader = createComponentLoader(env);
|
||
const executor = new GraphExecutor(loader, undefined, env);
|
||
const start = Date.now();
|
||
|
||
try {
|
||
const result = await executor.execute(parseResult.data as ExecutionGraph, context ?? {}, env.EXEC_CONTEXT);
|
||
const duration_ms = Date.now() - start;
|
||
|
||
// 非同步記錄統計(Phase 7 補充 analytics,目前為 no-op)
|
||
const componentId = graph.nodes.find(n => n.componentId)?.componentId ?? graphId;
|
||
const runId = `${graphId}-${Date.now()}`;
|
||
waitUntil(writeEvaluation(env, {
|
||
run_id: runId,
|
||
workflow_id: graphId,
|
||
component_id: componentId,
|
||
verdict: 'success',
|
||
duration_ms,
|
||
evaluated_at: Date.now(),
|
||
}));
|
||
waitUntil(updateComponentStats(env, componentId, 'success', duration_ms));
|
||
|
||
return { success: true, data: result.data, trace: result.trace, duration_ms, graph };
|
||
} catch (err) {
|
||
const duration_ms = Date.now() - start;
|
||
const errMsg = err instanceof Error ? err.message : String(err);
|
||
const componentId = graph.nodes.find(n => n.componentId)?.componentId ?? graphId;
|
||
const runId = `${graphId}-${Date.now()}`;
|
||
waitUntil(writeEvaluation(env, {
|
||
run_id: runId,
|
||
workflow_id: graphId,
|
||
component_id: componentId,
|
||
verdict: 'failed',
|
||
duration_ms,
|
||
error_message: errMsg.slice(0, 200),
|
||
evaluated_at: Date.now(),
|
||
}));
|
||
waitUntil(updateComponentStats(env, componentId, 'failed', duration_ms));
|
||
if (err instanceof ExecutionError) {
|
||
const traceFormatted = err.trace.map(s => ({
|
||
node: s.nodeId,
|
||
status: s.error ? 'failed' : 'success',
|
||
...(s.error ? { error: s.error } : {}),
|
||
}));
|
||
throw new Error(JSON.stringify({
|
||
success: false,
|
||
error: errMsg,
|
||
failed_node: err.failed_node,
|
||
failed_input: err.failed_input,
|
||
trace: traceFormatted,
|
||
duration_ms,
|
||
}));
|
||
}
|
||
throw err;
|
||
}
|
||
}
|