Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/global/core/dataset/api.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ export type PgSearchRawType = {
collection_id: string;
score: number;
};
export type VastbaseSearchRawType = {
id: string;
collection_id: string;
score: number;
};
export type PushDatasetDataChunkProps = {
q?: string;
a?: string;
Expand Down
1 change: 1 addition & 0 deletions packages/service/common/vectorDB/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ export const DatasetVectorTableName = 'modeldata';

export const PG_ADDRESS = process.env.PG_URL;
export const OCEANBASE_ADDRESS = process.env.OCEANBASE_URL;
export const VASTBASE_ADDRESS = process.env.VASTBASE_URL;
export const MILVUS_ADDRESS = process.env.MILVUS_ADDRESS;
export const MILVUS_TOKEN = process.env.MILVUS_TOKEN;
9 changes: 8 additions & 1 deletion packages/service/common/vectorDB/controller.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
/* vector crud */
import { PgVectorCtrl } from './pg';
import { ObVectorCtrl } from './oceanbase';
import { VastbaseVectorCtrl } from './vastbase';
import { getVectorsByText } from '../../core/ai/embedding';
import type { EmbeddingRecallCtrlProps } from './controller.d';
import { type DelDatasetVectorCtrlProps, type InsertVectorProps } from './controller.d';
import { type EmbeddingModelItemType } from '@fastgpt/global/core/ai/model.d';
import { MILVUS_ADDRESS, PG_ADDRESS, OCEANBASE_ADDRESS } from './constants';
import { MILVUS_ADDRESS, PG_ADDRESS, OCEANBASE_ADDRESS, VASTBASE_ADDRESS } from './constants';
import { MilvusCtrl } from './milvus';
import {
setRedisCache,
Expand All @@ -18,7 +19,13 @@ import {
import { throttle } from 'lodash';
import { retryFn } from '@fastgpt/global/common/system/utils';

/**
* 获取向量数据库适配器实例
* 根据环境变量动态选择数据库类型
* 优先级:Vastbase > PostgreSQL > OceanBase > Milvus
*/
const getVectorObj = () => {
if (VASTBASE_ADDRESS) return new VastbaseVectorCtrl();
if (PG_ADDRESS) return new PgVectorCtrl();
if (OCEANBASE_ADDRESS) return new ObVectorCtrl();
if (MILVUS_ADDRESS) return new MilvusCtrl();
Expand Down
1 change: 1 addition & 0 deletions packages/service/common/vectorDB/type.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ declare global {
var pgClient: Pool | null;
var obClient: MysqlPool | null;
var milvusClient: MilvusClient | null;
var vastbaseClient: Pool | null;
}

export type EmbeddingRecallItemType = {
Expand Down
223 changes: 223 additions & 0 deletions packages/service/common/vectorDB/vastbase/controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
import { delay } from '@fastgpt/global/common/system/utils';
import { addLog } from '../../system/log';
import { Pool } from 'pg';
import type { QueryResultRow } from 'pg';
import { VASTBASE_ADDRESS } from '../constants';

/**
* 连接 Vastbase 数据库
* Vastbase 兼容 PostgreSQL 协议,因此可以使用 pg 驱动
*/
export const connectVastbase = async (): Promise<Pool> => {
if (global.vastbaseClient) {
return global.vastbaseClient;
}

global.vastbaseClient = new Pool({
connectionString: VASTBASE_ADDRESS,
max: Number(process.env.DB_MAX_LINK || 20),
min: 10,
keepAlive: true,
idleTimeoutMillis: 600000,
connectionTimeoutMillis: 20000,
query_timeout: 30000,
statement_timeout: 40000,
idle_in_transaction_session_timeout: 60000
});

global.vastbaseClient.on('error', async (err) => {
addLog.error(`Vastbase error`, err);
global.vastbaseClient?.end();
global.vastbaseClient = null;

await delay(1000);
addLog.info(`Retry connect Vastbase`);
connectVastbase();
});

try {
await global.vastbaseClient.connect();
console.log('Vastbase connected');
return global.vastbaseClient;
} catch (error) {
addLog.error(`Vastbase connect error`, error);
global.vastbaseClient?.end();
global.vastbaseClient = null;

await delay(1000);
addLog.info(`Retry connect Vastbase`);

return connectVastbase();
}
};

// 数据库操作相关类型定义
type WhereProps = (string | [string, string | number])[];
type GetProps = {
fields?: string[];
where?: WhereProps;
order?: { field: string; mode: 'DESC' | 'ASC' | string }[];
limit?: number;
offset?: number;
};

type DeleteProps = {
where: WhereProps;
};

type ValuesProps = { key: string; value?: string | number }[];
type UpdateProps = {
values: ValuesProps;
where: WhereProps;
};
type InsertProps = {
values: ValuesProps[];
};

/**
* Vastbase 数据库操作类
* 提供与 PostgreSQL 兼容的数据库操作接口
*/
class VastbaseClass {
private getWhereStr(where?: WhereProps) {
return where
? `WHERE ${where
.map((item) => {
if (typeof item === 'string') {
return item;
}
const val = typeof item[1] === 'number' ? item[1] : `'${String(item[1])}'`;
return `${item[0]}=${val}`;
})
.join(' ')}`
: '';
}

private getUpdateValStr(values: ValuesProps) {
return values
.map((item) => {
const val =
typeof item.value === 'number'
? item.value
: `'${String(item.value).replace(/\'/g, '"')}'`;

return `${item.key}=${val}`;
})
.join(',');
}

private getInsertValStr(values: ValuesProps[]) {
return values
.map(
(items) =>
`(${items
.map((item) =>
typeof item.value === 'number'
? item.value
: `'${String(item.value).replace(/\'/g, '"')}'`
)
.join(',')})`
)
.join(',');
}

/**
* 查询数据
*/
async select<T extends QueryResultRow = any>(table: string, props: GetProps) {
const sql = `SELECT ${
!props.fields || props.fields?.length === 0 ? '*' : props.fields?.join(',')
}
FROM ${table}
${this.getWhereStr(props.where)}
${
props.order
? `ORDER BY ${props.order.map((item) => `${item.field} ${item.mode}`).join(',')}`
: ''
}
LIMIT ${props.limit || 10} OFFSET ${props.offset || 0}
`;

const vastbase = await connectVastbase();
return vastbase.query<T>(sql);
}

/**
* 统计数据数量
*/
async count(table: string, props: GetProps) {
const sql = `SELECT COUNT(${props?.fields?.[0] || '*'})
FROM ${table}
${this.getWhereStr(props.where)}
`;

const vastbase = await connectVastbase();
return vastbase.query(sql).then((res) => Number(res.rows[0]?.count || 0));
}

/**
* 删除数据
*/
async delete(table: string, props: DeleteProps) {
const sql = `DELETE FROM ${table} ${this.getWhereStr(props.where)}`;
const vastbase = await connectVastbase();
return vastbase.query(sql);
}

/**
* 更新数据
*/
async update(table: string, props: UpdateProps) {
if (props.values.length === 0) {
return {
rowCount: 0
};
}

const sql = `UPDATE ${table} SET ${this.getUpdateValStr(props.values)} ${this.getWhereStr(
props.where
)}`;
const vastbase = await connectVastbase();
return vastbase.query(sql);
}

/**
* 插入数据
*/
async insert(table: string, props: InsertProps) {
if (props.values.length === 0) {
return {
rowCount: 0,
rows: []
};
}

const fields = props.values[0].map((item) => item.key).join(',');
const sql = `INSERT INTO ${table} (${fields}) VALUES ${this.getInsertValStr(
props.values
)} RETURNING id`;

const vastbase = await connectVastbase();
return vastbase.query<{ id: string }>(sql);
}

/**
* 执行原生 SQL 查询
*/
async query<T extends QueryResultRow = any>(sql: string) {
const vastbase = await connectVastbase();
const start = Date.now();
return vastbase.query<T>(sql).then((res) => {
const time = Date.now() - start;

if (time > 300) {
addLog.warn(`Vastbase query time: ${time}ms, sql: ${sql}`);
}

return res;
});
}
}

export const VastbaseClient = new VastbaseClass();
export const Vastbase = global.vastbaseClient;
Loading