Incorporar dados armazenados em uma tabela PostgreSQL é sem dúvida útil – com aplicações que vão desde pesquisa semântica e sistemas de recomendação até aplicações generativas de IA e geração aumentada de recuperação. Mas criar e gerenciar embeddings para dados em tabelas PostgreSQL pode ser complicado, com muitas considerações e casos extremos a serem considerados, como manter os embeddings atualizados com atualizações e exclusões de tabelas, garantir resiliência contra falhas e impacto nos sistemas existentes dependentes de a mesa.
Nesta postagem do blog, discutiremos as decisões técnicas de design e as compensações que fizemos ao construir o PgVectorizer para garantir simplicidade, resiliência e alto desempenho. Também discutiremos designs alternativos se você quiser criar o seu próprio.
Vamos começar.
Primeiro, vamos descrever como funcionará o sistema que estamos construindo. Sinta-se à vontade para pular esta seção se você já leu o
Como exemplo ilustrativo, usaremos uma aplicação de blog simples que armazena dados no PostgreSQL usando uma tabela definida da seguinte forma:
CREATE TABLE blog ( id SERIAL PRIMARY KEY NOT NULL, title TEXT NOT NULL, author TEXT NOT NULL, contents TEXT NOT NULL, category TEXT NOT NULL, published_time TIMESTAMPTZ NULL --NULL if not yet published );
Queremos criar embeddings no conteúdo da postagem do blog para que possamos usá-lo posteriormente para pesquisa semântica e geração aumentada de recuperação de energia. Os embeddings só devem existir e ser pesquisáveis em blogs que foram publicados (onde published_time
é NOT NULL
).
Ao construir este sistema de embeddings, conseguimos identificar uma série de objetivos que qualquer sistema simples e resiliente que crie embeddings deveria ter:
Nenhuma modificação na tabela original. Isso permite que sistemas e aplicações que já utilizam esta tabela não sejam impactados por alterações no sistema de incorporação. Isto é especialmente importante para sistemas legados.
Nenhuma modificação nos aplicativos que interagem com a tabela. Ter que modificar o código que altera a tabela pode não ser possível para sistemas legados. Também é um design de software ruim porque combina sistemas que não usam embeddings com código que gera a incorporação.
Atualize automaticamente os embeddings quando as linhas da tabela de origem forem alteradas (neste caso, a tabela do blog). Isso diminui a carga de manutenção e contribui para um software livre de preocupações. Ao mesmo tempo, esta atualização não precisa ser instantânea ou dentro do mesmo commit. Para a maioria dos sistemas, “consistência eventual” é suficiente.
Garanta resiliência contra falhas de rede e de serviço: a maioria dos sistemas gera incorporações por meio de uma chamada para um sistema externo, como a API OpenAI. Em cenários em que o sistema externo está inoperante ou ocorre um mau funcionamento da rede, é imperativo que o restante do seu sistema de banco de dados continue funcionando.
Essas diretrizes foram a base de uma arquitetura robusta que implementamos usando o
Aqui está a arquitetura que escolhemos:
Neste design, primeiro adicionamos um gatilho à tabela do blog que monitora alterações e, ao ver uma modificação, insere um trabalho na tabela blog_work_queue que indica que uma linha na tabela do blog está desatualizada com sua incorporação.
Em uma programação fixa, um trabalho de criador de embeddings pesquisará a tabela blog_work_queue e, se encontrar trabalho para fazer, fará o seguinte em um loop:
Para ver este sistema em ação, veja um exemplo de uso para
Voltando ao exemplo da nossa tabela de aplicação de blog, em alto nível, o PgVectorizer tem que fazer duas coisas:
Acompanhe as alterações nas linhas do blog para saber quais linhas foram alteradas.
Forneça um método para processar as alterações para criar embeddings.
Ambos devem ser altamente simultâneos e de alto desempenho. Vamos ver como isso funciona.
Você pode criar uma tabela de fila de trabalho simples com o seguinte:
CREATE TABLE blog_embedding_work_queue ( id INT ); CREATE INDEX ON blog_embedding_work_queue(id);
Esta é uma tabela muito simples, mas há um item digno de nota: esta tabela não possui chave única. Isso foi feito para evitar problemas de bloqueio ao processar a fila, mas significa que podemos ter duplicatas. Discutiremos a compensação posteriormente na Alternativa 1 abaixo.
Em seguida, você cria um gatilho para rastrear quaisquer alterações feitas no blog
:
CREATE OR REPLACE FUNCTION blog_wq_for_embedding() RETURNS TRIGGER LANGUAGE PLPGSQL AS $$ BEGIN IF (TG_OP = 'DELETE') THEN INSERT INTO blog_embedding_work_queue VALUES (OLD.id); ELSE INSERT INTO blog_embedding_work_queue VALUES (NEW.id); END IF; RETURN NULL; END; $$; CREATE TRIGGER track_changes_for_embedding AFTER INSERT OR UPDATE OR DELETE ON blog FOR EACH ROW EXECUTE PROCEDURE blog_wq_for_embedding(); INSERT INTO blog_embedding_work_queue SELECT id FROM blog WHERE published_time is NOT NULL;
O gatilho insere o ID do blog que mudou para blog_work_queue. Instalamos o gatilho e, em seguida, inserimos todos os blogs existentes no work_queue. Essa ordem é importante para garantir que nenhum ID seja descartado.
Agora, vamos descrever alguns designs alternativos e por que os rejeitamos.
A introdução desta chave eliminaria o problema de entradas duplicadas. No entanto, não está isento de desafios, principalmente porque tal chave nos forçaria a usar a cláusula INSERT…ON CONFLICT DO NOTHING
para inserir novos IDs na tabela, e essa cláusula bloqueia o ID na árvore B.
Eis o dilema: durante a fase de processamento, é necessário deletar as linhas que estão sendo trabalhadas para evitar o processamento simultâneo. No entanto, a confirmação desta exclusão só pode ser feita após a incorporação correspondente ter sido colocada em blog_embeddings. Isso garante que nenhum ID seja perdido se houver uma interrupção no meio do caminho, por exemplo, se a criação da incorporação falhar após a exclusão, mas antes da incorporação ser gravada.
Agora, se criarmos uma chave única ou primária, a transação que supervisiona a exclusão permanece aberta. Conseqüentemente, isso atua como um bloqueio nesses IDs específicos, impedindo sua inserção de volta no blog_work_queue durante toda a duração do trabalho de criação de incorporação. Dado que leva mais tempo para criar embeddings do que uma transação típica de banco de dados, isso significa problemas. O bloqueio paralisaria o gatilho da tabela principal do 'blog', levando a uma queda no desempenho do aplicativo principal. Para piorar as coisas, se processar várias linhas em um lote, os impasses também se tornam um problema potencial.
No entanto, os potenciais problemas decorrentes de entradas duplicadas ocasionais podem ser geridos durante a fase de processamento, conforme ilustrado mais adiante. Uma duplicata esporádica aqui e ali não é um problema, pois aumenta apenas marginalmente a quantidade de trabalho que o trabalho de incorporação executa. Isto é certamente mais palatável do que enfrentar os desafios de bloqueio mencionados acima.
Por exemplo, poderíamos adicionar uma coluna booleana embedded
definida como falsa na modificação e invertida para verdadeira quando a incorporação for criada. Existem três razões para rejeitar este design:
Não queremos modificar a tabela blog
pelos motivos já mencionados acima.
A obtenção eficiente de uma lista de blogs não incorporados exigiria um índice adicional (ou índice parcial) na tabela de blogs. Isso desaceleraria outras operações.
Isso aumenta a rotatividade na tabela porque cada modificação agora seria gravada duas vezes (uma vez com embedding=false e uma vez com embedding=true) devido à natureza MVCC do PostgreSQL.
Um work_queue_table separado resolve esses problemas.
Essa abordagem tem vários problemas:
Se o serviço de incorporação estiver inativo, o gatilho precisará falhar (abortando sua transação) ou você precisará criar um caminho de código de backup que... armazene os IDs que não puderam ser incorporados em uma fila. A última solução nos leva de volta ao design proposto, mas com mais complexidade acrescentada.
Esse gatilho provavelmente será muito mais lento que o restante das operações do banco de dados devido à latência necessária para entrar em contato com um serviço externo. Isso desacelerará o restante das operações do banco de dados na tabela.
Força o usuário a escrever o código de incorporação de criação diretamente no banco de dados. Dado que a língua franca da IA é o Python e que a criação de incorporação geralmente requer muitas outras bibliotecas, isso nem sempre é fácil ou mesmo possível (especialmente se for executado em um ambiente de nuvem PostgreSQL hospedado). É muito melhor ter um design onde você tenha a opção de criar embeddings dentro ou fora do banco de dados.
Agora que temos uma lista de blogs que precisam ser incorporados, vamos processar a lista!
Existem muitas maneiras de criar incorporações. Recomendamos usar um script Python externo. Este script verificará a fila de trabalhos e as postagens de blog relacionadas, invocará um serviço externo para criar os embeddings e, em seguida, armazenará esses embeddings de volta no banco de dados. Nosso raciocínio para esta estratégia é o seguinte:
Escolha de Python : Recomendamos Python porque oferece um ecossistema rico e incomparável para tarefas de dados de IA, destacado pelo poderoso desenvolvimento de LLM e bibliotecas de dados como
Optando por um script externo em vez de PL/Python : queríamos que os usuários tivessem controle sobre como incorporar seus dados. No entanto, ao mesmo tempo, muitos provedores de nuvem Postgres não permitem a execução de código Python arbitrário dentro do banco de dados devido a questões de segurança. Portanto, para permitir que os usuários tenham flexibilidade tanto na incorporação de scripts quanto no local onde hospedam seu banco de dados, optamos por um design que usava scripts Python externos.
Os trabalhos devem ter desempenho e segurança de simultaneidade. A simultaneidade garante que, se os jobs começarem a ficar atrasados, os agendadores poderão iniciar mais jobs para ajudar o sistema a acompanhar e lidar com a carga.
Veremos como configurar cada um desses métodos posteriormente, mas primeiro, vamos ver como seria o script Python. Fundamentalmente, o roteiro tem três partes:
Leia a fila de trabalho e a postagem do blog
Crie uma incorporação para a postagem do blog
Escreva a incorporação na tabela blog_embedding
As etapas 2 e 3 são executadas por um retorno de chamada embed_and_write
que definimos no
Primeiro mostraremos o código e depois destacaremos os principais elementos em jogo:
def process_queue(embed_and_write_cb, batch_size:int=10): with psycopg2.connect(TIMESCALE_SERVICE_URL) as conn: with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: cursor.execute(f""" SELECT to_regclass('blog_embedding_work_queue')::oid; """) table_oid = cursor.fetchone()[0] cursor.execute(f""" WITH selected_rows AS ( SELECT id FROM blog_embedding_work_queue LIMIT {int(batch_size)} FOR UPDATE SKIP LOCKED ), locked_items AS ( SELECT id, pg_try_advisory_xact_lock( {int(table_oid)}, id) AS locked FROM ( SELECT DISTINCT id FROM selected_rows ORDER BY id ) as ids ), deleted_rows AS ( DELETE FROM blog_embedding_work_queue WHERE id IN ( SELECT id FROM locked_items WHERE locked = true ORDER BY id ) ) SELECT locked_items.id as locked_id, {self.table_name}.* FROM locked_items LEFT JOIN blog ON blog.id = locked_items.id WHERE locked = true ORDER BY locked_items.id """) res = cursor.fetchall() if len(res) > 0: embed_and_write_cb(res) return len(res) process_queue(embed_and_write)
O código SQL no trecho acima é sutil porque foi projetado para ter desempenho e segurança de simultaneidade, então vamos analisá-lo:
Retirando itens da fila de trabalho : Inicialmente, o sistema recupera um número especificado de entradas da fila de trabalho, determinado pelo parâmetro de tamanho da fila em lote. Um bloqueio FOR UPDATE é usado para garantir que os scripts em execução simultânea não tentem processar os mesmos itens da fila. A diretiva SKIP LOCKED garante que se alguma entrada estiver sendo tratada por outro script, o sistema irá ignorá-la em vez de esperar, evitando atrasos desnecessários.
Bloqueio de IDs de blog : devido à possibilidade de entradas duplicadas para o mesmo blog_id na tabela de fila de trabalho, simplesmente bloquear essa tabela é insuficiente. O processamento simultâneo do mesmo ID por trabalhos diferentes seria prejudicial. Considere a seguinte condição de corrida potencial:
A tarefa 1 inicia e acessa um blog, recuperando a versão 1.
Ocorre uma atualização externa no blog.
Posteriormente, começa o Job 2, obtendo a versão 2.
Ambos os trabalhos iniciam o processo de geração de incorporação.
O trabalho 2 é concluído, armazenando o embedding correspondente à versão 2 do blog.
A tarefa 1, após a conclusão, substitui erroneamente a incorporação da versão 2 pela versão 1 desatualizada.
Embora seja possível combater esse problema introduzindo o rastreamento explícito de versões, ele introduz uma complexidade considerável sem benefício de desempenho. A estratégia que optamos não apenas atenua esse problema, mas também evita operações redundantes e desperdício de trabalho com a execução simultânea de scripts.
Um bloqueio de aconselhamento Postgres, prefixado com o identificador da tabela para evitar possíveis sobreposições com outros bloqueios, é empregado. A variante try
, análoga à aplicação anterior de SKIP LOCKED, garante que o sistema evite esperar por bloqueios. A inclusão da cláusula ORDER BY blog_id ajuda a evitar possíveis impasses. Abordaremos algumas alternativas abaixo.
Limpando a fila de trabalho : o script exclui todos os itens da fila de trabalho dos blogs que bloqueamos com sucesso. Se esses itens da fila estiverem visíveis por meio do MVCC (Controle de simultaneidade multiversão), suas atualizações serão manifestadas na linha do blog recuperada. Observe que excluímos todos os itens com o ID do blog fornecido, não apenas os itens lidos ao selecionar as linhas: isso lida efetivamente com entradas duplicadas para o mesmo ID do blog. É crucial observar que esta exclusão só é confirmada após invocar a função embed_and_write() e o subsequente armazenamento da incorporação atualizada. Essa sequência garante que não perderemos nenhuma atualização, mesmo se o script falhar durante a fase de geração de incorporação.
Fazendo com que os blogs sejam processados: Na última etapa, buscamos os blogs para processar. Observe o uso da junção esquerda: que nos permite recuperar os IDs do blog para itens excluídos que não terão uma linha de blog. Precisamos rastrear esses itens para excluir suas incorporações. No retorno de chamada embed_and_write
, usamos public_time sendo NULL como sentinela para o blog que está sendo excluído (ou não publicado, caso em que também queremos excluir a incorporação).
Se o sistema já utiliza bloqueios consultivos e você está preocupado com colisões, é possível usar uma tabela com um ID de blog como chave primária e bloquear as linhas. Na verdade, esta pode ser a própria tabela do blog se você tiver certeza de que esses bloqueios não deixarão nenhum outro sistema lento (lembre-se, esses bloqueios devem ser mantidos durante todo o processo de incorporação, o que pode demorar um pouco).
Alternativamente, você pode ter uma tabela blog_embedding_locks apenas para essa finalidade. Não sugerimos a criação dessa tabela porque achamos que ela pode ser um grande desperdício em termos de espaço, e o uso de bloqueios consultivos evita essa sobrecarga.
Nesta postagem do blog, mostramos os bastidores de como criamos um sistema que possui resiliência, lidando com eficácia com possíveis tempos de inatividade do serviço de geração de incorporação. Seu design é adequado para gerenciar uma alta taxa de modificações de dados e pode usar perfeitamente processos de geração de incorporação simultâneos para acomodar cargas elevadas.
Além disso, o paradigma de submeter dados ao PostgreSQL e usar o banco de dados para gerenciar a geração de incorporação em segundo plano surge como um mecanismo fácil para supervisionar a manutenção da incorporação em meio a modificações de dados. Uma infinidade de demonstrações e tutoriais no espaço de IA concentram-se exclusivamente na criação inicial de dados a partir de documentos, ignorando as intrincadas nuances associadas à preservação da sincronização de dados à medida que ela evolui.
No entanto, em ambientes de produção reais, os dados mudam invariavelmente, e lidar com as complexidades de rastrear e sincronizar essas mudanças não é uma tarefa trivial. Mas é para isso que um banco de dados foi projetado! Por que não apenas usá-lo?
Escrito por Matvey Arye.
Também publicado aqui.