From 3e0c85612df78f7dcb4c23fb3893e103ccd71c8f Mon Sep 17 00:00:00 2001 From: Githubiswhat <43241621+Githubiswhat@users.noreply.github.com> Date: Fri, 17 Oct 2025 15:27:18 +0800 Subject: [PATCH] feat(vectorDB): add support for Vastbase vector database --- packages/global/core/dataset/api.d.ts | 5 + packages/service/common/vectorDB/constants.ts | 1 + .../service/common/vectorDB/controller.ts | 9 +- packages/service/common/vectorDB/type.d.ts | 1 + .../common/vectorDB/vastbase/controller.ts | 223 ++++++++++++++++ .../service/common/vectorDB/vastbase/index.ts | 237 ++++++++++++++++++ packages/service/type/env.d.ts | 1 + projects/app/.env.template | 4 + 8 files changed, 480 insertions(+), 1 deletion(-) create mode 100644 packages/service/common/vectorDB/vastbase/controller.ts create mode 100644 packages/service/common/vectorDB/vastbase/index.ts diff --git a/packages/global/core/dataset/api.d.ts b/packages/global/core/dataset/api.d.ts index 1a3935127018..0c2f3c534aff 100644 --- a/packages/global/core/dataset/api.d.ts +++ b/packages/global/core/dataset/api.d.ts @@ -135,6 +135,11 @@ export type PgSearchRawType = { collection_id: string; score: number; }; +export type VastbaseSearchRawType = { + id: string; + collection_id: string; + score: number; +}; export type PushDatasetDataChunkProps = { q?: string; a?: string; diff --git a/packages/service/common/vectorDB/constants.ts b/packages/service/common/vectorDB/constants.ts index 8476cf6c1f66..773e63f7422c 100644 --- a/packages/service/common/vectorDB/constants.ts +++ b/packages/service/common/vectorDB/constants.ts @@ -3,5 +3,6 @@ export const DatasetVectorTableName = 'modeldata'; export const PG_ADDRESS = process.env.PG_URL; export const OCEANBASE_ADDRESS = process.env.OCEANBASE_URL; +export const VASTBASE_ADDRESS = process.env.VASTBASE_URL; export const MILVUS_ADDRESS = process.env.MILVUS_ADDRESS; export const MILVUS_TOKEN = process.env.MILVUS_TOKEN; diff --git a/packages/service/common/vectorDB/controller.ts 
b/packages/service/common/vectorDB/controller.ts index dc59726dcf5b..b3d4e1d0e2fb 100644 --- a/packages/service/common/vectorDB/controller.ts +++ b/packages/service/common/vectorDB/controller.ts @@ -1,11 +1,12 @@ /* vector crud */ import { PgVectorCtrl } from './pg'; import { ObVectorCtrl } from './oceanbase'; +import { VastbaseVectorCtrl } from './vastbase'; import { getVectorsByText } from '../../core/ai/embedding'; import type { EmbeddingRecallCtrlProps } from './controller.d'; import { type DelDatasetVectorCtrlProps, type InsertVectorProps } from './controller.d'; import { type EmbeddingModelItemType } from '@fastgpt/global/core/ai/model.d'; -import { MILVUS_ADDRESS, PG_ADDRESS, OCEANBASE_ADDRESS } from './constants'; +import { MILVUS_ADDRESS, PG_ADDRESS, OCEANBASE_ADDRESS, VASTBASE_ADDRESS } from './constants'; import { MilvusCtrl } from './milvus'; import { setRedisCache, @@ -18,7 +19,13 @@ import { import { throttle } from 'lodash'; import { retryFn } from '@fastgpt/global/common/system/utils'; +/** + * 获取向量数据库适配器实例 + * 根据环境变量动态选择数据库类型 + * 优先级:Vastbase > PostgreSQL > OceanBase > Milvus + */ const getVectorObj = () => { + if (VASTBASE_ADDRESS) return new VastbaseVectorCtrl(); if (PG_ADDRESS) return new PgVectorCtrl(); if (OCEANBASE_ADDRESS) return new ObVectorCtrl(); if (MILVUS_ADDRESS) return new MilvusCtrl(); diff --git a/packages/service/common/vectorDB/type.d.ts b/packages/service/common/vectorDB/type.d.ts index 739cd6a263fb..38c645eb34b8 100644 --- a/packages/service/common/vectorDB/type.d.ts +++ b/packages/service/common/vectorDB/type.d.ts @@ -6,6 +6,7 @@ declare global { var pgClient: Pool | null; var obClient: MysqlPool | null; var milvusClient: MilvusClient | null; + var vastbaseClient: Pool | null; } export type EmbeddingRecallItemType = { diff --git a/packages/service/common/vectorDB/vastbase/controller.ts b/packages/service/common/vectorDB/vastbase/controller.ts new file mode 100644 index 000000000000..a8b3f9a4386a --- /dev/null +++ 
b/packages/service/common/vectorDB/vastbase/controller.ts
@@ -0,0 +1,230 @@
+import { delay } from '@fastgpt/global/common/system/utils';
+import { addLog } from '../../system/log';
+import { Pool } from 'pg';
+import type { QueryResultRow } from 'pg';
+import { VASTBASE_ADDRESS } from '../constants';
+
+/**
+ * Detach a failed pool from `global.vastbaseClient` and close it.
+ *
+ * Does nothing when the pool has already been replaced, so repeated failure
+ * reports for one pool close it only once.
+ */
+const discardPool = (pool: Pool) => {
+  if (global.vastbaseClient !== pool) return;
+  global.vastbaseClient = null;
+  pool.end().catch((err) => addLog.error(`Vastbase pool close error`, err));
+};
+
+/**
+ * Return the shared Vastbase connection pool, creating it on first use.
+ *
+ * Vastbase is compatible with the PostgreSQL protocol, so the `pg` driver is
+ * used. The pool is cached on `global.vastbaseClient`. When the probe
+ * connection fails, the pool is discarded and the call retries after one
+ * second, without an attempt limit.
+ *
+ * @returns The shared pool, after one connection has been opened successfully.
+ */
+export const connectVastbase = async (): Promise<Pool> => {
+  if (global.vastbaseClient) {
+    return global.vastbaseClient;
+  }
+
+  const pool = new Pool({
+    connectionString: VASTBASE_ADDRESS,
+    max: Number(process.env.DB_MAX_LINK || 20),
+    min: 10,
+    keepAlive: true,
+    idleTimeoutMillis: 600000,
+    connectionTimeoutMillis: 20000,
+    query_timeout: 30000,
+    statement_timeout: 40000,
+    idle_in_transaction_session_timeout: 60000
+  });
+  global.vastbaseClient = pool;
+
+  // Emitted by the pool for errors on idle clients: drop the pool and rebuild it.
+  pool.on('error', async (err) => {
+    addLog.error(`Vastbase error`, err);
+    discardPool(pool);
+
+    await delay(1000);
+    addLog.info(`Retry connect Vastbase`);
+    // Nothing awaits this handler, so a rejection has to be handled here.
+    connectVastbase().catch((error) => addLog.error(`Vastbase reconnect error`, error));
+  });
+
+  try {
+    // connect() checks a client out of the pool; release it right away so the
+    // probe does not keep one pool slot occupied for the life of the process.
+    const client = await pool.connect();
+    client.release();
+    addLog.info('Vastbase connected');
+    return pool;
+  } catch (error) {
+    addLog.error(`Vastbase connect error`, error);
+    discardPool(pool);
+
+    await delay(1000);
+    addLog.info(`Retry connect Vastbase`);
+
+    return connectVastbase();
+  }
+};
+
+// Shapes accepted by the query-builder helpers below.
+type WhereProps = (string | [string, string | number])[];
+type GetProps = {
+  fields?: string[];
+  where?: WhereProps;
+  order?: { field: string; mode: 'DESC' | 'ASC' | string }[];
+  limit?: number;
+  offset?: number;
+};
+
+type DeleteProps = {
+  where: WhereProps;
+};
+
+type ValuesProps = { key: string; value?: string | number }[];
+type UpdateProps = {
+  values: ValuesProps;
+  where: WhereProps;
+};
+type InsertProps = {
+  values: ValuesProps[];
+};
+
+/**
+ * Render a value as an SQL literal: numbers are emitted bare, everything else
+ * is stringified and single-quoted with embedded single quotes doubled.
+ */
+const toSqlLiteral = (value?: string | number) =>
+  typeof value === 'number' ? value : `'${String(value).replace(/'/g, "''")}'`;
+
+/**
+ * Small SQL builder on top of the shared Vastbase pool.
+ *
+ * Table names, field names and raw string `where` items are interpolated
+ * verbatim; only values are escaped, so identifiers must come from trusted code.
+ */
+class VastbaseClass {
+  /** Build the WHERE clause; string items (e.g. 'and') are emitted as written. */
+  private getWhereStr(where?: WhereProps) {
+    if (!where) return '';
+
+    const parts = where.map((item) =>
+      typeof item === 'string' ? item : `${item[0]}=${toSqlLiteral(item[1])}`
+    );
+    return `WHERE ${parts.join(' ')}`;
+  }
+
+  /** Build the `key=value,...` list of an UPDATE ... SET clause. */
+  private getUpdateValStr(values: ValuesProps) {
+    return values.map((item) => `${item.key}=${toSqlLiteral(item.value)}`).join(',');
+  }
+
+  /** Build the `(v1,v2),(v1,v2)` tuple list of a multi-row INSERT. */
+  private getInsertValStr(values: ValuesProps[]) {
+    return values
+      .map((items) => `(${items.map((item) => toSqlLiteral(item.value)).join(',')})`)
+      .join(',');
+  }
+
+  /**
+   * Select rows from `table`.
+   * `fields` defaults to `*`, `limit` to 10 and `offset` to 0.
+   */
+  async select<T extends QueryResultRow = any>(table: string, props: GetProps) {
+    const sql = `SELECT ${
+      !props.fields || props.fields?.length === 0 ? '*' : props.fields?.join(',')
+    }
+      FROM ${table}
+      ${this.getWhereStr(props.where)}
+      ${
+        props.order
+          ? `ORDER BY ${props.order.map((item) => `${item.field} ${item.mode}`).join(',')}`
+          : ''
+      }
+      LIMIT ${props.limit || 10} OFFSET ${props.offset || 0}
+    `;
+
+    const vastbase = await connectVastbase();
+    return vastbase.query<T>(sql);
+  }
+
+  /** Count matching rows; counts `props.fields[0]` when given, otherwise `*`. */
+  async count(table: string, props: GetProps) {
+    const sql = `SELECT COUNT(${props?.fields?.[0] || '*'})
+      FROM ${table}
+      ${this.getWhereStr(props.where)}
+    `;
+
+    const vastbase = await connectVastbase();
+    return vastbase.query(sql).then((res) => Number(res.rows[0]?.count || 0));
+  }
+
+  /** Delete the rows matching `props.where`. */
+  async delete(table: string, props: DeleteProps) {
+    const sql = `DELETE FROM ${table} ${this.getWhereStr(props.where)}`;
+    const vastbase = await connectVastbase();
+    return vastbase.query(sql);
+  }
+
+  /** Update the rows matching `props.where`; a no-op when `values` is empty. */
+  async update(table: string, props: UpdateProps) {
+    if (props.values.length === 0) {
+      return {
+        rowCount: 0
+      };
+    }
+
+    const sql = `UPDATE ${table} SET ${this.getUpdateValStr(props.values)} ${this.getWhereStr(
+      props.where
+    )}`;
+    const vastbase = await connectVastbase();
+    return vastbase.query(sql);
+  }
+
+  /**
+   * Insert one row per entry of `props.values` and return the generated ids.
+   * The column list is taken from the keys of the first row.
+   */
+  async insert(table: string, props: InsertProps) {
+    if (props.values.length === 0) {
+      return {
+        rowCount: 0,
+        rows: []
+      };
+    }
+
+    const fields = props.values[0].map((item) => item.key).join(',');
+    const sql = `INSERT INTO ${table} (${fields}) VALUES ${this.getInsertValStr(
+      props.values
+    )} RETURNING id`;
+
+    const vastbase = await connectVastbase();
+    return vastbase.query<{ id: string }>(sql);
+  }
+
+  /** Run raw SQL and log a warning when it takes longer than 300ms. */
+  async query<T extends QueryResultRow = any>(sql: string) {
+    const vastbase = await connectVastbase();
+    const start = Date.now();
+    return vastbase.query<T>(sql).then((res) => {
+      const time = Date.now() - start;
+
+      if (time > 300) {
+        addLog.warn(`Vastbase query time: ${time}ms, sql: ${sql}`);
+      }
+
+      return res;
+    });
+  }
+}
+
+export const VastbaseClient = new VastbaseClass();
+// Captured once when this module is evaluated; it does not follow pools that
+// connectVastbase() creates later, so use connectVastbase() to get a live pool.
+export const Vastbase = global.vastbaseClient;
diff --git
a/packages/service/common/vectorDB/vastbase/index.ts b/packages/service/common/vectorDB/vastbase/index.ts
new file mode 100644
index 000000000000..408b5cff46b5
--- /dev/null
+++ b/packages/service/common/vectorDB/vastbase/index.ts
@@ -0,0 +1,272 @@
+/* Vastbase vector crud */
+import { DatasetVectorTableName } from '../constants';
+import { delay, retryFn } from '@fastgpt/global/common/system/utils';
+import { VastbaseClient, connectVastbase } from './controller';
+import { type VastbaseSearchRawType } from '@fastgpt/global/core/dataset/api';
+import type {
+  DelDatasetVectorCtrlProps,
+  EmbeddingRecallCtrlProps,
+  EmbeddingRecallResponse,
+  InsertVectorControllerProps
+} from '../controller.d';
+import dayjs from 'dayjs';
+import { addLog } from '../../system/log';
+
+/**
+ * Vector store adapter backed by Vastbase.
+ * All vectors are kept in the table named by `DatasetVectorTableName`.
+ */
+export class VastbaseVectorCtrl {
+  constructor() {}
+
+  /**
+   * Create the vector table and its indexes when they do not exist yet.
+   * Failures are logged and not rethrown.
+   */
+  init = async () => {
+    try {
+      await connectVastbase();
+      await VastbaseClient.query(`
+        CREATE TABLE IF NOT EXISTS ${DatasetVectorTableName} (
+            id BIGSERIAL PRIMARY KEY,
+            vector FLOATVECTOR(1536) NOT NULL,
+            team_id VARCHAR(50) NOT NULL,
+            dataset_id VARCHAR(50) NOT NULL,
+            collection_id VARCHAR(50) NOT NULL,
+            createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+        );
+      `);
+
+      await VastbaseClient.query(
+        `CREATE INDEX CONCURRENTLY IF NOT EXISTS vector_index ON ${DatasetVectorTableName} USING hnsw (vector floatvector_ip_ops) WITH (m = 32, ef_construction = 128);`
+      );
+      await VastbaseClient.query(
+        `CREATE INDEX CONCURRENTLY IF NOT EXISTS team_dataset_collection_index ON ${DatasetVectorTableName} USING btree(team_id, dataset_id, collection_id);`
+      );
+      await VastbaseClient.query(
+        `CREATE INDEX CONCURRENTLY IF NOT EXISTS create_time_index ON ${DatasetVectorTableName} USING btree(createtime);`
+      );
+      // 10w rows
+      // await VastbaseClient.query(`
+      //   ALTER TABLE modeldata SET (
+      //     autovacuum_vacuum_scale_factor = 0.1,
+      //     autovacuum_analyze_scale_factor = 0.05,
+      //     autovacuum_vacuum_threshold = 50,
+      //     autovacuum_analyze_threshold = 50,
+      //     autovacuum_vacuum_cost_delay = 20,
+      //     autovacuum_vacuum_cost_limit = 200
+      //   );`);
+
+      // 100w rows
+      // await VastbaseClient.query(`
+      //   ALTER TABLE modeldata SET (
+      //     autovacuum_vacuum_scale_factor = 0.01,
+      //     autovacuum_analyze_scale_factor = 0.02,
+      //     autovacuum_vacuum_threshold = 1000,
+      //     autovacuum_analyze_threshold = 1000,
+      //     autovacuum_vacuum_cost_delay = 10,
+      //     autovacuum_vacuum_cost_limit = 2000
+      //   );`)
+
+      addLog.info('init Vastbase successful');
+    } catch (error) {
+      addLog.error('init Vastbase error', error);
+    }
+  };
+
+  /**
+   * Insert one row per vector and return the generated row ids.
+   * Rejects with a string when the insert reports zero affected rows.
+   */
+  insert = async (props: InsertVectorControllerProps): Promise<{ insertIds: string[] }> => {
+    const { teamId, datasetId, collectionId, vectors } = props;
+
+    const values = vectors.map((vector) => [
+      { key: 'vector', value: `[${vector}]` },
+      { key: 'team_id', value: String(teamId) },
+      { key: 'dataset_id', value: String(datasetId) },
+      { key: 'collection_id', value: String(collectionId) }
+    ]);
+
+    const { rowCount, rows } = await VastbaseClient.insert(DatasetVectorTableName, {
+      values
+    });
+
+    if (rowCount === 0) {
+      return Promise.reject('insertDatasetData: no insert');
+    }
+
+    return {
+      insertIds: rows.map((row) => row.id)
+    };
+  };
+
+  /**
+   * Delete a team's vectors selected by `id`, by `datasetIds` (optionally
+   * narrowed by `collectionIds`), or by `idList`.
+   * An empty `idList` is a no-op; no selector at all rejects with a string.
+   */
+  delete = async (props: DelDatasetVectorCtrlProps): Promise<void> => {
+    const { teamId } = props;
+
+    const teamIdWhere = `team_id='${String(teamId)}' AND`;
+
+    const where = await (() => {
+      if ('id' in props && props.id) return `${teamIdWhere} id=${props.id}`;
+
+      if ('datasetIds' in props && props.datasetIds) {
+        const datasetIdWhere = `dataset_id IN (${props.datasetIds
+          .map((id) => `'${String(id)}'`)
+          .join(',')})`;
+
+        if ('collectionIds' in props && props.collectionIds) {
+          return `${teamIdWhere} ${datasetIdWhere} AND collection_id IN (${props.collectionIds
+            .map((id) => `'${String(id)}'`)
+            .join(',')})`;
+        }
+
+        return `${teamIdWhere} ${datasetIdWhere}`;
+      }
+
+      if ('idList' in props && Array.isArray(props.idList)) {
+        if (props.idList.length === 0) return;
+        return `${teamIdWhere} id IN (${props.idList.map((id) => String(id)).join(',')})`;
+      }
+      return Promise.reject('deleteDatasetData: no where');
+    })();
+
+    if (!where) return;
+
+    await VastbaseClient.delete(DatasetVectorTableName, {
+      where: [where]
+    });
+  };
+
+  /**
+   * Return up to `limit` rows from `datasetIds`, ordered by the result of the
+   * `<#>` operator against `vector`. Scores are multiplied by -1 before being
+   * returned.
+   */
+  embRecall = async (props: EmbeddingRecallCtrlProps): Promise<EmbeddingRecallResponse> => {
+    const { datasetIds, vector, limit, forbidCollectionIdList, filterCollectionIdList } =
+      props;
+
+    // Forbidden collections, excluding ids that also appear in filterCollectionIdList
+    const formatForbidCollectionIdList = (() => {
+      if (!filterCollectionIdList) return forbidCollectionIdList;
+      const list = forbidCollectionIdList
+        .map((id) => String(id))
+        .filter((id) => !filterCollectionIdList.includes(id));
+      return list;
+    })();
+    const forbidCollectionSql =
+      formatForbidCollectionIdList.length > 0
+        ? `AND collection_id NOT IN (${formatForbidCollectionIdList.map((id) => `'${id}'`).join(',')})`
+        : '';
+
+    // Filter by collectionId
+    const formatFilterCollectionId = (() => {
+      if (!filterCollectionIdList) return;
+
+      return filterCollectionIdList
+        .map((id) => String(id))
+        .filter((id) => !forbidCollectionIdList.includes(id));
+    })();
+    const filterCollectionIdSql = formatFilterCollectionId
+      ? `AND collection_id IN (${formatFilterCollectionId.map((id) => `'${id}'`).join(',')})`
+      : '';
+    // Empty data
+    if (formatFilterCollectionId && formatFilterCollectionId.length === 0) {
+      return { results: [] };
+    }
+
+    const results: any = await VastbaseClient.query(
+      `BEGIN;
+        SET LOCAL hnsw.ef_search = ${global.systemEnv?.hnswEfSearch || 100};
+        SET LOCAL hnsw.max_scan_tuples = ${global.systemEnv?.hnswMaxScanTuples || 100000};
+        SET LOCAL hnsw.iterative_scan = relaxed_order;
+        WITH relaxed_results AS MATERIALIZED (
+          select id, collection_id, vector <#> '[${vector}]' AS score
+            from ${DatasetVectorTableName}
+            where dataset_id IN (${datasetIds.map((id) => `'${String(id)}'`).join(',')})
+              ${filterCollectionIdSql}
+              ${forbidCollectionSql}
+            order by score limit ${limit}
+        ) SELECT id, collection_id, score FROM relaxed_results ORDER BY score;
+      COMMIT;`
+    );
+    // The query text holds several statements; the SELECT rows are read from the second-to-last entry
+    const rows = results?.[results.length - 2]?.rows as VastbaseSearchRawType[];
+
+    if (!Array.isArray(rows)) {
+      return {
+        results: []
+      };
+    }
+
+    return {
+      results: rows.map((item) => ({
+        id: String(item.id),
+        collectionId: item.collection_id,
+        score: item.score * -1
+      }))
+    };
+  };
+
+  /** List id, team and dataset of the vectors created between `start` and `end`. */
+  getVectorDataByTime = async (start: Date, end: Date) => {
+    const { rows } = await VastbaseClient.query<{
+      id: string;
+      team_id: string;
+      dataset_id: string;
+    }>(`SELECT id, team_id, dataset_id
+    FROM ${DatasetVectorTableName}
+    WHERE createtime BETWEEN '${dayjs(start).format('YYYY-MM-DD HH:mm:ss')}' AND '${dayjs(
+      end
+    ).format('YYYY-MM-DD HH:mm:ss')}';
+    `);
+
+    return rows.map((item) => ({
+      id: String(item.id),
+      teamId: item.team_id,
+      datasetId: item.dataset_id
+    }));
+  };
+
+  /** Count the vectors owned by a team. */
+  getVectorCountByTeamId = async (teamId: string) => {
+    const total = await VastbaseClient.count(DatasetVectorTableName, {
+      where: [['team_id', String(teamId)]]
+    });
+
+    return total;
+  };
+
+  /** Count the vectors of one dataset within a team. */
+  getVectorCountByDatasetId = async (teamId: string, datasetId: string) => {
+    const total = await VastbaseClient.count(DatasetVectorTableName, {
+      where: [['team_id', String(teamId)], 'and', ['dataset_id', String(datasetId)]]
+    });
+
+    return total;
+  };
+
+  /** Count the vectors of one collection within a dataset and team. */
+  getVectorCountByCollectionId = async (
+    teamId: string,
+    datasetId: string,
+    collectionId: string
+  ) => {
+    const total = await VastbaseClient.count(DatasetVectorTableName, {
+      where: [
+        ['team_id', String(teamId)],
+        'and',
+        ['dataset_id', String(datasetId)],
+        'and',
+        ['collection_id', String(collectionId)]
+      ]
+    });
+
+    return total;
+  };
+}
diff --git a/packages/service/type/env.d.ts b/packages/service/type/env.d.ts index aff70d4eea88..515171b1daff 100644 --- a/packages/service/type/env.d.ts +++ b/packages/service/type/env.d.ts @@ -15,6 +15,7 @@ declare global { MONGODB_LOG_URI?: string; PG_URL: string; OCEANBASE_URL: string; + VASTBASE_URL: string; MILVUS_ADDRESS: string; MILVUS_TOKEN: string; SANDBOX_URL: string; diff --git a/projects/app/.env.template b/projects/app/.env.template index fd51a63b3852..8e52822d265a 100644 --- a/projects/app/.env.template +++ b/projects/app/.env.template @@ -57,6 +57,10 @@ PG_URL=postgresql://username:password@localhost:5432/postgres # MILVUS_ADDRESS= # MILVUS_TOKEN= +# Vastbase 数据库连接(国产数据库,兼容 PostgreSQL) +# 取消注释以下行来使用 Vastbase +# VASTBASE_URL=postgresql://username:password@localhost:5432/fastgpt +VASTBASE_URL= # 页面的地址,用于自动补全相对路径资源的 domain,注意后面不要跟 / FE_DOMAIN=http://localhost:3000