随着基础模型的兴起,向量数据库的受欢迎程度也随之飙升。事实上,在大型语言模型的应用场景中,向量数据库同样很有用。pgvector 是一个基于 PostgreSQL 的扩展,为用户提供了一套强大的功能,用于高效地存储、查询和处理向量数据。
插件编写
要在数据库中运行CREATE EXTENSION命令最少需要两个文件:
-
一个格式化的控制文件extension_name.control,用于告诉Postgresql关于你的扩展的一些基础信息;
-
一个是格式化的扩展SQL脚本extension--version.sql。
control 文件
pgvector的control文件
comment = 'vector data type and ivfflat access method'
default_version = '0.4.2'
module_pathname = '$libdir/vector'
relocatable = true
comment:关于插件的描述,仅在安装时应用。
default_version:当没有指定版本时,默认安装的版本。
module_pathname:脚本文件中每次出现的 MODULE_PATHNAME 都会被替换为该值;如果没有设置,则不进行替换。通常情况下,这个参数会被设为 $libdir/shared_library_name,这样在使用 CREATE FUNCTION 命令创建用 C 语言编写的函数时就可以直接写 MODULE_PATHNAME,而不需要硬编码共享库的名称。
relocatable:一个布尔类型的标志,标志该插件是否是可重定位的
SQL与C语言
在插件中定义类型
sql中定义
CREATE TYPE vector;#类似占位符类型,C语言引用然后用定义完整的取代
CREATE FUNCTION vector_in(cstring, oid, integer) RETURNS vector
AS 'MODULE_PATHNAME' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
#第一个参数是以 C 字符串形式给出的输入文本,第二个参数是该类型自身的 OID(对于数组类型则是其元素类型的 OID),第三个参数是目标列的 typmod(如果未知则传递 -1)
CREATE FUNCTION vector_out(vector) RETURNS cstring
AS 'MODULE_PATHNAME' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
CREATE FUNCTION vector_typmod_in(cstring[]) RETURNS integer
AS 'MODULE_PATHNAME' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
CREATE FUNCTION vector_recv(internal, oid, integer) RETURNS vector
AS 'MODULE_PATHNAME' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
CREATE FUNCTION vector_send(vector) RETURNS bytea
AS 'MODULE_PATHNAME' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
CREATE TYPE vector (
INPUT = vector_in, #把类型的外部文本表达转换成该类型的操作符和函数所使用的内部表达
OUTPUT = vector_out, #执行反向的转换
TYPMOD_IN = vector_typmod_in, #限制长度
RECEIVE = vector_recv, #把类型的外部二进制表达转换成内部表达
SEND = vector_send, #执行反向操作
STORAGE = extended #extended —— 既允许压缩,也允许行外存储(external 才是允许行外存储但不许压缩)
);
c语言具体实现
vector.c
#include "postgres.h" 包含了与 Postgres 交互所需的大部分基本内容,声明 Postgres 函数的每个 C 文件都必须包含这一行。#include "fmgr.h" 则是使用 PG_GETARG_XXX 和 PG_RETURN_XXX 宏所必需的。
vector结构体
/*
 * Layout of a vector value: a varlena header, the dimension count, a
 * reserved 16-bit field, and then the components as a flexible array.
 */
typedef struct Vector
{
    int32 vl_len_; /* varlena header (do not touch directly!) */
    int16 dim; /* number of dimensions */
    int16 unused; /* reserved field */
    float x[FLEXIBLE_ARRAY_MEMBER]; /* components, stored as single-precision floats */
} Vector;
vector_in
pt = strtok(str, ",");
stringEnd = pt;
while (pt != NULL && *stringEnd != ']')
{
... #检查维度是否符合规范
x[dim] = strtof(pt, &stringEnd);
... #检查输入语法是否正确
pt = strtok(NULL, ",");
...
}
result = InitVector(dim);
for (i = 0 ; i < dim; i++)
result->x[i] = x[i];
PG_RETURN_POINTER(result);
vector_out
buf = (char *) palloc(FLOAT_SHORTEST_DECIMAL_LEN * dim + 2 );
ptr = buf;
*ptr = '[';
ptr++;
for (i = 0 ; i < dim; i++)
{
if (i > 0 )
{
*ptr = ',';
ptr++;
}
#if PG_VERSION_NUM >= 120000
n = float_to_shortest_decimal_bufn(vector->x[i], ptr);
#else
n = sprintf(ptr, "%.*g", ndig, vector->x[i]);
#endif
ptr += n;
PG_FREE_IF_COPY(vector, 0 );#释放内存
PG_RETURN_CSTRING(buf);#返回字符串
vector_typmod_in
vector_typmod_in(PG_FUNCTION_ARGS)
{
ArrayType *ta = PG_GETARG_ARRAYTYPE_P( 0 );
int32 *tl;
int n;
tl = ArrayGetIntegerTypmods(ta, &n);
....#判断维度是否符合规范
PG_RETURN_INT32(*tl);
}
vector_recv
vector_recv(PG_FUNCTION_ARGS)
{
....
dim = pq_getmsgint(buf, sizeof(int16)); /* 从消息缓冲区 buf 中读取一个整数 */
unused = pq_getmsgint(buf, sizeof(int16));
..../*检查是否符合规范*/
result = InitVector(dim);
for (i = 0 ; i < dim; i++)
result->x[i] = pq_getmsgfloat4(buf); /* 从消息缓冲区 buf 中读取一个浮点数 */
PG_RETURN_POINTER(result);
}
vector_send
/*
 * vector_send - the SEND function registered for the vector type.
 *
 * Serializes the vector held in argument 0 into a StringInfo buffer: first
 * the dimension count, then the unused field, then every component in
 * index order. The finished buffer is returned as a bytea.
 *
 * NOTE(review): the return type line is not part of this excerpt.
 */
vector_send(PG_FUNCTION_ARGS)
{
Vector *vec = PG_GETARG_VECTOR_P( 0 ); /* argument 0: the vector to serialize */
StringInfoData buf;
int i;
pq_begintypsend(&buf); /* initialize buf */
pq_sendint(&buf, vec->dim, sizeof(int16)); /* send the dimension count as an int16-sized integer */
pq_sendint(&buf, vec->unused, sizeof(int16)); /* send the unused field the same way */
for (i = 0 ; i < vec->dim; i++)
pq_sendfloat4(&buf, vec->x[i]); /* send each component as a float4 */
PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
}
在插件中实现功能
sql定义
CREATE FUNCTION l2_distance(vector, vector) RETURNS float8 #计算欧式距离
AS 'MODULE_PATHNAME' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE; #PARALLEL SAFE 表示该函数在并行模式中运行是安全的并且不受限制
CREATE FUNCTION inner_product(vector, vector) RETURNS float8 #内积
AS 'MODULE_PATHNAME' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
CREATE FUNCTION cosine_distance(vector, vector) RETURNS float8 #余弦距离(1 - 余弦相似度)
AS 'MODULE_PATHNAME' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
CREATE FUNCTION vector_dims(vector) RETURNS integer #维度
AS 'MODULE_PATHNAME' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
CREATE FUNCTION vector_norm(vector) RETURNS float8 #向量的范数(模长)
AS 'MODULE_PATHNAME' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
CREATE FUNCTION vector_add(vector, vector) RETURNS vector #向量相加
AS 'MODULE_PATHNAME' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
CREATE FUNCTION vector_sub(vector, vector) RETURNS vector #向量相减
AS 'MODULE_PATHNAME' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
c语言
计算欧式距离
for (int i = 0 ; i < a->dim; i++)
{
diff = ax[i] - bx[i];
distance += diff * diff;
}
计算内积
for (int i = 0; i < a->dim; i++)
distance += ax[i] * bx[i];
PG_RETURN_FLOAT8(distance);
计算余弦距离(1 - 余弦相似度)
for (int i = 0; i < a->dim; i++)
{
distance += ax[i] * bx[i];
norma += ax[i] * ax[i];
normb += bx[i] * bx[i];
}
/* Use sqrt(a * b) over sqrt(a) * sqrt(b) */
PG_RETURN_FLOAT8(1 - (distance / sqrt(norma * normb)))
在插件中定义运算符号
sql定义
CREATE OPERATOR <-> (
LEFTARG = vector, RIGHTARG = vector, PROCEDURE = l2_distance,
COMMUTATOR = '<->'
);
/*每一个运算符都有一个左参数(LEFTARG),一个右参数(RIGHTARG)和 一个函数(PROCEDURE)*/
CREATE OPERATOR <#> (
LEFTARG = vector, RIGHTARG = vector, PROCEDURE = vector_negative_inner_product,
COMMUTATOR = '<#>'
);
CREATE OPERATOR <=> (
LEFTARG = vector, RIGHTARG = vector, PROCEDURE = cosine_distance,
COMMUTATOR = '<=>'
);
CREATE OPERATOR + (
LEFTARG = vector, RIGHTARG = vector, PROCEDURE = vector_add,
COMMUTATOR = +
);
CREATE OPERATOR - (
LEFTARG = vector, RIGHTARG = vector, PROCEDURE = vector_sub,
COMMUTATOR = -
);
CREATE OPERATOR < (
LEFTARG = vector, RIGHTARG = vector, PROCEDURE = vector_lt,
COMMUTATOR = > , NEGATOR = >= ,
RESTRICT = scalarltsel, JOIN = scalarltjoinsel
);
COMMUTATOR:指明 x op1 y 等效于 y op2 x(op2 是 op1 的交换子)
NEGATOR:指明 x op1 y 等价于 NOT (x op2 y)(op2 是 op1 的求反器)
-- should use scalarlesel and scalarlejoinsel, but not supported in Postgres < 11
CREATE OPERATOR <= (
LEFTARG = vector, RIGHTARG = vector, PROCEDURE = vector_le,
COMMUTATOR = >= , NEGATOR = > ,
RESTRICT = scalarltsel, JOIN = scalarltjoinsel
);
CREATE OPERATOR = (
LEFTARG = vector, RIGHTARG = vector, PROCEDURE = vector_eq,
COMMUTATOR = = , NEGATOR = <> ,
RESTRICT = eqsel, JOIN = eqjoinsel
);
CREATE OPERATOR <> (
LEFTARG = vector, RIGHTARG = vector, PROCEDURE = vector_ne,
COMMUTATOR = <> , NEGATOR = = ,
RESTRICT = eqsel, JOIN = eqjoinsel
);
-- should use scalargesel and scalargejoinsel, but not supported in Postgres < 11
CREATE OPERATOR >= (
LEFTARG = vector, RIGHTARG = vector, PROCEDURE = vector_ge,
COMMUTATOR = <= , NEGATOR = < ,
RESTRICT = scalargtsel, JOIN = scalargtjoinsel
);
CREATE OPERATOR > (
LEFTARG = vector, RIGHTARG = vector, PROCEDURE = vector_gt,
COMMUTATOR = < , NEGATOR = <= ,
RESTRICT = scalargtsel, JOIN = scalargtjoinsel
);
在插件中实现索引
IVF(Inverted File,倒排文件)是一种基于量化的索引类型。它通过聚类方法把空间里的点划分成 nlist个单元。查询时先把目标向量与所有单元的中心做距离比较,选出 nprobe 个最近单元。然后比较这些被选中单元里的所有向量,得到最终的结果。
-
nlist 的取值:行数不超过 100 万时建议取 rows/1000,超过 100 万时建议取 sqrt(rows)
-
nprobe 的取值:行数不超过 100 万时建议取 nlist/10,超过 100 万时建议取 sqrt(nlist)
执行过程
-
initializing
-
performing k-means
-
sorting tuples
-
loading tuples
构建索引:ivfbuild.c
/*
 * BuildIndex - drive the construction of an ivfflat index.
 *
 * Initializes the build state, computes the list centers, writes the meta
 * page, the list pages and the entry pages, and finally releases the build
 * state. Every page-creating step is given the same fork number.
 */
static void
BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo,
           IvfflatBuildState * buildstate, ForkNumber forkNum)
{
    /* Set up the buffers and structures the build needs */
    InitBuildState(buildstate, heap, index, indexInfo);

    /* Compute the list centers (presumably the k-means step of the build) */
    ComputeCenters(buildstate);

    /* Create pages */
    CreateMetaPage(index, buildstate->dimensions, buildstate->lists, forkNum);
    CreateListPages(index, buildstate->centers, buildstate->dimensions,
                    buildstate->lists, forkNum, &buildstate->listInfo);
    IvfflatBench("CreateEntryPages", CreateEntryPages(buildstate, forkNum));

    FreeBuildState(buildstate);
}
索引扫描:ivfscan.c
/*
 * GetScanLists - pick the lists whose centers are closest to the query value.
 *
 * Walks the chain of list pages starting at IVFFLAT_HEAD_BLKNO. For every
 * list on every page it computes the distance between the list center and
 * `value`, and keeps at most so->probes candidates in so->lists, organized
 * through the pairing heap so->listQueue. The heap's first element is
 * treated as the farthest candidate currently kept, so a closer list
 * replaces it once the candidate set is full.
 *
 * scan  - scan descriptor; scan->opaque holds the IvfflatScanOpaque state
 * value - the query value passed as the second argument of the distance
 *         function
 *
 * NOTE(review): the return type is not shown in this excerpt; the body
 * returns nothing, so it is presumably `static void` - confirm against
 * ivfscan.c.
 */
GetScanLists(IndexScanDesc scan, Datum value)
{
    Buffer cbuf;
    Page cpage;
    IvfflatList list;
    OffsetNumber offno;
    OffsetNumber maxoffno;
    BlockNumber nextblkno = IVFFLAT_HEAD_BLKNO;
    int listCount = 0;
    IvfflatScanOpaque so = (IvfflatScanOpaque) scan->opaque;
    double distance;
    IvfflatScanList *scanlist;
    double maxDistance = DBL_MAX;

    /* Search all list pages */
    while (BlockNumberIsValid(nextblkno))
    {
        /* Hold a share lock on the page while its items are read */
        cbuf = ReadBuffer(scan->indexRelation, nextblkno);
        LockBuffer(cbuf, BUFFER_LOCK_SHARE);
        cpage = BufferGetPage(cbuf);
        maxoffno = PageGetMaxOffsetNumber(cpage);

        for (offno = FirstOffsetNumber; offno <= maxoffno; offno = OffsetNumberNext(offno))
        {
            list = (IvfflatList) PageGetItem(cpage, PageGetItemId(cpage, offno));

            /* Use procinfo from the index instead of scan key for performance */
            distance = DatumGetFloat8(FunctionCall2Coll(so->procinfo, so->collation,
                                                        PointerGetDatum(&list->center), value));

            if (listCount < so->probes)
            {
                /* Still filling the candidate set: take the next free slot */
                scanlist = &so->lists[listCount];
                scanlist->startPage = list->startPage;
                scanlist->distance = distance;
                listCount++;

                /* Add to heap */
                pairingheap_add(so->listQueue, &scanlist->ph_node);

                /* Calculate max distance */
                if (listCount == so->probes)
                    maxDistance = ((IvfflatScanList *) pairingheap_first(so->listQueue))->distance;
            }
            else if (distance < maxDistance)
            {
                /* Remove */
                scanlist = (IvfflatScanList *) pairingheap_remove_first(so->listQueue);

                /* Reuse */
                scanlist->startPage = list->startPage;
                scanlist->distance = distance;
                pairingheap_add(so->listQueue, &scanlist->ph_node);

                /* Update max distance */
                maxDistance = ((IvfflatScanList *) pairingheap_first(so->listQueue))->distance;
            }
        }

        /* Read the link to the next list page before releasing this one */
        nextblkno = IvfflatPageGetOpaque(cpage)->nextblkno;
        UnlockReleaseBuffer(cbuf);
    }
}
加入索引的查询效果:
insert into test(vec) SELECT ARRAY[trunc(random()*1000), trunc(random()*1000),
trunc(random()*1000)] FROM generate_series(1, 1000000);
CREATE INDEX ON test USING ivfflat (vec vector_l2_ops) WITH (lists = 1000)
当为一个大型表创建索引时,如果 maintenance_work_mem 的值不足够大,那么 PostgreSQL 将不得不将部分数据存储在磁盘上。这样会导致磁盘 I/O 操作增多,影响索引创建的速度和性能。
如果系统中有其他进程也在使用大量内存,那么可能会导致系统出现内存不足的情况,从而影响索引创建的成功率和系统的稳定性。
因此,为了确保索引创建过程中有足够的内存可用,我们需要适当地增大 maintenance_work_mem 的值。
通常,应该根据系统中的内存总量、可用内存、负载情况和表的大小等因素进行调整,以便为 CREATE INDEX 命令提供足够的内存空间。
版本控制与插件更新
makefile配置
# Name of the shared library built from OBJS
MODULE_big = vector
# Versioned SQL scripts shipped with the extension
DATA = $(wildcard sql/*--*.sql)
# Object files linked into the shared library
OBJS = src/ivfbuild.o src/ivfflat.o src/ivfinsert.o src/ivfkmeans.o src/ivfscan.o src/ivfutils.o src/ivfvacuum.o src/vector.o
# Regression test scripts
TESTS = $(wildcard test/sql/*.sql)
# Locate the PGXS build infrastructure through pg_config (overridable)
PG_CONFIG ?= pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
对应的 update 语句
ALTER EXTENSION vector UPDATE TO ...
update 语句需要对应的 extension--oldversion--newversion.sql 形式的更新脚本
测试
make install 之后通过 make installcheck 调用。对于每个测试文件,在名为 expected/ 的子目录中也应该有一个包含预期输出的对应文件,该文件与测试脚本同名,只是后缀为 .out。任何差异都会写入文件 regression.diffs。