嵌入存储在PostgreSQL表中的数据无疑是有用的——其应用范围从语义搜索和推荐系统到生成式人工智能应用程序和检索增强生成。但是,在 PostgreSQL 表中创建和管理数据的嵌入可能很棘手,需要考虑许多注意事项和边缘情况,例如使嵌入与表更新和删除保持同步、确保针对故障的恢复能力以及对依赖于现有系统的影响。桌子。
在这篇博文中,我们将讨论构建PgVectorizer时的技术设计决策和权衡,以确保简单性、弹性和高性能。如果您想推出自己的设计,我们还将讨论替代设计。
让我们开始吧。
首先,让我们描述一下我们正在构建的系统将如何工作。如果您已经阅读了本节,请随意跳过本节
作为说明性示例,我们将使用一个简单的博客应用程序,该应用程序使用定义如下的表在 PostgreSQL 中存储数据:
CREATE TABLE blog ( id SERIAL PRIMARY KEY NOT NULL, title TEXT NOT NULL, author TEXT NOT NULL, contents TEXT NOT NULL, category TEXT NOT NULL, published_time TIMESTAMPTZ NULL --NULL if not yet published );
我们希望在博客文章的内容上创建嵌入,以便稍后将其用于语义搜索和强大的检索增强生成。嵌入应该仅存在并且可搜索已发布的博客(其中published_time
为NOT NULL
)。
在构建这个嵌入系统时,我们能够确定任何创建嵌入的简单且有弹性的系统都应该具有的许多目标:
对原表没有任何修改。这使得已经使用该表的系统和应用程序不会受到嵌入系统更改的影响。这对于遗留系统尤其重要。
无需修改与表交互的应用程序。对于遗留系统来说,必须修改改变表的代码可能是不可能的。这也是糟糕的软件设计,因为它将不使用嵌入的系统与生成嵌入的代码结合在一起。
当源表(在本例中为博客表)中的行发生更改时自动更新嵌入。这减轻了维护负担并有助于实现无忧软件。同时,此更新不必是即时的或在同一提交内。对于大多数系统来说,“最终一致性”就可以了。
确保针对网络和服务故障的恢复能力:大多数系统通过调用外部系统(例如 OpenAI API)来生成嵌入。在外部系统出现故障或发生网络故障的情况下,数据库系统的其余部分必须继续工作。
这些指南是我们使用以下方法实现的稳健架构的基础:
这是我们确定的架构:
在此设计中,我们首先向博客表添加一个触发器来监视更改,并在看到修改后将作业插入到 blog_work_queue 表中,该表指示博客表中的行与其嵌入已过时。
按照固定的时间表,嵌入创建者作业将轮询 blog_work_queue 表,如果它找到要做的工作,将循环执行以下操作:
要查看该系统的运行情况,请参阅使用示例
回到我们博客应用程序表的示例,从较高的层面来看, PgVectorizer必须做两件事:
跟踪博客行的更改以了解哪些行已更改。
提供一种方法来处理更改以创建嵌入。
这两者都必须是高度并发和高性能的。让我们看看它是如何工作的。
您可以使用以下内容创建一个简单的工作队列表:
CREATE TABLE blog_embedding_work_queue ( id INT ); CREATE INDEX ON blog_embedding_work_queue(id);
这是一个非常简单的表,但有一点需要注意:该表没有唯一键。这样做是为了避免处理队列时出现锁定问题,但这确实意味着我们可能会有重复项。我们稍后将在下面的替代方案 1 中讨论这种权衡。
然后创建一个触发器来跟踪对blog
所做的任何更改:
CREATE OR REPLACE FUNCTION blog_wq_for_embedding() RETURNS TRIGGER LANGUAGE PLPGSQL AS $$ BEGIN IF (TG_OP = 'DELETE') THEN INSERT INTO blog_embedding_work_queue VALUES (OLD.id); ELSE INSERT INTO blog_embedding_work_queue VALUES (NEW.id); END IF; RETURN NULL; END; $$; CREATE TRIGGER track_changes_for_embedding AFTER INSERT OR UPDATE OR DELETE ON blog FOR EACH ROW EXECUTE PROCEDURE blog_wq_for_embedding(); INSERT INTO blog_embedding_work_queue SELECT id FROM blog WHERE published_time is NOT NULL;
触发器将已更改的博客的 ID 插入 blog_work_queue。我们安装触发器,然后将任何现有博客插入到 work_queue 中。此顺序对于确保不会丢失 ID 非常重要。
现在,让我们描述一些替代设计以及我们拒绝它们的原因。
引入此密钥将消除重复条目的问题。然而,这并非没有挑战,特别是因为这样的键将迫使我们使用INSERT…ON CONFLICT DO NOTHING
子句将新 ID 插入表中,并且该子句会锁定 B 树中的 ID。
这里有一个困境:在处理阶段,有必要删除正在处理的行以防止同时处理。然而,只有在相应的嵌入被放入 blog_embeddings 后才能提交此删除。这可以确保如果中途发生中断(例如,如果嵌入创建在删除后但写入嵌入之前崩溃),则不会丢失 ID。
现在,如果我们创建唯一键或主键,则监督删除的事务将保持打开状态。因此,这充当了对这些特定 ID 的锁定,防止它们在嵌入创建作业的整个持续时间内插入到 blog_work_queue 中。鉴于创建嵌入比典型的数据库事务需要更长的时间,这会带来麻烦。该锁会阻止主“博客”表的触发器,从而导致主应用程序的性能下降。更糟糕的是,如果批量处理多行,死锁也会成为一个潜在的问题。
然而,偶尔重复条目引起的潜在问题可以在处理阶段得到管理,如下所示。零星的重复并不存在问题,因为它只会略微增加嵌入作业执行的工作量。这当然比应对上述锁定挑战更容易接受。
例如,我们可以添加一个embedded
布尔列,在修改时设置为 false,并在创建嵌入时翻转为 true。拒绝这个设计的理由有以下三个:
由于我们上面已经提到的原因,我们不想修改blog
表。
有效获取非嵌入式博客列表需要在博客表上添加额外的索引(或部分索引)。这会减慢其他操作的速度。
这会增加表的变动,因为由于 PostgreSQL 的 MVCC 性质,每个修改现在都将写入两次(一次使用 embedding=false,一次使用 embedding=true)。
单独的work_queue_table可以解决这些问题。
这种方法有几个问题:
如果嵌入服务关闭,则触发器需要失败(中止您的事务),或者您需要创建一个备份代码路径来...存储无法嵌入队列中的 ID。后一种解决方案让我们回到了我们提出的设计,但上面附加了更多的复杂性。
由于联系外部服务所需的延迟,此触发器可能比其余数据库操作慢得多。这将减慢表上其余数据库操作的速度。
它强制用户直接在数据库中编写创建嵌入代码。鉴于 AI 的通用语言是 Python,并且嵌入创建通常需要许多其他库,这并不总是容易甚至不可能(特别是在托管 PostgreSQL 云环境中运行时)。最好有一个设计,您可以选择在数据库内部或外部创建嵌入。
现在我们有了需要嵌入的博客列表,让我们处理该列表!
创建嵌入的方法有很多种。我们建议使用外部 Python 脚本。该脚本将扫描工作队列和相关博客文章,调用外部服务来制作嵌入,然后将这些嵌入存储回数据库中。我们采用该策略的理由如下:
Python 的选择:我们推荐Python ,因为它为 AI 数据任务提供了丰富、无与伦比的生态系统,并以强大的 LLM 开发和数据库(例如
选择外部脚本而不是 PL/Python :我们希望用户能够控制他们嵌入数据的方式。然而,与此同时,出于安全考虑,许多 Postgres 云提供商不允许在数据库内执行任意 Python 代码。因此,为了让用户能够灵活地使用嵌入脚本以及托管数据库的位置,我们采用了使用外部 Python 脚本的设计。
这些作业必须既高性能又并发安全。并发保证了如果作业开始落后,调度程序可以启动更多作业来帮助系统赶上并处理负载。
稍后我们将介绍如何设置每个方法,但首先让我们看看 Python 脚本是什么样子的。基本上,该脚本分为三个部分:
阅读工作队列和博客文章
为博客文章创建嵌入
将嵌入写入 blog_embedding 表
步骤 2 和 3 由我们在中定义的embed_and_write
回调执行
我们将首先向您展示代码,然后突出显示起作用的关键元素:
def process_queue(embed_and_write_cb, batch_size:int=10): with psycopg2.connect(TIMESCALE_SERVICE_URL) as conn: with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: cursor.execute(f""" SELECT to_regclass('blog_embedding_work_queue')::oid; """) table_oid = cursor.fetchone()[0] cursor.execute(f""" WITH selected_rows AS ( SELECT id FROM blog_embedding_work_queue LIMIT {int(batch_size)} FOR UPDATE SKIP LOCKED ), locked_items AS ( SELECT id, pg_try_advisory_xact_lock( {int(table_oid)}, id) AS locked FROM ( SELECT DISTINCT id FROM selected_rows ORDER BY id ) as ids ), deleted_rows AS ( DELETE FROM blog_embedding_work_queue WHERE id IN ( SELECT id FROM locked_items WHERE locked = true ORDER BY id ) ) SELECT locked_items.id as locked_id, {self.table_name}.* FROM locked_items LEFT JOIN blog ON blog.id = locked_items.id WHERE locked = true ORDER BY locked_items.id """) res = cursor.fetchall() if len(res) > 0: embed_and_write_cb(res) return len(res) process_queue(embed_and_write)
上面代码片段中的 SQL 代码很微妙,因为它被设计为既高性能又并发安全,所以让我们看一下它:
从工作队列中获取项目:最初,系统从工作队列中检索指定数量的条目,该数量由批处理队列大小参数确定。采用 FOR UPDATE 锁是为了确保并发执行的脚本不会尝试处理相同的队列项。 SKIP LOCKED 指令确保如果任何条目当前正在由另一个脚本处理,系统将跳过它而不是等待,从而避免不必要的延迟。
锁定博客 ID :由于工作队列表中同一 blog_id 可能存在重复条目,仅锁定该表是不够的。不同作业同时处理同一 ID 是有害的。考虑以下潜在的竞争条件:
作业 1 启动并访问博客,检索版本 1。
博客发生外部更新。
随后,作业 2 开始,获取版本 2。
这两项工作都开始嵌入生成过程。
作业 2 结束,存储与博客版本 2 相对应的嵌入。
作业 1 在结束时错误地用过时的版本 1 覆盖了版本 2 嵌入。
虽然可以通过引入显式版本跟踪来解决这个问题,但它会带来相当大的复杂性,而不会带来性能优势。我们选择的策略不仅可以缓解这个问题,还可以通过并发执行脚本来防止冗余操作和浪费工作。
采用 Postgres 咨询锁,以表标识符为前缀,以避免与其他此类锁潜在的重叠。 try
变体类似于早期的 SKIP LOCKED 应用程序,可确保系统避免等待锁。包含 ORDER BY blog_id 子句有助于防止潜在的死锁。我们将在下面介绍一些替代方案。
清理工作队列:然后,脚本将删除我们已成功锁定的博客的所有工作队列项目。如果这些队列项通过多版本并发控制 (MVCC) 可见,则它们的更新将显示在检索到的博客行中。请注意,我们删除具有给定博客 ID 的所有项目,而不仅仅是选择行时读取的项目:这可以有效处理同一博客 ID 的重复条目。值得注意的是,此删除仅在调用 embed_and_write() 函数以及更新后的嵌入的后续存储后才会提交。此序列确保即使脚本在嵌入生成阶段失败,我们也不会丢失任何更新。
让博客进行处理:在最后一步中,我们获取要处理的博客。请注意左连接的使用:它允许我们检索没有博客行的已删除项目的博客 ID。我们需要跟踪这些项目以删除它们的嵌入。在embed_and_write
回调中,我们使用published_time为NULL作为博客被删除(或未发布,在这种情况下我们也想删除嵌入)的标记。
如果系统已经使用咨询锁并且您担心冲突,则可以使用以博客 ID 作为主键的表并锁定行。事实上,如果您确定这些锁不会减慢任何其他系统的速度,则这可以是博客表本身(请记住,必须在整个嵌入过程中保持这些锁,这可能需要一段时间)。
或者,您可以有一个 blog_embedding_locks 表专门用于此目的。我们不建议创建该表,因为我们认为它可能会非常浪费空间,而使用咨询锁可以避免这种开销。
在这篇博文中,我们向您介绍了如何创建一个具有弹性的系统,有效处理嵌入生成服务的潜在停机时间。其设计擅长管理高速率的数据修改,并且可以无缝地使用并发嵌入生成过程来适应更高的负载。
此外,将数据提交到 PostgreSQL 并使用数据库在后台管理嵌入生成的范例成为一种在数据修改期间监督嵌入维护的简单机制。人工智能领域的无数演示和教程都只关注从文档中最初创建数据,而忽略了与在发展过程中保持数据同步相关的复杂细微差别。
然而,在实际生产环境中,数据总是会发生变化,而应对跟踪和同步这些变化的复杂性并非易事。但这就是数据库的设计目的!为什么不直接使用它呢?
由马特维·阿耶撰写。
也发布在这里。