pgvector

本文档介绍了存储、查询向量数据的插件工具 pgvector。

1 描述

  • 功能
    • 存储你的向量数据;
    • 可进行精确/模糊的相似最近邻搜索;
    • 支持 L2 距离,内积,余弦距离计算;
    • 支持 IVFFlat、HNSW 索引;
    • 支持在 Postgres 客户端中使用任何编程语言运行。

2 快速上手

安装扩展。每个数据库只需创建一次。

=# CREATE EXTENSION vector;

创建一个三维的向量列。

=# CREATE TABLE items (id bigserial PRIMARY KEY, embedding vector(3));

插入向量数据。

=# INSERT INTO items (embedding) VALUES ('[1,2,3]'), ('[4,5,6]');

通过 L2 距离获取最近邻。

=# SELECT * FROM items ORDER BY embedding <-> '[3,1,2]' LIMIT 5;

也支持内积(<#>)和余弦距离(<=>)。

注意!
<#> 返回负内积,因为 YMatrix 仅支持对运算符进行顺序索引扫描。

3 完整功能

3.1 存储

此部分展示了 DDL/DML 语句的使用。

创建一个含向量列的新表。

=# CREATE TABLE items (id bigserial PRIMARY KEY, embedding vector(3));

或向已存在的表中添加一个向量列。

=# ALTER TABLE items ADD COLUMN embedding vector(3);

插入向量数据。

=# INSERT INTO items (embedding) VALUES ('[1,2,3]'), ('[4,5,6]');

Upsert 向量数据。

=# INSERT INTO items (id, embedding) VALUES (1, '[1,2,3]'), (2, '[4,5,6]')
   ON CONFLICT (id) DO UPDATE SET embedding = EXCLUDED.embedding;

更新向量数据。

=# UPDATE items SET embedding = '[1,2,3]' WHERE id = 1;

删除向量数据。

=# DELETE FROM items WHERE id = 1;

3.2 查询

此部分展示了查询 SQL 语句的使用。

获取一个向量数据的最近邻。

=# SELECT * FROM items ORDER BY embedding <-> '[3,1,2]' LIMIT 5;

获取一行的最近邻。

=# SELECT * FROM items WHERE id != 1 ORDER BY embedding <-> (SELECT embedding FROM items WHERE id = 1) LIMIT 5;

获取包含特定距离的行。

=# SELECT * FROM items WHERE embedding <-> '[3,1,2]' < 5;

注意!
查询中同时包含 ORDER BYLIMIT 子句以使用索引。

3.2.1 距离

获取距离。

=# SELECT embedding <-> '[3,1,2]' AS distance FROM items;

对于内积,则需乘以 -1(因为 <#> 返回负内积)。

=# SELECT (embedding <#> '[3,1,2]') * -1 AS inner_product FROM items;

对于余弦相似度,则需 1 - 余弦距离

=# SELECT 1 - (embedding <=> '[3,1,2]') AS cosine_similarity FROM items;

3.2.2 聚集

平均向量。

=# SELECT AVG(embedding) FROM items;

平均向量组。

=# SELECT id, AVG(embedding) FROM items GROUP BY id;

3.3 索引

默认情况下,pgvector 会执行精确的最近邻搜索,从而提供了完美的召回率。

你可以添加一个索引,以使用近似最近邻搜索,这会牺牲一些召回率来换取速度。与传统的索引不同,在添加一个模糊索引后,你将会看到查询返回的多个不同结果。

支持的索引类型如下:

  • IVFFlat
  • HNSW

3.3.1 IVFFlat

IVFFlat 索引属聚类索引,其会将高维空间划分为多个聚类,随后搜索距离查询向量最近的那些聚类的子集。它比 HNSW 拥有更快的构建时间以及更少的内存消耗,但查询性能也是相对更低的(从速度与召回的权衡来看)。

收到好召回的三个关键:

  • 在数据表拥有一些数据后再创建索引
  • 选择一个合适的列表数量 - 对于 100万行而言一个好的起步数值是 行数/1000,超过 100万行则是 行数开平方。
  • 查询中指定一个合适的探测数量(更高的值有益于召回,更低的值有益于速度)- 一个好的起步数值是 列表数开平方

为每一个你想要使用的距离函数添加 IVFFlat 索引。

L2 距离。

=# CREATE INDEX ON items USING ivfflat (embedding vector_l2_ops) WITH (lists = 100);

内积。

=# CREATE INDEX ON items USING ivfflat (embedding vector_ip_ops) WITH (lists = 100);

余弦距离。

=# CREATE INDEX ON items USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);

直至 2000 维的向量数据都可以被索引。

3.3.1.1 查询选项

指定探测数量(默认值为 1)。

=# SET ivfflat.probes = 10;

更高的此参数值以速度为代价提供更好的召回,并且可以将其设置为精确最近邻搜索的列表数量(此时优化器不会使用索引)。

在一个事务中使用 SET LOCAL 子句来将其设置为一个单一查询。

=# BEGIN;
SET LOCAL ivfflat.probes = 10;
SELECT ...
COMMIT;

3.3.2 HNSW

HNSW 索引属图索引,一个 HNSW 索引会创建一个多层图,最下面的一层包含所有的节点向量,越往上的层包含的节点数量越少。它相比 IVFFlat 减缓了构建时间且运用了更多的内存资源,但也有更好的查询性能(从速度与召回的权衡来看)。它没有像 IVFFlat 一样的训练步骤,所以可以在表中没有任何数据的时候就创建索引。

为每一个你想要使用的距离函数添加 HNSW 索引。

L2 距离。

=# CREATE INDEX ON items USING hnsw (embedding vector_l2_ops);

内积。

=# CREATE INDEX ON items USING hnsw (embedding vector_ip_ops);

余弦距离。

=# CREATE INDEX ON items USING hnsw (embedding vector_cosine_ops);

直至 2000 维的向量数据都可以被索引。

3.3.2.1 索引选项

指定 HNSW 参数。

  • m - 每层的最大连接数(默认值为 16
  • ef_construction - 用于构造图的动态候选列表的大小(默认值为 64
=# CREATE INDEX ON items USING hnsw (embedding vector_l2_ops) WITH (m = 16, ef_construction = 64);
3.3.2.2 查询选项

指定搜索的的动态候选列表大小(默认值为 40)。

=# SET hnsw.ef_search = 100;

更高的此参数值会以速度为代价提高更好的召回。

在一个事务中使用 SET LOCAL 子句来将其设置为一个单一查询。

=# BEGIN;
SET LOCAL hnsw.ef_search = 100;
SELECT ...
COMMIT;

3.3.3 索引进度

检查索引进度。

=# SELECT phase, tuples_done, tuples_total FROM pg_stat_progress_create_index;

这些阶段是:

  1. initializing
  2. performing k-means - 仅 IVFFlat
  3. assigning tuples - 仅 IVFFlat
  4. loading tuples

注意!
tuples_donetuples_total 仅可在 loading tuples 加载元组阶段填充。

3.4 过滤

有很多方法使用 WHERE 子句来索引最近邻查询。

=# SELECT * FROM items WHERE id = 123 ORDER BY embedding <-> '[3,1,2]' LIMIT 5;

在一个或多个用于精确搜索的 WHERE 列上创建索引。

=# CREATE INDEX ON items (id);

或在用于模糊搜索的向量列上创建一个局部索引。

=# CREATE INDEX ON items USING ivfflat (embedding vector_l2_ops) WITH (lists = 100)
    WHERE (id = 123);

使用分区对 WHERE 列的许多不同值进行模糊搜索。

=# CREATE TABLE items (embedding vector(3), id int) PARTITION BY LIST(id);

4 示例

4.1 Python

import psycopg2


def connect_to_database():
    # 连接到数据库
    try:
        conn = psycopg2.connect(
            database="your_database_name",
            user="your_database_user",
            password="your_database_password",
            host="your_database_host",
            port="your_database_port"
        )
        return conn
    except psycopg2.Error as e:
        print(f"Error connecting to the database: {e}")
        return None


def close_database_connection(conn):
    # 关闭数据库连接
    if conn:
        conn.close()


def create_extension(conn, extension_name):
    try:
        cursor = conn.cursor()
        # 执行插入操作
        cursor.execute('create extension if not exists %s ;' % extension_name)
        conn.commit()
        cursor.close()
        print("extension %s has been created successfully." % extension_name)
    except psycopg2.Error as e:
        print(f"Error create extension: {e}")


def create_table(conn, tablename, sql_tabledef):
    try:
        cursor = conn.cursor()
        # 创建向量表
        cursor.execute(sql_tabledef)
        conn.commit()
        cursor.close
        print('table %s has been created successfully.' % tablename)
    except psycopg2.Error as e:
        print(f"Error inserting data: {e}")


def insert_data(conn, tablename, data):
    try:
        cursor = conn.cursor()
        # 执行插入操作
        cursor.execute("INSERT INTO %s VALUES ('%s',%s, %s,'%s')" % (
            tablename, data['value1'], data['value2'], data['value3'], data['value4']))
        conn.commit()
        cursor.close()
        print("Data inserted successfully.")
    except psycopg2.Error as e:
        print(f"Error inserting data: {e}")


def select_data(conn, tablename):
    try:
        cursor = conn.cursor()
        # 执行查询操作
        cursor.execute("SELECT * FROM %s" % tablename)
        result = cursor.fetchall()
        cursor.close()
        return result
    except psycopg2.Error as e:
        print(f"Error selecting data: {e}")
        return []


def vector_search(conn, tablename, vector_type, nearest_values):
    try:
        cursor = conn.cursor()
        # 执行查询操作
        cursor.execute(
            "SELECT * FROM %s order by embedding %s '%s' limit 5;" % (tablename, vector_type, nearest_values))
        result = cursor.fetchall()
        cursor.close()
        return result
    except psycopg2.Error as e:
        print(f"Error selecting data: {e}")
        return []


def update_data(conn, tablename, data):
    try:
        cursor = conn.cursor()
        # 执行更新操作
        cursor.execute(
            "UPDATE %s SET embedding = '%s' WHERE id = %s" % (tablename, data['new_value'], data['condition']))
        conn.commit()
        cursor.close()
        print("Data updated successfully.")
    except psycopg2.Error as e:
        print(f"Error updating data: {e}")


def delete_data(conn, tablename, condition):
    try:
        cursor = conn.cursor()
        # 执行删除操作
        cursor.execute("DELETE FROM %s WHERE id = %s" % (tablename, condition))
        conn.commit()
        cursor.close()
        print("Data deleted successfully.")
    except psycopg2.Error as e:
        print(f"Error deleting data: {e}")


if __name__ == "__main__":
    conn = connect_to_database()
    if conn:
        # 在这里执行数据库操作
        data_to_insert = {
            'value1': '2023-10-12 00:00:00',
            'value2': 1,
            'value3': 1,
            'value4': [1, 2, 3]

        }

        # 创建vector扩展
        create_extension(conn, 'vector')

        # 创建向量表
        sql_tabledef = 'drop table if exists documents_l2;' \
                       'CREATE TABLE documents_l2(' \
                       'created_at timestamptz,' \
                       'id integer,' \
                       'document_type int,' \
                       'embedding vector(3)' \
                       ')' \
                       'distributed by (id);';
        create_table(conn, 'documents_l2', sql_tabledef)

        # 向表里写入数据
        insert_data(conn, 'documents_l2', data_to_insert)

        # 查询数据
        data_to_select = select_data(conn, 'documents_l2')
        print("Selected Data:", data_to_select)

        # 更新数据
        data_to_update = {
            'new_value': '[4,5,6]',
            'condition': 1
        }
        update_data(conn, 'documents_l2', data_to_update)
        # 查询数据
        data_to_select = select_data(conn, 'documents_l2')
        print("Selected Data:", data_to_select)

        # 向量检索
        print('向量检索:L2 distance')
        data_vector_search = vector_search(conn, 'documents_l2', '<->', '[4,5,6]')
        print("vector search(L2):", data_vector_search)

        print('向量检索:inner product')
        data_vector_search = vector_search(conn, 'documents_l2', '<#>', '[4,5,6]')
        print("vector search(IP):", data_vector_search)

        print('向量检索:cosine distance')
        data_vector_search = vector_search(conn, 'documents_l2', '<=>', '[4,5,6]')
        print("vector search(cos):", data_vector_search)

        # 删除数据
        data_to_delete = 1
        delete_data(conn, 'documents_l2', data_to_delete)
        # 查询数据
        data_to_select = select_data(conn, 'documents_l2')
        print("Selected Data:", data_to_select)

        close_database_connection(conn)

4.2 Golang

package main

import (
    "database/sql"
    "fmt"

    _ "github.com/lib/pq"
)

type Data struct {
    Id int32
    Embedding string
}

func getConn() (*sql.DB, error) {
    psqlInfo := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", "192.168.100.30", 6432, "mxadmin", "", "mxadmin")

    db, err := sql.Open("postgres", psqlInfo)
    if err != nil {
        return nil, err
    }

    return db, nil
}

func insertData() error {
    db, _ := getConn()
    defer db.Close()

    _, err := db.Exec(`INSERT INTO items (embedding) VALUES ('[1,2,3]'), ('[4,5,6]')`)
    return err
}

func selectData() ([]Data, error) {
    db, _ := getConn()
    defer db.Close()

    datas := make([]Data, 0)
    rows, err := db.Query(`SELECT id, embedding FROM items ORDER BY embedding <=> '[0.45, 0.4, 0.85]' LIMIT 10;`)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    for rows.Next() {
        data := Data{}
        err = rows.Scan(&data.Id, &data.Embedding)
        if err != nil {
            return nil, err
        }
        datas = append(datas, data)
    }

    return datas, err
}

func main() {
    insertData()
    datas, _ := selectData()
    fmt.Println(datas)
}