From 27f7448914fbe317b2fce6626940caab5854a6bb Mon Sep 17 00:00:00 2001 From: richblack Date: Fri, 26 Jun 2026 18:13:49 +0800 Subject: [PATCH] =?UTF-8?q?feat(ingest):=20POST=20/triplets/ingest=20?= =?UTF-8?q?=E5=AF=AB=E5=85=A5=E7=AB=AF=20+=20deprecate-then-append=20(T3.2?= =?UTF-8?q?-3.5)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 對應 issue #1 T3 B 段。 - templates: TRIPLET_SLOTS 加 status/superseded_by/source_uri/content_hash; ENTITY_SLOTS 加 gloss;recordToTriplet 映射新欄位(缺省 status=active 相容舊資料) - kbdb-client: ensureTemplate 改 slot-diff 補丁(既有 template 走 PATCH /templates/:id 補缺 slot,取代 early-return → 免遷移腳本);新增 updateRecord(PATCH /records/:id) - triplet-ingest action(88 行純函式):Zod strict 鏡射 ingest-candidate 契約 → idempotency(uri+hash 同→no-op)→ 先 append 後 deprecate(無「全無 active」空窗) - POST /triplets/ingest route:strict 驗證失敗 → 422(禁送 graph 領域欄位) - queryTriplets 預設 active-only(traverse/search/neighbors 皆經此), includeDeprecated opt-out 供 rollback/考古 - 6 測試案全綠(vitest 16 passed);mock-client 同步 slot-diff + updateRecord gates: zero SQL / zero migration / 無 D1·Vectorize·AI 綁定 / dry-run bundle 乾淨 Co-Authored-By: Claude Opus 4.8 (1M context) --- docs/3-specs/ingest-contract/tasks.md | 14 ++-- src/actions/triplet-crud.ts | 9 +++ src/actions/triplet-ingest.ts | 82 +++++++++++++++++++++ src/lib/kbdb-client.ts | 24 +++++- src/lib/templates.ts | 11 ++- src/routes/triplets.ts | 28 +++++++ src/types.ts | 6 ++ tests/mock-client.ts | 16 +++- tests/triplet-ingest.test.ts | 102 ++++++++++++++++++++++++++ 9 files changed, 279 insertions(+), 13 deletions(-) create mode 100644 src/actions/triplet-ingest.ts create mode 100644 tests/triplet-ingest.test.ts diff --git a/docs/3-specs/ingest-contract/tasks.md b/docs/3-specs/ingest-contract/tasks.md index 3432f4d..6f920dc 100644 --- a/docs/3-specs/ingest-contract/tasks.md +++ b/docs/3-specs/ingest-contract/tasks.md @@ -6,17 +6,17 @@ ## A. 契約 + template slot - [x] **3.1** 搬 `contracts/ingest-candidate.json` 進本 repo + `contracts/README.md` 標明候選≠已存(2026-06-26) -- [ ] **3.2** `ensureTemplate` 改 slot-diff 補丁(命中既有 → base `PATCH /templates/:id` 補缺 slot,不再 early-return);`TRIPLET_SLOTS` 加 `status` + `superseded_by` + `source_uri` + `content_hash` -- [ ] **3.2b** `ENTITY_SLOTS` 加 `gloss`(已核實現無) +- [x] **3.2** `ensureTemplate` 改 slot-diff 補丁(命中既有 → base `PATCH /templates/:id` 補缺 slot,不再 early-return);`TRIPLET_SLOTS` 加 `status`+`superseded_by`+`source_uri`+`content_hash`(2026-06-26,`kbdb-client.ts`+`templates.ts`) +- [x] **3.2b** `ENTITY_SLOTS` 加 `gloss`(已核實現無)(2026-06-26) - [ ] **3.2c** normalize 分層 fallback 接口:exact-only 先做;semantic 留接口(待 base embed,Arcrun #7) ## B. 寫入端 + 取代(核心) -- [ ] **3.3a** `KbdbClient.updateRecord(id, values)` → base `PATCH /records/:id`(已核實現無) -- [ ] **3.3b** `src/actions/triplet-ingest.ts`:驗證 envelope(422 擋禁送欄位)→ idempotency(uri+hash)→ deprecate-then-append(先 append 後翻舊批 status)。<100 行純函式 -- [ ] **3.3c** `POST /triplets/ingest` route(只驗證 + 呼叫 action) -- [ ] **3.4** 測試(mock,不打真網路):正常 envelope / 同 hash no-op / 新 hash deprecate / 污染 envelope(帶 bridge_score) 422 / rollback(翻回 status) -- [ ] **3.5** 查詢 active-only:traverse/search/neighbors 組圖前 filter `status==='active'`(缺省視為 active,相容舊資料) +- [x] **3.3a** `KbdbClient.updateRecord(id, values)` → base `PATCH /records/:id`(2026-06-26,mock 同步) +- [x] **3.3b** `src/actions/triplet-ingest.ts`:Zod strict 驗證 → idempotency(uri+hash)→ **先 append 後 deprecate**。88 行純函式(2026-06-26) +- [x] **3.3c** `POST /triplets/ingest` route(驗證失敗 → 422 hook,只驗證+呼叫 action)(2026-06-26) +- [x] **3.4** 測試 6 案全綠:正常 / 同 hash no-op / 新 hash deprecate / 污染(bridge_score+頂層 id) 422 / rollback(`vitest run` 16 passed)(2026-06-26) +- [x] **3.5** 查詢 active-only:`queryTriplets` 缺省 filter `status==='active'`(traverse/search/neighbors 皆經此;`includeDeprecated` opt-out 供 rollback/考古)(2026-06-26) ## C. MCP(⚠️ 跨 repo,需 arcrun 配合 → issue 標清) diff --git a/src/actions/triplet-crud.ts b/src/actions/triplet-crud.ts index 744cc16..9fa24f0 100644 --- a/src/actions/triplet-crud.ts +++ b/src/actions/triplet-crud.ts @@ -18,6 +18,8 @@ export type CreateTripletData = { bridge_score?: number; subject_entity_type?: string; object_entity_type?: string; + source_uri?: string; + content_hash?: string; }; /** 建立三元組 → POST /records(template=triplet)。 */ @@ -37,10 +39,13 @@ export async function createTriplet( confidence: String(data.confidence ?? 1.0), clusters_json: JSON.stringify(clusters), bridge_score: String(bridgeScore), + status: 'active', }; if (data.source_block_id) values.source_block_id = data.source_block_id; if (data.subject_entity_type) values.subject_entity_type = data.subject_entity_type; if (data.object_entity_type) values.object_entity_type = data.object_entity_type; + if (data.source_uri) values.source_uri = data.source_uri; + if (data.content_hash) values.content_hash = data.content_hash; const id = await client.createRecord(TPL_TRIPLET, values, data.owner_id); return { id, subject: data.subject, predicate: data.predicate, object: data.object }; @@ -54,6 +59,7 @@ export type TripletFilters = { offset?: number; owner_id?: string; entity_type?: string; + includeDeprecated?: boolean; // 預設只回 active;rollback/考古才開(T3.5) }; /** 查三元組 → 取 template 全部 record,插件層 filter(base 無複合 slot 查詢)。 */ @@ -64,6 +70,9 @@ export async function queryTriplets( const records = await client.listRecordsByTemplate(TPL_TRIPLET, filters.owner_id); let triplets = records.map(recordToTriplet); + // active-only:deprecated 不進圖遍歷/查詢(缺省 status 視為 active,相容舊資料)。 + if (!filters.includeDeprecated) triplets = triplets.filter((t) => t.status === 'active'); + if (filters.subject) triplets = triplets.filter((t) => t.subject === filters.subject); if (filters.predicate) triplets = triplets.filter((t) => t.predicate === filters.predicate); if (filters.object) triplets = triplets.filter((t) => t.object === filters.object); diff --git a/src/actions/triplet-ingest.ts b/src/actions/triplet-ingest.ts new file mode 100644 index 0000000..3835bbf --- /dev/null +++ b/src/actions/triplet-ingest.ts @@ -0,0 +1,82 @@ +// ingest 寫入端 — 收 ingest-candidate envelope,做 idempotency + deprecate-then-append。 +// 契約:contracts/ingest-candidate.json。鐵律:走 base API、零 SQL。 +// 取代策略:先 append 新批 active,後翻舊批 status=deprecated(中途失敗不留「全無 active」空窗)。 + +import { z } from '@hono/zod-openapi'; +import type { KbdbClient } from '../lib/kbdb-client'; +import { TPL_TRIPLET, ensurePluginTemplates, recordToTriplet } from '../lib/templates'; +import { createTriplet } from './triplet-crud'; + +// Zod 鏡射契約:strict() = additionalProperties:false → 禁送欄位 422(route 把 ZodError 轉 422)。 +const NodeSchema = z.object({ + name: z.string().min(1), + gloss: z.string().optional(), + entity_type: z.enum(['person', 'event', 'product', 'market', 'org']).optional(), +}).strict(); + +const EdgeSchema = z.object({ + subject: z.string().min(1), + predicate: z.string().min(1), + object: z.string().min(1), + confidence: z.number().min(0).max(1).optional(), +}).strict(); + +export const IngestEnvelopeSchema = z.object({ + source: z.object({ + uri: z.string().min(1), + content_hash: z.string().min(1), + anchor: z.string().optional(), + commit: z.string().optional(), + block_id: z.string().optional(), + }).strict(), + extractor: z.object({ + model: z.string().min(1), + tier: z.enum(['shallow', 'deep']), + extracted_at: z.number().int().optional(), + }).strict(), + nodes: z.array(NodeSchema).optional(), + triplets: z.array(EdgeSchema).min(1), +}).strict(); + +export type IngestEnvelope = z.infer; + +export type IngestResult = { skipped: boolean; ingested: number; deprecated: number }; + +/** 收 envelope → idempotency → 先 append 後 deprecate。回 {skipped,ingested,deprecated}。 */ +export async function ingestEnvelope( + client: KbdbClient, + env: IngestEnvelope, + owner_id?: string, +): Promise { + await ensurePluginTemplates(client); + + // 同 source_uri 的現存 active triplet(idempotency 分組 + 待 deprecate 對象)。 + const all = (await client.listRecordsByTemplate(TPL_TRIPLET, owner_id)).map(recordToTriplet); + const priorActive = all.filter((t) => t.source_uri === env.source.uri && t.status === 'active'); + + // 同 hash → no-op(envelope 已落地過)。 + if (priorActive.some((t) => t.content_hash === env.source.content_hash)) { + return { skipped: true, ingested: 0, deprecated: 0 }; + } + + // 1) 先 append 新批 active。 + for (const e of env.triplets) { + await createTriplet(client, { + subject: e.subject, + predicate: e.predicate, + object: e.object, + confidence: e.confidence, + source_block_id: env.source.block_id, + source_uri: env.source.uri, + content_hash: env.source.content_hash, + owner_id, + }); + } + + // 2) 後翻舊批 status=deprecated(指向本批 source_uri;append 在前 → 無空窗)。 + for (const old of priorActive) { + await client.updateRecord(old.id, { status: 'deprecated', superseded_by: env.source.content_hash }); + } + + return { skipped: false, ingested: env.triplets.length, deprecated: priorActive.length }; +} diff --git a/src/lib/kbdb-client.ts b/src/lib/kbdb-client.ts index 4e4dd2f..b61e682 100644 --- a/src/lib/kbdb-client.ts +++ b/src/lib/kbdb-client.ts @@ -113,12 +113,25 @@ export class KbdbClient { // --- templates(= 替代建表;插件要新類型只能建 template) --- async ensureTemplate(name: string, slots: string[], description?: string): Promise { - const existing = await this.req<{ id?: string } | { error: string }>( + const existing = await this.req<{ id?: string; slots?: string[] } | { error: string }>( 'GET', `/templates/${encodeURIComponent(name)}`, ).catch(() => null); - if (existing && (existing as any).id) return; - await this.req('POST', '/templates', { name, slots, description, created_by: 'kbdb-graph' }); + + // 全新 template → 建。 + if (!existing || !(existing as any).id) { + await this.req('POST', '/templates', { name, slots, description, created_by: 'kbdb-graph' }); + return; + } + + // 既有 template → 補缺 slot(不 early-return;否則 seed 後新增的 slot 永遠進不來)。 + // 走 base PATCH /templates/:id 增 slot;既有環境免另跑遷移腳本即收斂。 + const have = new Set((existing as any).slots ?? []); + const missing = slots.filter((s) => !have.has(s)); + if (missing.length === 0) return; + await this.req('PATCH', `/templates/${encodeURIComponent((existing as any).id)}`, { + slots: [...have, ...missing], + }); } // --- records(= template 實例,填 slot) --- @@ -141,6 +154,11 @@ export class KbdbClient { } } + /** 翻 record 的 slot 值(base PATCH /records/:id)。deprecate(翻 status)與 rollback 都靠它。 */ + async updateRecord(recordId: string, values: Record): Promise { + await this.req('PATCH', `/records/${encodeURIComponent(recordId)}`, { values }); + } + async listRecordsByTemplate(template: string, owner_id?: string): Promise { const { records } = await this.req<{ records: BaseRecord[] }>( 'GET', diff --git a/src/lib/templates.ts b/src/lib/templates.ts index 42b5b2d..edec1a0 100644 --- a/src/lib/templates.ts +++ b/src/lib/templates.ts @@ -14,8 +14,12 @@ export const TRIPLET_SLOTS = [ 'subject', 'predicate', 'object', 'source_block_id', 'confidence', 'clusters_json', 'bridge_score', 'subject_entity_type', 'object_entity_type', + // 取代/快照(T3.2):status=active|deprecated;superseded_by=取代它的新 record id; + // source_uri+content_hash 承載 ingest idempotency(按 source_uri 分組 deprecate)。 + 'status', 'superseded_by', 'source_uri', 'content_hash', ]; -export const ENTITY_SLOTS = ['canonical', 'aliases_json', 'entity_type', 'owner']; +// gloss(T3.2b):一句話描述,供「詞+gloss」語義 normalize 的 embedding 對象。 +export const ENTITY_SLOTS = ['canonical', 'aliases_json', 'entity_type', 'owner', 'gloss']; export const ENTITY_PENDING_SLOTS = [ 'raw_name', 'candidate_entity_id', 'candidate_canonical', 'similarity', ]; @@ -41,6 +45,11 @@ export function recordToTriplet(rec: BaseRecord): Triplet { bridge_score: parseInt(v.bridge_score ?? '0', 10), subject_entity_type: (v.subject_entity_type as Triplet['subject_entity_type']) || null, object_entity_type: (v.object_entity_type as Triplet['object_entity_type']) || null, + // 缺省視為 active(相容尚無 status slot 的舊資料)。 + status: v.status === 'deprecated' ? 'deprecated' : 'active', + superseded_by: v.superseded_by || null, + source_uri: v.source_uri || null, + content_hash: v.content_hash || null, created_at: 0, updated_at: 0, }; diff --git a/src/routes/triplets.ts b/src/routes/triplets.ts index 1bf5c2c..ecc35f6 100644 --- a/src/routes/triplets.ts +++ b/src/routes/triplets.ts @@ -2,6 +2,7 @@ import { createRoute, OpenAPIHono, z } from '@hono/zod-openapi'; import type { Bindings, Variables } from '../types'; import { createTriplet, queryTriplets } from '../actions/triplet-crud'; import { getTripletStats } from '../actions/triplet-stats'; +import { ingestEnvelope, IngestEnvelopeSchema } from '../actions/triplet-ingest'; import { makeKbdbClient } from '../lib/kbdb-client'; const tripletRoutes = new OpenAPIHono<{ Bindings: Bindings; Variables: Variables }>(); @@ -97,4 +98,31 @@ tripletRoutes.openapi(createRouteDefinition, async (c) => { return c.json({ ok: true }, 201); }); +// POST /triplets/ingest — 收 ingest-candidate envelope(idempotency + deprecate-then-append) +const ingestRoute = createRoute({ + method: 'post', + path: '/ingest', + request: { + body: { content: { 'application/json': { schema: IngestEnvelopeSchema } } }, + }, + responses: { + 200: { description: 'Envelope ingested (or skipped if same content_hash)' }, + 422: { description: 'Invalid envelope (forbidden field or shape mismatch)' }, + }, + tags: ['Triplets'], +}); + +tripletRoutes.openapi( + ingestRoute, + async (c) => { + const env = c.req.valid('json'); + const result = await ingestEnvelope(makeKbdbClient(c.env), env); + return c.json(result, 200); + }, + // strict() 驗證失敗(如送禁止欄位 bridge_score)→ 422,不是預設 400。 + (zres, c) => { + if (!zres.success) return c.json({ error: 'invalid envelope', issues: zres.error.issues }, 422); + }, +); + export { tripletRoutes }; diff --git a/src/types.ts b/src/types.ts index 3755e94..75634bd 100644 --- a/src/types.ts +++ b/src/types.ts @@ -17,6 +17,8 @@ export type Variables = { export type EntityType = 'person' | 'event' | 'product' | 'market' | 'org'; +export type TripletStatus = 'active' | 'deprecated'; + export type Triplet = { id: string; subject: string; @@ -28,6 +30,10 @@ export type Triplet = { bridge_score: number; // 跨越的 cluster 數量,Scout 發現指標 subject_entity_type: EntityType | null; // 主體 entity 類型(人格疊加局勢分析用) object_entity_type: EntityType | null; // 客體 entity 類型 + status: TripletStatus; // active(進圖遍歷)| deprecated(被取代,可查/可 rollback) + superseded_by: string | null; // 取代它的新 record id(active 時為 null) + source_uri: string | null; // ingest 來源穩定識別(idempotency 分組鍵) + content_hash: string | null; // 來源快照 hash(idempotency 比對鍵) created_at: number; updated_at: number; }; diff --git a/tests/mock-client.ts b/tests/mock-client.ts index f213954..c26d290 100644 --- a/tests/mock-client.ts +++ b/tests/mock-client.ts @@ -50,12 +50,19 @@ export class MockKbdbClient { } async ensureTemplate(name: string, slots: string[]): Promise { - if (!this.templates.has(name)) this.templates.set(name, slots); + // 對齊真 client 的 slot-diff 行為:既有 template 補缺 slot(不 early-return)。 + const have = this.templates.get(name); + if (!have) { + this.templates.set(name, [...slots]); + return; + } + const set = new Set(have); + for (const s of slots) if (!set.has(s)) have.push(s); } async createRecord(template: string, values: Record, owner_id?: string): Promise { const id = this.id('rec'); - this.records.set(id, { template, values, owner_id }); + this.records.set(id, { template, values: { ...values }, owner_id }); return id; } @@ -65,6 +72,11 @@ export class MockKbdbClient { return { record_id: recordId, template: r.template, values: r.values }; } + async updateRecord(recordId: string, values: Record): Promise { + const r = this.records.get(recordId); + if (r) Object.assign(r.values, values); + } + async listRecordsByTemplate(template: string, owner_id?: string): Promise { return [...this.records.entries()] .filter(([, r]) => r.template === template && (!owner_id || r.owner_id === owner_id)) diff --git a/tests/triplet-ingest.test.ts b/tests/triplet-ingest.test.ts new file mode 100644 index 0000000..33e0b79 --- /dev/null +++ b/tests/triplet-ingest.test.ts @@ -0,0 +1,102 @@ +// ingest 寫入端 — 走 mock KbdbClient(API-as-Wall),零 SQL、不打網路。 +// 覆蓋 T3.4 五案:正常 envelope / 同 hash no-op / 新 hash deprecate / 污染 envelope 422 / rollback。 +import { describe, it, expect } from 'vitest'; +import { ingestEnvelope, IngestEnvelopeSchema, type IngestEnvelope } from '../src/actions/triplet-ingest'; +import { queryTriplets } from '../src/actions/triplet-crud'; +import { mockClient } from './mock-client'; + +function envelope(hash: string, triplets: IngestEnvelope['triplets']): IngestEnvelope { + return { + source: { uri: 'github:uncle6me-web/wiki@a.md', content_hash: hash }, + extractor: { model: 'claude-sonnet-4-6', tier: 'deep' }, + triplets, + }; +} + +describe('ingestEnvelope — 正常 envelope', () => { + it('append 全部 triplet 為 active,記 source_uri/content_hash', async () => { + const c = mockClient(); + const res = await ingestEnvelope(c, envelope('h1', [ + { subject: 'A', predicate: 'rel', object: 'B' }, + { subject: 'B', predicate: 'rel', object: 'C' }, + ])); + expect(res).toEqual({ skipped: false, ingested: 2, deprecated: 0 }); + + const { triplets } = await queryTriplets(c, {}); + expect(triplets.length).toBe(2); + expect(triplets.every((t) => t.status === 'active')).toBe(true); + expect(triplets[0].source_uri).toBe('github:uncle6me-web/wiki@a.md'); + expect(triplets[0].content_hash).toBe('h1'); + }); +}); + +describe('ingestEnvelope — 同 hash no-op', () => { + it('同 uri+hash 再送 → skipped,不新增', async () => { + const c = mockClient(); + await ingestEnvelope(c, envelope('h1', [{ subject: 'A', predicate: 'r', object: 'B' }])); + const res = await ingestEnvelope(c, envelope('h1', [{ subject: 'A', predicate: 'r', object: 'B' }])); + expect(res.skipped).toBe(true); + + const { triplets } = await queryTriplets(c, {}); + expect(triplets.length).toBe(1); // 沒有重複 append + }); +}); + +describe('ingestEnvelope — 新 hash deprecate-then-append', () => { + it('同 uri 新 hash → 舊批轉 deprecated、新批 active;查詢 active-only', async () => { + const c = mockClient(); + await ingestEnvelope(c, envelope('h1', [{ subject: 'A', predicate: 'r', object: 'old' }])); + const res = await ingestEnvelope(c, envelope('h2', [{ subject: 'A', predicate: 'r', object: 'new' }])); + expect(res).toEqual({ skipped: false, ingested: 1, deprecated: 1 }); + + // active-only 查詢只見新批。 + const active = await queryTriplets(c, {}); + expect(active.triplets.length).toBe(1); + expect(active.triplets[0].object).toBe('new'); + + // 舊批仍在(deprecated),可考古/rollback。 + const all = await queryTriplets(c, { includeDeprecated: true }); + expect(all.triplets.length).toBe(2); + const deprecated = all.triplets.find((t) => t.status === 'deprecated'); + expect(deprecated?.object).toBe('old'); + expect(deprecated?.superseded_by).toBe('h2'); + }); +}); + +describe('ingestEnvelope — 污染 envelope 422(契約 strict)', () => { + it('triplet 帶 graph 領域欄位 bridge_score → schema 拒收', () => { + const polluted = { + source: { uri: 'u', content_hash: 'h' }, + extractor: { model: 'm', tier: 'deep' }, + triplets: [{ subject: 'A', predicate: 'r', object: 'B', bridge_score: 3 }], + }; + const parsed = IngestEnvelopeSchema.safeParse(polluted); + expect(parsed.success).toBe(false); + }); + + it('envelope 頂層帶禁止欄位 id → 拒收', () => { + const polluted = { + id: 'should-not-send', + source: { uri: 'u', content_hash: 'h' }, + extractor: { model: 'm', tier: 'deep' }, + triplets: [{ subject: 'A', predicate: 'r', object: 'B' }], + }; + expect(IngestEnvelopeSchema.safeParse(polluted).success).toBe(false); + }); +}); + +describe('ingestEnvelope — rollback(翻回 status)', () => { + it('把 deprecated 翻回 active 後,active 查詢重新見到它', async () => { + const c = mockClient(); + await ingestEnvelope(c, envelope('h1', [{ subject: 'A', predicate: 'r', object: 'old' }])); + await ingestEnvelope(c, envelope('h2', [{ subject: 'A', predicate: 'r', object: 'new' }])); + + // 取出被 deprecate 的舊批 id,手動 rollback(翻回 active、清 superseded_by)。 + const all = await queryTriplets(c, { includeDeprecated: true }); + const old = all.triplets.find((t) => t.status === 'deprecated')!; + await c.updateRecord(old.id, { status: 'active', superseded_by: '' }); + + const active = await queryTriplets(c, {}); + expect(active.triplets.map((t) => t.object).sort()).toEqual(['new', 'old']); + }); +});