Este artículo está coescrito por mí y mi colega Kai Dai. Ambos somos ingenieros de plataforma de datos en Tencent Music (NYSE: TME), un proveedor de servicios de transmisión de música con la friolera de 800 millones de usuarios activos mensuales. Dejar caer el número aquí no es presumir, sino dar una pista del mar de datos con el que mis pobres compañeros de trabajo y yo tenemos que lidiar todos los días.
La biblioteca de música de Tencent Music contiene datos de todas las formas y tipos: música grabada, música en vivo, audios, videos, etc. Como ingenieros de plataformas de datos, nuestro trabajo es destilar información de los datos, en base a la cual nuestros compañeros de equipo pueden tomar mejores decisiones. para apoyar a nuestros usuarios y socios musicales.
Específicamente, hacemos un análisis completo de las canciones, letras, melodías, álbumes y artistas, convertimos toda esta información en activos de datos y los pasamos a nuestros usuarios de datos internos para el recuento de inventario, perfiles de usuarios, análisis de métricas y segmentación de grupos. .
Almacenamos y procesamos la mayoría de nuestros datos en Tencent Data Warehouse (TDW), una plataforma de datos fuera de línea donde colocamos los datos en varios sistemas métricos y de etiquetas y luego creamos tablas planas que centran cada objeto (canciones, artistas, etc.).
Luego, importamos las tablas planas a ClickHouse para el análisis y Elasticsearch para la búsqueda de datos y la orientación de grupos.
Después de eso, nuestros analistas de datos usaron los datos bajo las etiquetas y métricas que necesitaban para formar conjuntos de datos para diferentes escenarios de uso, durante los cuales podían crear sus propias etiquetas y métricas.
La tubería de procesamiento de datos se veía así:
Al trabajar con la canalización anterior, encontramos algunas dificultades:
Actualización parcial : no se admitió la actualización parcial de columnas. Por lo tanto, cualquier latencia de cualquiera de las fuentes de datos podría retrasar la creación de tablas planas y, por lo tanto, socavar la puntualidad de los datos.
Alto costo de almacenamiento : los datos bajo diferentes etiquetas y métricas se actualizaron con diferentes frecuencias. Aunque ClickHouse sobresalió en el manejo de tablas planas, fue una gran pérdida de recursos de almacenamiento simplemente verter todos los datos en una tabla plana y particionarlos por día, sin mencionar el costo de mantenimiento que conlleva.
Alto costo de mantenimiento : desde el punto de vista arquitectónico, ClickHouse se caracterizó por el fuerte acoplamiento de los nodos de almacenamiento y los nodos de cómputo. Sus componentes eran muy interdependientes, lo que se sumaba a los riesgos de inestabilidad del clúster. Además, para las consultas federadas en ClickHouse y Elasticsearch, tuvimos que solucionar una gran cantidad de problemas de conexión. Eso fue simplemente tedioso.
Apache Doris , una base de datos analítica en tiempo real, cuenta con algunas características que son exactamente lo que necesitábamos para resolver nuestros problemas:
Actualización parcial : Doris admite una amplia variedad de modelos de datos, entre los cuales el modelo agregado admite la actualización parcial de columnas en tiempo real. Sobre la base de esto, podemos ingerir datos sin procesar directamente en Doris y crear tablas planas allí. La ingestión es así: en primer lugar, usamos Spark para cargar datos en Kafka; luego, cualquier dato incremental se actualizará a Doris y Elasticsearch a través de Flink. Mientras tanto, Flink agregará previamente los datos para liberar la carga de Doris y Elasticsearch.
Costo de almacenamiento : Doris admite consultas de combinación de varias tablas y consultas federadas en Hive, Iceberg, Hudi, MySQL y Elasticsearch. Esto nos permite dividir las tablas planas grandes en otras más pequeñas y dividirlas por frecuencia de actualización. Los beneficios de hacerlo incluyen un alivio de la carga de almacenamiento y un aumento del rendimiento de consultas.
Costo de mantenimiento : Doris es de arquitectura simple y es compatible con el protocolo MySQL. La implementación de Doris solo involucra dos procesos (FE y BE) sin dependencia de otros sistemas, lo que facilita su operación y mantenimiento. Además, Doris admite la consulta de tablas de datos ES externas. Puede interactuar fácilmente con los metadatos en ES y mapear automáticamente el esquema de la tabla de ES para que podamos realizar consultas en los datos de Elasticsearch a través de Doris sin tener que lidiar con conexiones complejas.
Además, Doris admite varios métodos de ingesta de datos, incluida la importación por lotes desde almacenamiento remoto como HDFS y S3, lecturas de datos de MySQL binlog y Kafka, y sincronización de datos en tiempo real o importación por lotes desde MySQL, Oracle y PostgreSQL. Garantiza la disponibilidad del servicio y la confiabilidad de los datos a través de un protocolo de consistencia y es capaz de depurar automáticamente. Esta es una gran noticia para nuestros operadores y mantenedores.
Estadísticamente hablando, estas características han reducido nuestro costo de almacenamiento en un 42 % y el costo de desarrollo en un 40 %.
Durante nuestro uso de Doris, recibimos mucho apoyo de la comunidad Apache Doris de código abierto y ayuda oportuna del equipo de SelectDB, que ahora ejecuta una versión comercial de Apache Doris.
Hablando de los conjuntos de datos, en el lado positivo, nuestros analistas de datos tienen la libertad de redefinir y combinar las etiquetas y las métricas a su conveniencia. Pero en el lado oscuro, la alta heterogeneidad de los sistemas de etiquetas y métricas conduce a una mayor dificultad en su uso y gestión.
Nuestra solución es introducir una capa semántica en nuestra canalización de procesamiento de datos. La capa semántica es donde todos los términos técnicos se traducen en conceptos más comprensibles para nuestros usuarios de datos internos. En otras palabras, estamos convirtiendo las etiquetas y métricas en ciudadanos de primera clase para la definición y gestión de datos.
¿Por qué ayudaría esto?
Para los analistas de datos, todas las etiquetas y métricas se crearán y compartirán en la capa semántica para que haya menos confusión y mayor eficiencia.
Para los usuarios de datos, ya no necesitan crear sus propios conjuntos de datos o averiguar cuál es aplicable para cada escenario, sino que simplemente pueden realizar consultas en su conjunto de etiquetas y métricas especificado.
Definir explícitamente las etiquetas y las métricas en la capa semántica no fue suficiente. Para construir un sistema de procesamiento de datos estandarizado, nuestro próximo objetivo era garantizar una definición consistente de etiquetas y métricas a lo largo de todo el proceso de procesamiento de datos.
Por este motivo, hicimos de la capa semántica el corazón de nuestro sistema de gestión de datos:
¿Como funciona?
Todas las lógicas de computación en TDW se definirán en la capa semántica en forma de una sola etiqueta o métrica.
La capa semántica recibe consultas lógicas desde el lado de la aplicación, selecciona un motor en consecuencia y genera SQL. Luego envía el comando SQL a TDW para su ejecución. Mientras tanto, también podría enviar tareas de ingesta de datos y configuración a Doris y decidir qué métricas y etiquetas deben acelerarse.
De esta forma, hemos hecho que las etiquetas y las métricas sean más manejables. Una mosca en el ungüento es que, dado que cada etiqueta y métrica se define individualmente, estamos luchando para automatizar la generación de una declaración SQL válida para las consultas. Si tienes alguna idea sobre esto, eres más que bienvenido a hablar con nosotros.
Como puede ver, Apache Doris ha jugado un papel fundamental en nuestra solución. Optimizar el uso de Doris puede mejorar en gran medida nuestra eficiencia general de procesamiento de datos. Entonces, en esta parte, vamos a compartir con ustedes lo que hacemos con Doris para acelerar la ingesta de datos y las consultas y reducir los costos.
¿Lo que nosotros queremos?
Actualmente, tenemos más de 800 etiquetas y más de 1300 métricas derivadas de las más de 80 tablas de origen en TDW. Al importar datos de TDW a Doris, esperamos lograr:
Disponibilidad en tiempo real : además de la ingesta de datos fuera de línea T+1 tradicional, requerimos etiquetado en tiempo real.
Actualización parcial : cada tabla de origen genera datos a través de su propia tarea ETL a varios ritmos e involucra solo una parte de las etiquetas y métricas, por lo que necesitamos soporte para la actualización parcial de columnas.
Alto rendimiento : Necesitamos un tiempo de respuesta de solo unos segundos en escenarios de segmentación, análisis e informes de grupos.
Costes bajos : Esperamos reducir los costes lo máximo posible.
Genere tablas planas en Flink en lugar de TDW
Alto costo de almacenamiento : TDW tiene que mantener una tabla extraplana además de las más de 80 tablas de origen discretas. Esa es una gran redundancia.
Baja puntualidad real : cualquier retraso en las tablas de origen aumentará y retrasará todo el enlace de datos.
Alto costo de desarrollo : para lograr la puntualidad real se requieren esfuerzos y recursos de desarrollo adicionales.
Por el contrario, generar tablas planas en Doris es mucho más fácil y económico. El proceso es el siguiente:
Esto puede reducir en gran medida los costos de almacenamiento, ya que TDW ya no tiene que mantener dos copias de datos y KafKa solo necesita almacenar los nuevos datos pendientes de ingesta. Además, podemos agregar cualquier lógica ETL que queramos en Flink y reutilizar mucha lógica de desarrollo para la ingesta de datos fuera de línea y en tiempo real.
Como mencionamos, el Modelo Agregado de Doris permite la actualización parcial de columnas. Aquí proporcionamos una introducción simple a otros modelos de datos en Doris para su referencia:
Modelo único : esto es aplicable para escenarios que requieren unicidad de clave principal. Solo conserva los datos más recientes de la misma ID de clave principal. (Hasta donde sabemos, la comunidad de Apache Doris también planea incluir una actualización parcial de las columnas en el modelo único).
Modelo duplicado : este modelo almacena todos los datos originales exactamente como están, sin agregación previa ni deduplicación.
Después de determinar el modelo de datos, tuvimos que pensar en cómo nombrar las columnas. Usar etiquetas o métricas como nombres de columna no fue una opción porque:
Ⅰ. Es posible que nuestros usuarios de datos internos necesiten cambiar el nombre de las métricas o las etiquetas, pero Doris 1.1.3 no admite la modificación de los nombres de las columnas.
Ⅱ. Las etiquetas se pueden tomar en línea y fuera de línea con frecuencia. Si eso implica agregar y eliminar columnas, no solo consumirá mucho tiempo sino que también será perjudicial para el rendimiento de las consultas. En su lugar, hacemos lo siguiente:
Para cambiar el nombre de etiquetas y métricas de forma flexible, utilizamos tablas de MySQL para almacenar los metadatos (nombre, ID único global, estado, etc.). Cualquier cambio en los nombres solo ocurrirá en los metadatos, pero no afectará el esquema de la tabla en Doris. Por ejemplo, si a un song_name
se le asigna un ID de 4, se almacenará con el nombre de columna a4 en Doris. Luego, si el song_name
está involucrado en una consulta, se convertirá a a4 en SQL.
Para las etiquetas en línea y fuera de línea, clasificamos las etiquetas en función de la frecuencia con la que se utilizan. Los menos utilizados recibirán una marca de fuera de línea en sus metadatos. No se colocarán nuevos datos bajo las etiquetas sin conexión, pero los datos existentes bajo esas etiquetas seguirán estando disponibles.
Para la disponibilidad en tiempo real de las etiquetas y métricas recién agregadas, creamos previamente algunas columnas de ID en las tablas de Doris en función de la asignación de ID de nombre. Estas columnas de ID reservadas se asignarán a las etiquetas y métricas recién agregadas. Así, podemos evitar el cambio de esquema de tablas y los consiguientes gastos generales. Nuestra experiencia muestra que solo 10 minutos después de agregar las etiquetas y las métricas, los datos debajo de ellas pueden estar disponibles.
Cabe destacar que el Doris 1.2.0 lanzado recientemente es compatible con Light Schema Change, lo que significa que para agregar o eliminar columnas, solo necesita modificar los metadatos en FE. Además, puede cambiar el nombre de las columnas en las tablas de datos siempre que haya habilitado Light Schema Change para las tablas. Este es un gran ahorro de problemas para nosotros.
Aquí hay algunas prácticas que han reducido nuestro tiempo diario de ingesta de datos fuera de línea en un 75 % y nuestra puntuación de compactación CUMU de 600+ a 100.
Agregación previa de Flink: como se mencionó anteriormente.
Tamaño automático del lote de escritura: para reducir el uso de recursos de Flink, permitimos que los datos en un tema de Kafka se escriban en varias tablas de Doris y realizamos la alteración automática del tamaño del lote en función de la cantidad de datos.
Optimización de la escritura de datos de Doris: ajuste los tamaños de las tabletas y los cubos, así como los parámetros de compactación para cada escenario:
max_XXXX_compaction_thread max_cumulative_compaction_num_singleton_deltas
Optimización de la lógica de compromiso de BE: lleve a cabo un almacenamiento en caché regular de las listas de BE, confírmelas a los nodos de BE lote por lote y use una granularidad de equilibrio de carga más fina.
Usar Dori-on-ES en consultas
Alrededor del 60% de nuestras consultas de datos implican la orientación de grupos. La orientación grupal consiste en encontrar nuestros datos objetivo mediante el uso de un conjunto de etiquetas como filtros. Plantea algunos requisitos para nuestra arquitectura de procesamiento de datos:
La segmentación de grupos relacionada con los usuarios de la aplicación puede implicar una lógica muy complicada. Eso significa que el sistema debe admitir cientos de etiquetas como filtros simultáneamente.
La mayoría de los escenarios de segmentación de grupos solo requieren los datos de etiquetas más recientes. Sin embargo, las consultas de métricas deben admitir datos históricos.
Es posible que los usuarios de datos deban realizar más análisis agregados de datos métricos después de la segmentación por grupos.
Es posible que los usuarios de datos también deban realizar consultas detalladas sobre etiquetas y métricas después de la segmentación por grupos.
Después de considerarlo, decidimos adoptar Doris-on-ES. Doris es donde almacenamos los datos de métricas para cada escenario como una tabla de partición, mientras que Elasticsearch almacena todos los datos de las etiquetas. La solución Doris-on-ES combina la capacidad de planificación de consultas distribuidas de Doris y la capacidad de búsqueda de texto completo de Elasticsearch. El patrón de consulta es el siguiente:
SELECT tag, agg(metric) FROM Doris WHERE id in (select id from Es where tagFilter) GROUP BY tag
Como se muestra, los datos de ID ubicados en Elasticsearch se usarán en la subconsulta en Doris para el análisis de métricas. En la práctica, encontramos que el tiempo de respuesta de la consulta está relacionado con el tamaño del grupo objetivo. Si el grupo objetivo contiene más de un millón de objetos, la consulta tardará hasta 60 segundos. Si es aún mayor, podría producirse un error de tiempo de espera. Después de la investigación, identificamos nuestras dos mayores pérdidas de tiempo:
I. Cuando Doris BE extrae datos de Elasticsearch (1024 líneas a la vez de manera predeterminada), para un grupo objetivo de más de un millón de objetos, la sobrecarga de E/S de la red puede ser enorme.
II. Después de extraer los datos, Doris BE necesita realizar operaciones de unión con tablas de métricas locales a través de SHUFFLE/BROADCAST, lo que puede costar mucho.
Por lo tanto, hacemos las siguientes optimizaciones:
Agregue una variable de sesión de consulta es_optimize
que especifique si habilitar la optimización.
En la escritura de datos en ES, agregue una columna BK para almacenar el número de depósito después de que se haya generado el hash de la ID de la clave principal. El algoritmo es el mismo que el algoritmo de depósito en Doris (CRC32).
Utilice Doris BE para generar un plan de ejecución de combinación de depósitos, envíe el número de depósito a BE ScanNode y empújelo a ES.
Use ES para comprimir los datos consultados; Convierta la obtención de datos múltiples en una sola y reduzca la sobrecarga de E/S de la red.
Asegúrese de que Doris BE solo extraiga los datos de los cubos relacionados con las tablas de métricas locales y realice operaciones de unión locales directamente para evitar la mezcla de datos entre Doris BE.
Como resultado, reducimos el tiempo de respuesta de consulta para la segmentación de grupos grandes de 60 segundos a unos sorprendentes 3,7 segundos. La información de la comunidad muestra que Doris admitirá la indexación invertida desde la versión 2.0.0, que se lanzará pronto. Con esta nueva versión, podremos realizar búsquedas de texto completo en tipos de texto, equivalencia o filtrado de rango de textos, números y fecha y hora, y combinar convenientemente la lógica AND, OR, NOT en el filtrado, ya que la indexación invertida admite tipos de matriz. Se espera que esta nueva función de Doris ofrezca un rendimiento entre 3 y 5 veces superior al de Elasticsearch en la misma tarea.
La capacidad de Doris de separación de datos fríos y calientes proporciona la base de nuestras estrategias de reducción de costos en el procesamiento de datos.
Basado en el mecanismo TTL de Doris, solo almacenamos datos del año actual en Doris y ponemos los datos históricos antes que en TDW para reducir el costo de almacenamiento.
Variamos el número de copias para diferentes particiones de datos. Por ejemplo, configuramos tres copias para los datos de los últimos tres meses, que se usan con frecuencia, una copia para los datos de más de seis meses y dos copias para los datos intermedios.
Doris admite la conversión de datos calientes en datos fríos, por lo que solo almacenamos datos de los últimos siete días en SSD y transferimos datos más antiguos a HDD para un almacenamiento menos costoso.
Gracias por recorrer todo el camino hasta aquí y terminar esta larga lectura. Compartimos nuestros vítores y lágrimas, las lecciones aprendidas y algunas prácticas que pueden ser de algún valor para usted durante nuestra transición de ClickHouse a Doris. Realmente apreciamos la ayuda de la comunidad de Apache Doris y el equipo de SelectDB, pero es posible que aún los persigamos por un tiempo, ya que intentamos realizar la identificación automática de datos fríos y calientes, el cálculo previo de etiquetas/métricas de uso frecuente, simplificación de la lógica del código usando vistas materializadas, y así sucesivamente.