pgvector
本文档介绍了存储、查询向量数据的插件工具 pgvector。
1 描述
- 功能
- 存储你的向量数据;
- 可进行精确/模糊的相似最近邻搜索;
- 支持 L2 距离,内积,余弦距离计算;
- 支持 IVFFlat、HNSW 索引;
- 支持在 Postgres 客户端中使用任何编程语言运行。
2 快速上手
安装扩展。每个数据库只需创建一次。
=# CREATE EXTENSION vector;
创建一个三维的向量列。
=# CREATE TABLE items (id bigserial PRIMARY KEY, embedding vector(3));
插入向量数据。
=# INSERT INTO items (embedding) VALUES ('[1,2,3]'), ('[4,5,6]');
通过 L2 距离获取最近邻。
=# SELECT * FROM items ORDER BY embedding <-> '[3,1,2]' LIMIT 5;
也支持内积(<#>
)和余弦距离(<=>
)。
注意!
<#>
返回负内积,因为 YMatrix 仅支持对运算符进行顺序索引扫描。
3 完整功能
3.1 存储
此部分展示了 DDL/DML 语句的使用。
创建一个含向量列的新表。
=# CREATE TABLE items (id bigserial PRIMARY KEY, embedding vector(3));
或向已存在的表中添加一个向量列。
=# ALTER TABLE items ADD COLUMN embedding vector(3);
插入向量数据。
=# INSERT INTO items (embedding) VALUES ('[1,2,3]'), ('[4,5,6]');
Upsert 向量数据。
=# INSERT INTO items (id, embedding) VALUES (1, '[1,2,3]'), (2, '[4,5,6]')
ON CONFLICT (id) DO UPDATE SET embedding = EXCLUDED.embedding;
更新向量数据。
=# UPDATE items SET embedding = '[1,2,3]' WHERE id = 1;
删除向量数据。
=# DELETE FROM items WHERE id = 1;
3.2 查询
此部分展示了查询 SQL 语句的使用。
获取一个向量数据的最近邻。
=# SELECT * FROM items ORDER BY embedding <-> '[3,1,2]' LIMIT 5;
获取一行的最近邻。
=# SELECT * FROM items WHERE id != 1 ORDER BY embedding <-> (SELECT embedding FROM items WHERE id = 1) LIMIT 5;
获取包含特定距离的行。
=# SELECT * FROM items WHERE embedding <-> '[3,1,2]' < 5;
注意!
查询中同时包含 ORDER BY
和 LIMIT
子句以使用索引。
3.2.1 距离
获取距离。
=# SELECT embedding <-> '[3,1,2]' AS distance FROM items;
对于内积,则需乘以 -1
(因为 <#>
返回负内积)。
=# SELECT (embedding <#> '[3,1,2]') * -1 AS inner_product FROM items;
对于余弦相似度,则需 1 - 余弦距离
。
=# SELECT 1 - (embedding <=> '[3,1,2]') AS cosine_similarity FROM items;
3.2.2 聚集
平均向量。
=# SELECT AVG(embedding) FROM items;
平均向量组。
=# SELECT id, AVG(embedding) FROM items GROUP BY id;
3.3 索引
默认情况下,pgvector 会执行精确的最近邻搜索,从而提供了完美的召回率。
你可以添加一个索引,以使用近似最近邻搜索,这会牺牲一些召回率来换取速度。与传统的索引不同,在添加一个模糊索引后,你将会看到查询返回的多个不同结果。
支持的索引类型如下:
- IVFFlat
- HNSW
3.3.1 IVFFlat
IVFFlat 索引属聚类索引,其会将高维空间划分为多个聚类,随后搜索距离查询向量最近的那些聚类的子集。它比 HNSW 拥有更快的构建时间以及更少的内存消耗,但查询性能也是相对更低的(从速度与召回的权衡来看)。
收到好召回的三个关键:
- 在数据表拥有一些数据后再创建索引
- 选择一个合适的列表数量 - 对于 100万行而言一个好的起步数值是 行数/1000,超过 100万行则是 行数开平方。
- 查询中指定一个合适的探测数量(更高的值有益于召回,更低的值有益于速度)- 一个好的起步数值是 列表数开平方
为每一个你想要使用的距离函数添加 IVFFlat 索引。
L2 距离。
=# CREATE INDEX ON items USING ivfflat (embedding vector_l2_ops) WITH (lists = 100);
内积。
=# CREATE INDEX ON items USING ivfflat (embedding vector_ip_ops) WITH (lists = 100);
余弦距离。
=# CREATE INDEX ON items USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);
直至 2000 维的向量数据都可以被索引。
3.3.1.1 查询选项
指定探测数量(默认值为 1)。
=# SET ivfflat.probes = 10;
更高的此参数值以速度为代价提供更好的召回,并且可以将其设置为精确最近邻搜索的列表数量(此时优化器不会使用索引)。
在一个事务中使用 SET LOCAL
子句来将其设置为一个单一查询。
=# BEGIN;
SET LOCAL ivfflat.probes = 10;
SELECT ...
COMMIT;
3.3.2 HNSW
HNSW 索引属图索引,一个 HNSW 索引会创建一个多层图,最下面的一层包含所有的节点向量,越往上的层包含的节点数量越少。它相比 IVFFlat 减缓了构建时间且运用了更多的内存资源,但也有更好的查询性能(从速度与召回的权衡来看)。它没有像 IVFFlat 一样的训练步骤,所以可以在表中没有任何数据的时候就创建索引。
为每一个你想要使用的距离函数添加 HNSW 索引。
L2 距离。
=# CREATE INDEX ON items USING hnsw (embedding vector_l2_ops);
内积。
=# CREATE INDEX ON items USING hnsw (embedding vector_ip_ops);
余弦距离。
=# CREATE INDEX ON items USING hnsw (embedding vector_cosine_ops);
直至 2000 维的向量数据都可以被索引。
3.3.2.1 索引选项
指定 HNSW 参数。
m
- 每层的最大连接数(默认值为16
)ef_construction
- 用于构造图的动态候选列表的大小(默认值为64
)
=# CREATE INDEX ON items USING hnsw (embedding vector_l2_ops) WITH (m = 16, ef_construction = 64);
3.3.2.2 查询选项
指定搜索的的动态候选列表大小(默认值为 40)。
=# SET hnsw.ef_search = 100;
更高的此参数值会以速度为代价提高更好的召回。
在一个事务中使用 SET LOCAL
子句来将其设置为一个单一查询。
=# BEGIN;
SET LOCAL hnsw.ef_search = 100;
SELECT ...
COMMIT;
3.3.3 索引进度
检查索引进度。
=# SELECT phase, tuples_done, tuples_total FROM pg_stat_progress_create_index;
这些阶段是:
initializing
performing k-means
- 仅 IVFFlatassigning tuples
- 仅 IVFFlatloading tuples
注意!
tuples_done
和 tuples_total
仅可在 loading tuples 加载元组阶段填充。
3.4 过滤
有很多方法使用 WHERE
子句来索引最近邻查询。
=# SELECT * FROM items WHERE id = 123 ORDER BY embedding <-> '[3,1,2]' LIMIT 5;
在一个或多个用于精确搜索的 WHERE
列上创建索引。
=# CREATE INDEX ON items (id);
或在用于模糊搜索的向量列上创建一个局部索引。
=# CREATE INDEX ON items USING ivfflat (embedding vector_l2_ops) WITH (lists = 100)
WHERE (id = 123);
使用分区对 WHERE
列的许多不同值进行模糊搜索。
=# CREATE TABLE items (embedding vector(3), id int) PARTITION BY LIST(id);
4 示例
4.1 Python
import psycopg2
def connect_to_database():
# 连接到数据库
try:
conn = psycopg2.connect(
database="your_database_name",
user="your_database_user",
password="your_database_password",
host="your_database_host",
port="your_database_port"
)
return conn
except psycopg2.Error as e:
print(f"Error connecting to the database: {e}")
return None
def close_database_connection(conn):
# 关闭数据库连接
if conn:
conn.close()
def create_extension(conn, extension_name):
try:
cursor = conn.cursor()
# 执行插入操作
cursor.execute('create extension if not exists %s ;' % extension_name)
conn.commit()
cursor.close()
print("extension %s has been created successfully." % extension_name)
except psycopg2.Error as e:
print(f"Error create extension: {e}")
def create_table(conn, tablename, sql_tabledef):
try:
cursor = conn.cursor()
# 创建向量表
cursor.execute(sql_tabledef)
conn.commit()
cursor.close
print('table %s has been created successfully.' % tablename)
except psycopg2.Error as e:
print(f"Error inserting data: {e}")
def insert_data(conn, tablename, data):
try:
cursor = conn.cursor()
# 执行插入操作
cursor.execute("INSERT INTO %s VALUES ('%s',%s, %s,'%s')" % (
tablename, data['value1'], data['value2'], data['value3'], data['value4']))
conn.commit()
cursor.close()
print("Data inserted successfully.")
except psycopg2.Error as e:
print(f"Error inserting data: {e}")
def select_data(conn, tablename):
try:
cursor = conn.cursor()
# 执行查询操作
cursor.execute("SELECT * FROM %s" % tablename)
result = cursor.fetchall()
cursor.close()
return result
except psycopg2.Error as e:
print(f"Error selecting data: {e}")
return []
def vector_search(conn, tablename, vector_type, nearest_values):
try:
cursor = conn.cursor()
# 执行查询操作
cursor.execute(
"SELECT * FROM %s order by embedding %s '%s' limit 5;" % (tablename, vector_type, nearest_values))
result = cursor.fetchall()
cursor.close()
return result
except psycopg2.Error as e:
print(f"Error selecting data: {e}")
return []
def update_data(conn, tablename, data):
try:
cursor = conn.cursor()
# 执行更新操作
cursor.execute(
"UPDATE %s SET embedding = '%s' WHERE id = %s" % (tablename, data['new_value'], data['condition']))
conn.commit()
cursor.close()
print("Data updated successfully.")
except psycopg2.Error as e:
print(f"Error updating data: {e}")
def delete_data(conn, tablename, condition):
try:
cursor = conn.cursor()
# 执行删除操作
cursor.execute("DELETE FROM %s WHERE id = %s" % (tablename, condition))
conn.commit()
cursor.close()
print("Data deleted successfully.")
except psycopg2.Error as e:
print(f"Error deleting data: {e}")
if __name__ == "__main__":
conn = connect_to_database()
if conn:
# 在这里执行数据库操作
data_to_insert = {
'value1': '2023-10-12 00:00:00',
'value2': 1,
'value3': 1,
'value4': [1, 2, 3]
}
# 创建vector扩展
create_extension(conn, 'vector')
# 创建向量表
sql_tabledef = 'drop table if exists documents_l2;' \
'CREATE TABLE documents_l2(' \
'created_at timestamptz,' \
'id integer,' \
'document_type int,' \
'embedding vector(3)' \
')' \
'distributed by (id);';
create_table(conn, 'documents_l2', sql_tabledef)
# 向表里写入数据
insert_data(conn, 'documents_l2', data_to_insert)
# 查询数据
data_to_select = select_data(conn, 'documents_l2')
print("Selected Data:", data_to_select)
# 更新数据
data_to_update = {
'new_value': '[4,5,6]',
'condition': 1
}
update_data(conn, 'documents_l2', data_to_update)
# 查询数据
data_to_select = select_data(conn, 'documents_l2')
print("Selected Data:", data_to_select)
# 向量检索
print('向量检索:L2 distance')
data_vector_search = vector_search(conn, 'documents_l2', '<->', '[4,5,6]')
print("vector search(L2):", data_vector_search)
print('向量检索:inner product')
data_vector_search = vector_search(conn, 'documents_l2', '<#>', '[4,5,6]')
print("vector search(IP):", data_vector_search)
print('向量检索:cosine distance')
data_vector_search = vector_search(conn, 'documents_l2', '<=>', '[4,5,6]')
print("vector search(cos):", data_vector_search)
# 删除数据
data_to_delete = 1
delete_data(conn, 'documents_l2', data_to_delete)
# 查询数据
data_to_select = select_data(conn, 'documents_l2')
print("Selected Data:", data_to_select)
close_database_connection(conn)
4.2 Golang
package main
import (
"database/sql"
"fmt"
_ "github.com/lib/pq"
)
type Data struct {
Id int32
Embedding string
}
func getConn() (*sql.DB, error) {
psqlInfo := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", "192.168.100.30", 6432, "mxadmin", "", "mxadmin")
db, err := sql.Open("postgres", psqlInfo)
if err != nil {
return nil, err
}
return db, nil
}
func insertData() error {
db, _ := getConn()
defer db.Close()
_, err := db.Exec(`INSERT INTO items (embedding) VALUES ('[1,2,3]'), ('[4,5,6]')`)
return err
}
func selectData() ([]Data, error) {
db, _ := getConn()
defer db.Close()
datas := make([]Data, 0)
rows, err := db.Query(`SELECT id, embedding FROM items ORDER BY embedding <=> '[0.45, 0.4, 0.85]' LIMIT 10;`)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
data := Data{}
err = rows.Scan(&data.Id, &data.Embedding)
if err != nil {
return nil, err
}
datas = append(datas, data)
}
return datas, err
}
func main() {
insertData()
datas, _ := selectData()
fmt.Println(datas)
}