Este artigo foi co-escrito por mim e meu colega Kai Dai. Somos ambos engenheiros de plataforma de dados da Tencent Music (NYSE: TME), um provedor de serviços de streaming de música com impressionantes 800 milhões de usuários ativos mensais. Deixar cair o número aqui não é para se gabar, mas para dar uma dica do mar de dados com os quais meus pobres colegas de trabalho e eu temos que lidar todos os dias.
A biblioteca de música da Tencent Music contém dados de todas as formas e tipos: música gravada, música ao vivo, áudios, vídeos, etc. Como engenheiros de plataforma de dados, nosso trabalho é destilar informações dos dados, com base nas quais nossos colegas de equipe podem tomar melhores decisões para apoiar nossos usuários e parceiros musicais.
Especificamente, fazemos uma análise completa das músicas, letras, melodias, álbuns e artistas, transformamos todas essas informações em ativos de dados e as passamos para nossos usuários de dados internos para contagem de inventário, perfil de usuário, análise de métricas e segmentação de grupo .
Armazenamos e processamos a maior parte de nossos dados no Tencent Data Warehouse (TDW), uma plataforma de dados off-line onde colocamos os dados em vários sistemas de tags e métricas e criamos tabelas planas centralizando cada objeto (músicas, artistas, etc.).
Em seguida, importamos as tabelas planas para o ClickHouse para análise e o Elasticsearch para pesquisa de dados e segmentação de grupos.
Depois disso, nossos analistas de dados usaram os dados nas tags e métricas necessárias para formar conjuntos de dados para diferentes cenários de uso, durante os quais eles poderiam criar suas próprias tags e métricas.
O pipeline de processamento de dados ficou assim:
Ao trabalhar com o pipeline acima, encontramos algumas dificuldades:
Atualização Parcial : A atualização parcial de colunas não era suportada. Portanto, qualquer latência de qualquer uma das fontes de dados pode atrasar a criação de tabelas planas e, assim, comprometer a pontualidade dos dados.
Alto custo de armazenamento : os dados em diferentes tags e métricas foram atualizados em diferentes frequências. Por mais que o ClickHouse se destacasse em lidar com tabelas planas, era um grande desperdício de recursos de armazenamento apenas despejar todos os dados em uma tabela plana e particioná-la por dia, sem mencionar o custo de manutenção que a acompanhava.
Alto custo de manutenção : Arquiteturalmente falando, o ClickHouse foi caracterizado pelo forte acoplamento de nós de armazenamento e nós de computação. Seus componentes eram fortemente interdependentes, aumentando os riscos de instabilidade do cluster. Além disso, para consultas federadas em ClickHouse e Elasticsearch, tivemos que cuidar de uma grande quantidade de problemas de conexão. Isso foi apenas tedioso.
O Apache Doris , um banco de dados analítico em tempo real, possui alguns recursos que são exatamente o que precisávamos para resolver nossos problemas:
Atualização parcial : Doris suporta uma ampla variedade de modelos de dados, entre os quais o Modelo Agregado suporta atualização parcial de colunas em tempo real. Com base nisso, podemos inserir dados brutos diretamente no Doris e criar tabelas planas lá. A ingestão é assim: primeiro, usamos o Spark para carregar os dados no Kafka; então, todos os dados incrementais serão atualizados para Doris e Elasticsearch via Flink. Enquanto isso, o Flink pré-agregará os dados para liberar a carga sobre Doris e Elasticsearch.
Custo de armazenamento : Doris oferece suporte a consultas de junção de várias tabelas e consultas federadas em Hive, Iceberg, Hudi, MySQL e Elasticsearch. Isso nos permite dividir as grandes tabelas planas em tabelas menores e particioná-las por frequência de atualização. Os benefícios de fazer isso incluem um alívio da carga de armazenamento e um aumento da taxa de transferência de consulta.
Custo de manutenção : Doris é de arquitetura simples e compatível com o protocolo MySQL. A implantação do Doris envolve apenas dois processos (FE e BE) sem dependência de outros sistemas, facilitando sua operação e manutenção. Além disso, Doris oferece suporte à consulta de tabelas de dados ES externas. Ele pode interagir facilmente com os metadados no ES e mapear automaticamente o esquema da tabela do ES para que possamos realizar consultas nos dados do Elasticsearch via Doris sem lidar com conexões complexas.
Além disso, o Doris oferece suporte a vários métodos de ingestão de dados, incluindo importação em lote de armazenamento remoto, como HDFS e S3, leitura de dados do MySQL binlog e Kafka e sincronização de dados em tempo real ou importação em lote do MySQL, Oracle e PostgreSQL. Ele garante a disponibilidade do serviço e a confiabilidade dos dados por meio de um protocolo de consistência e é capaz de depuração automática. Esta é uma ótima notícia para nossos operadores e mantenedores.
Estatisticamente falando, esses recursos reduziram nosso custo de armazenamento em 42% e o custo de desenvolvimento em 40%.
Durante nosso uso do Doris, recebemos muito suporte da comunidade Apache Doris de software livre e ajuda oportuna da equipe SelectDB, que agora está executando uma versão comercial do Apache Doris.
Falando dos conjuntos de dados, pelo lado positivo, nossos analistas de dados têm a liberdade de redefinir e combinar as tags e métricas conforme sua conveniência. Mas, no lado negativo, a alta heterogeneidade dos sistemas de tags e métricas leva a mais dificuldades em seu uso e gerenciamento.
Nossa solução é introduzir uma camada semântica em nosso pipeline de processamento de dados. A camada semântica é onde todos os termos técnicos são traduzidos em conceitos mais compreensíveis para nossos usuários de dados internos. Em outras palavras, estamos transformando as tags e métricas em cidadãos de primeira classe para definição e gerenciamento de dados.
Por que isso ajudaria?
Para analistas de dados, todas as tags e métricas serão criadas e compartilhadas na camada semântica para que haja menos confusão e maior eficiência.
Para usuários de dados, eles não precisam mais criar seus próprios conjuntos de dados ou descobrir qual deles é aplicável a cada cenário, mas podem simplesmente realizar consultas em seu conjunto de tags e métricas especificados.
Definir explicitamente as tags e métricas na camada semântica não era suficiente. Para construir um sistema de processamento de dados padronizado, nosso próximo objetivo era garantir uma definição consistente de tags e métricas em todo o pipeline de processamento de dados.
Por isso, tornamos a camada semântica o coração do nosso sistema de gerenciamento de dados:
Como funciona?
Todas as lógicas de computação em TDW serão definidas na camada semântica na forma de uma única tag ou métrica.
A camada semântica recebe consultas lógicas do lado do aplicativo, seleciona um mecanismo de acordo e gera SQL. Em seguida, ele envia o comando SQL ao TDW para execução. Enquanto isso, ele também pode enviar tarefas de configuração e ingestão de dados para Doris e decidir quais métricas e tags devem ser aceleradas.
Dessa forma, tornamos as tags e métricas mais gerenciáveis. Uma mosca na pomada é que, como cada tag e métrica é definida individualmente, estamos lutando para automatizar a geração de uma instrução SQL válida para as consultas. Se você tem alguma ideia sobre isso, você é mais do que bem-vindo para falar conosco.
Como você pode ver, o Apache Doris desempenhou um papel fundamental em nossa solução. A otimização do uso de Doris pode melhorar muito nossa eficiência geral de processamento de dados. Então, nesta parte, vamos compartilhar com você o que fazemos com Doris para acelerar a ingestão de dados e consultas e reduzir custos.
O que nós queremos?
Atualmente, temos mais de 800 tags e mais de 1.300 métricas derivadas de mais de 80 tabelas de origem no TDW. Ao importar dados de TDW para Doris, esperamos alcançar:
Disponibilidade em tempo real : além da tradicional ingestão de dados off-line T+1, exigimos marcação em tempo real.
Atualização parcial : Cada tabela de origem gera dados por meio de sua própria tarefa ETL em vários ritmos e envolve apenas parte das tags e métricas, portanto, exigimos o suporte para atualização parcial de colunas.
Alto desempenho : Precisamos de um tempo de resposta de apenas alguns segundos em cenários de segmentação, análise e geração de relatórios de grupo.
Baixos custos : Esperamos reduzir os custos o máximo possível.
Gerar tabelas planas no Flink em vez de TDW
Alto custo de armazenamento : o TDW precisa manter uma tabela extra plana além das mais de 80 tabelas de origem discretas. Isso é uma enorme redundância.
Tempo real baixo : Qualquer atraso nas tabelas de origem será aumentado e retardará todo o link de dados.
Alto custo de desenvolvimento : para atingir a pontualidade real, seriam necessários esforços e recursos extras de desenvolvimento.
Pelo contrário, gerar tabelas planas em Doris é muito mais fácil e barato. O processo é como se segue:
Isso pode reduzir bastante os custos de armazenamento, pois o TDW não precisa mais manter duas cópias de dados e o KafKa só precisa armazenar os novos dados pendentes para ingestão. Além do mais, podemos adicionar qualquer lógica ETL que quisermos ao Flink e reutilizar muita lógica de desenvolvimento para ingestão de dados offline e em tempo real.
Como mencionamos, o Modelo Agregado de Doris permite a atualização parcial das colunas. Aqui, fornecemos uma introdução simples a outros modelos de dados em Doris para sua referência:
Modelo exclusivo : aplicável a cenários que exigem exclusividade de chave primária. Ele mantém apenas os dados mais recentes do mesmo ID de chave primária. (Até onde sabemos, a comunidade Apache Doris também está planejando incluir atualizações parciais de colunas no Modelo Único.)
Modelo Duplicado : Este modelo armazena todos os dados originais exatamente como estão, sem qualquer pré-agregação ou desduplicação.
Depois de determinar o modelo de dados, tivemos que pensar em como nomear as colunas. Usar as tags ou métricas como nomes de coluna não foi uma escolha porque:
Ⅰ. Nossos usuários de dados internos podem precisar renomear as métricas ou tags, mas o Doris 1.1.3 não oferece suporte à modificação de nomes de colunas.
Ⅱ. As tags podem ser colocadas on-line e off-line com frequência. Se isso envolver a adição e remoção de colunas, isso não apenas consumirá tempo, mas também prejudicará o desempenho da consulta. Em vez disso, fazemos o seguinte:
Para renomeação flexível de tags e métricas, usamos tabelas MySQL para armazenar os metadados (nome, ID exclusivo globalmente, status, etc.). Qualquer alteração nos nomes acontecerá apenas nos metadados, mas não afetará o esquema da tabela em Doris. Por exemplo, se um song_name
receber um ID de 4, ele será armazenado com o nome de coluna a4 em Doris. Então, se o song_name
estiver envolvido em uma consulta, ele será convertido para a4 no SQL.
Para on-line e off-line de tags, classificamos as tags com base na frequência com que são usadas. Os menos usados receberão uma marca offline em seus metadados. Nenhum dado novo será colocado nas tags off-line, mas os dados existentes nessas tags ainda estarão disponíveis.
Para disponibilidade em tempo real de tags e métricas recém-adicionadas, pré-construímos algumas colunas de ID nas tabelas Doris com base no mapeamento de IDs de nome. Essas colunas de ID reservadas serão alocadas para as tags e métricas recém-adicionadas. Assim, podemos evitar a alteração do esquema da tabela e as consequentes sobrecargas. Nossa experiência mostra que apenas 10 minutos após as tags e métricas serem adicionadas, os dados sob elas podem estar disponíveis.
Digno de nota, o recém-lançado Doris 1.2.0 suporta Light Schema Change, o que significa que para adicionar ou remover colunas, você só precisa modificar os metadados no FE. Além disso, você pode renomear as colunas nas tabelas de dados, desde que tenha ativado Light Schema Change para as tabelas. Esta é uma grande economia de problemas para nós.
Aqui estão algumas práticas que reduziram nosso tempo diário de ingestão de dados off-line em 75% e nossa pontuação de compactação CUMU de 600+ para 100.
Pré-agregação Flink: como mencionado acima.
Dimensionamento automático do lote de gravação: para reduzir o uso de recursos do Flink, permitimos que os dados em um tópico Kafka sejam gravados em várias tabelas Doris e realizamos a alteração automática do tamanho do lote com base na quantidade de dados.
Otimização da gravação de dados Doris: ajuste fino dos tamanhos de comprimidos e baldes, bem como os parâmetros de compactação para cada cenário:
max_XXXX_compaction_thread max_cumulative_compaction_num_singleton_deltas
Otimização da lógica de confirmação de BE: conduza cache regular de listas de BE, confirme-as nos nós de BE lote por lote e use granularidade de balanceamento de carga mais fina.
Use Dori-on-ES em consultas
Cerca de 60% de nossas consultas de dados envolvem segmentação de grupo. A segmentação de grupo é encontrar nossos dados de destino usando um conjunto de tags como filtros. Ele apresenta alguns requisitos para nossa arquitetura de processamento de dados:
A segmentação de grupo relacionada a usuários de APP pode envolver uma lógica muito complicada. Isso significa que o sistema deve suportar centenas de tags como filtros simultaneamente.
A maioria dos cenários de segmentação de grupo requer apenas os dados de tag mais recentes. No entanto, as consultas de métrica precisam oferecer suporte a dados históricos.
Os usuários de dados podem precisar executar análises agregadas adicionais de dados de métrica após a segmentação de grupo.
Os usuários de dados também podem precisar realizar consultas detalhadas sobre tags e métricas após a segmentação do grupo.
Após consideração, decidimos adotar Doris-on-ES. Doris é onde armazenamos os dados de métrica para cada cenário como uma tabela de partição, enquanto o Elasticsearch armazena todos os dados de tag. A solução Doris-on-ES combina o recurso de planejamento de consultas distribuídas do Doris e o recurso de pesquisa de texto completo do Elasticsearch. O padrão de consulta é o seguinte:
SELECT tag, agg(metric) FROM Doris WHERE id in (select id from Es where tagFilter) GROUP BY tag
Conforme mostrado, os dados de ID localizados no Elasticsearch serão usados na subconsulta no Doris para análise de métricas. Na prática, descobrimos que o tempo de resposta da consulta está relacionado ao tamanho do grupo-alvo. Se o grupo de destino contiver mais de um milhão de objetos, a consulta levará até 60 segundos. Se for ainda maior, pode ocorrer um erro de tempo limite. Após investigação, identificamos nossas duas maiores perdas de tempo:
I. Quando Doris BE extrai dados do Elasticsearch (1.024 linhas por vez, por padrão), para um grupo-alvo de mais de um milhão de objetos, a sobrecarga de E/S da rede pode ser enorme.
II. Após a extração de dados, Doris BE precisa realizar operações de junção com tabelas de métricas locais via SHUFFLE/BROADCAST, o que pode custar muito.
Assim, fazemos as seguintes otimizações:
Adicione uma variável de sessão de consulta es_optimize
que especifica se a otimização deve ser habilitada.
Na gravação de dados no ES, adicione uma coluna BK para armazenar o número do bucket após o hash do ID da chave primária. O algoritmo é o mesmo que o algoritmo de bucketing em Doris (CRC32).
Use Doris BE para gerar um plano de execução Bucket Join, despache o número do balde para BE ScanNode e envie-o para ES.
Use ES para compactar os dados consultados; transforme várias buscas de dados em uma e reduza a sobrecarga de E/S da rede.
Certifique-se de que Doris BE extraia apenas os dados de baldes relacionados às tabelas de métricas locais e conduza operações locais de junção diretamente para evitar embaralhamento de dados entre Doris BEs.
Como resultado, reduzimos o tempo de resposta da consulta para segmentação de grandes grupos de 60 segundos para surpreendentes 3,7 segundos. As informações da comunidade mostram que Doris oferecerá suporte à indexação invertida desde a versão 2.0.0, que será lançada em breve. Com esta nova versão, poderemos realizar pesquisa de texto completo em tipos de texto, filtragem de equivalência ou intervalo de textos, números e data e hora e combinar convenientemente a lógica AND, OR, NOT na filtragem, pois a indexação invertida suporta tipos de matriz. Espera-se que esse novo recurso do Doris forneça um desempenho 3 a 5 vezes melhor do que o Elasticsearch na mesma tarefa.
A capacidade da Doris de separação de dados frios e quentes fornece a base de nossas estratégias de redução de custos no processamento de dados.
Com base no mecanismo TTL de Doris, armazenamos apenas os dados do ano atual em Doris e colocamos os dados históricos anteriores em TDW para reduzir o custo de armazenamento.
Variamos o número de cópias para diferentes partições de dados. Por exemplo, definimos três cópias para dados dos últimos três meses, que são usados com frequência, uma cópia para dados com mais de seis meses e duas cópias para dados intermediários.
Doris oferece suporte para transformar dados quentes em dados frios, portanto, armazenamos apenas os dados dos últimos sete dias no SSD e transferimos os dados mais antigos para o HDD para armazenamento mais barato.
Obrigado por rolar até aqui e terminar esta longa leitura. Compartilhamos nossas alegrias e lágrimas, lições aprendidas e algumas práticas que podem ser de algum valor para você durante nossa transição de ClickHouse para Doris. Agradecemos muito a ajuda da comunidade Apache Doris e da equipe SelectDB, mas ainda podemos persegui-los por um tempo, pois tentamos realizar a identificação automática de dados frios e quentes, pré-computação de tags/métricas usadas com frequência, simplificação da lógica do código usando visualizações materializadas e assim por diante.