Dieser Artikel wurde von mir und meinem Kollegen Kai Dai gemeinsam verfasst. Wir sind beide Datenplattform-Ingenieure bei Tencent Music (NYSE: TME), einem Musik-Streaming-Dienstleister mit satten 800 Millionen aktiven Nutzern pro Monat. Die Angabe der Zahl hier ist keine Prahlerei, sondern ein Hinweis auf die Datenflut, mit der meine armen Kollegen und ich jeden Tag zu tun haben.
Die Musikbibliothek von Tencent Music enthält Daten aller Formen und Typen: aufgenommene Musik, Live-Musik, Audios, Videos usw. Als Datenplattform-Ingenieure besteht unsere Aufgabe darin, Informationen aus den Daten zu destillieren, auf deren Grundlage unsere Teamkollegen bessere Entscheidungen treffen können zur Unterstützung unserer Nutzer und Musikpartner.
Insbesondere führen wir eine umfassende Analyse der Lieder, Liedtexte, Melodien, Alben und Künstler durch, wandeln all diese Informationen in Datenbestände um und geben sie an unsere internen Datennutzer zur Bestandszählung, Benutzerprofilierung, Metrikanalyse und Gruppenausrichtung weiter .
Wir haben die meisten unserer Daten im Tencent Data Warehouse (TDW) gespeichert und verarbeitet, einer Offline-Datenplattform, auf der wir die Daten in verschiedene Tag- und Metriksysteme eingefügt und dann flache Tabellen erstellt haben, in denen jedes Objekt (Lieder, Künstler usw.) zentriert ist.
Anschließend importierten wir die flachen Tabellen zur Analyse in ClickHouse und zur Datensuche und zum Gruppen-Targeting in Elasticsearch.
Anschließend verwendeten unsere Datenanalysten die Daten unter den von ihnen benötigten Tags und Metriken, um Datensätze für verschiedene Nutzungsszenarien zu erstellen, in denen sie ihre eigenen Tags und Metriken erstellen konnten.
Die Datenverarbeitungspipeline sah folgendermaßen aus:
Bei der Arbeit mit der oben genannten Pipeline sind wir auf einige Schwierigkeiten gestoßen:
Teilweise Aktualisierung : Eine teilweise Aktualisierung von Spalten wurde nicht unterstützt. Daher könnte jede Latenzzeit von einer der Datenquellen die Erstellung flacher Tabellen verzögern und somit die Aktualität der Daten beeinträchtigen.
Hohe Speicherkosten : Daten unter verschiedenen Tags und Metriken wurden unterschiedlich häufig aktualisiert. So sehr sich ClickHouse im Umgang mit flachen Tabellen hervorgetan hat, war es eine enorme Verschwendung von Speicherressourcen, einfach alle Daten in eine flache Tabelle zu packen und sie nach Tagen zu partitionieren, ganz zu schweigen von den damit verbundenen Wartungskosten.
Hohe Wartungskosten : Aus architektonischer Sicht zeichnete sich ClickHouse durch die starke Kopplung von Speicherknoten und Rechenknoten aus. Seine Komponenten waren stark voneinander abhängig, was das Risiko einer Cluster-Instabilität erhöhte. Außerdem mussten wir uns bei föderierten Abfragen über ClickHouse und Elasticsearch hinweg um eine Vielzahl von Verbindungsproblemen kümmern. Das war einfach langweilig.
Apache Doris , eine Echtzeit-Analysedatenbank, verfügt über einige Funktionen, die genau das sind, was wir zur Lösung unserer Probleme brauchten:
Teilweise Aktualisierung : Doris unterstützt eine Vielzahl von Datenmodellen, darunter das Aggregate Model, das die teilweise Aktualisierung von Spalten in Echtzeit unterstützt. Darauf aufbauend können wir Rohdaten direkt in Doris einspeisen und dort flache Tabellen erstellen. Die Aufnahme läuft folgendermaßen ab: Zuerst verwenden wir Spark, um Daten in Kafka zu laden; Anschließend werden alle inkrementellen Daten über Flink auf Doris und Elasticsearch aktualisiert. In der Zwischenzeit wird Flink die Daten vorab aggregieren, um Doris und Elasticsearch zu entlasten.
Speicherkosten : Doris unterstützt Multi-Table-Join-Abfragen und Verbundabfragen über Hive, Iceberg, Hudi, MySQL und Elasticsearch. Dadurch können wir die großen flachen Tabellen in kleinere aufteilen und sie nach Aktualisierungshäufigkeit unterteilen. Zu den Vorteilen dieser Vorgehensweise gehören eine Entlastung des Speichers und eine Erhöhung des Abfragedurchsatzes.
Wartungskosten : Doris hat eine einfache Architektur und ist mit dem MySQL-Protokoll kompatibel. Die Bereitstellung von Doris umfasst nur zwei Prozesse (FE und BE) ohne Abhängigkeit von anderen Systemen, was die Bedienung und Wartung erleichtert. Außerdem unterstützt Doris die Abfrage externer ES-Datentabellen. Es kann problemlos mit den Metadaten in ES verbunden werden und das Tabellenschema von ES automatisch zuordnen, sodass wir Abfragen zu Elasticsearch-Daten über Doris durchführen können, ohne uns mit komplexen Verbindungen herumschlagen zu müssen.
Darüber hinaus unterstützt Doris mehrere Datenerfassungsmethoden, einschließlich Batch-Import aus Remote-Speichern wie HDFS und S3, Datenlesen aus MySQL Binlog und Kafka sowie Echtzeit-Datensynchronisierung oder Batch-Import von MySQL, Oracle und PostgreSQL. Es gewährleistet Dienstverfügbarkeit und Datenzuverlässigkeit durch ein Konsistenzprotokoll und ist in der Lage, automatisches Debugging durchzuführen. Das sind großartige Neuigkeiten für unsere Betreiber und Instandhalter.
Statistisch gesehen haben diese Funktionen unsere Speicherkosten um 42 % und die Entwicklungskosten um 40 % gesenkt.
Während unserer Nutzung von Doris haben wir viel Unterstützung von der Open-Source-Apache-Doris-Community und zeitnahe Hilfe vom SelectDB-Team erhalten, das jetzt eine kommerzielle Version von Apache Doris betreibt.
Was die Datensätze betrifft: Positiv ist, dass unseren Datenanalysten die Freiheit eingeräumt wird, die Tags und Metriken nach Belieben neu zu definieren und zu kombinieren. Auf der anderen Seite führt die hohe Heterogenität der Tag- und Metriksysteme jedoch zu größeren Schwierigkeiten bei deren Verwendung und Verwaltung.
Unsere Lösung besteht darin, eine semantische Ebene in unsere Datenverarbeitungspipeline einzuführen. Auf der semantischen Ebene werden alle Fachbegriffe in verständlichere Konzepte für unsere internen Datennutzer übersetzt. Mit anderen Worten: Wir verwandeln die Tags und Metriken in erstklassige Bürger für die Datendefinition und -verwaltung.
Warum sollte das helfen?
Für Datenanalysten werden alle Tags und Metriken auf der semantischen Ebene erstellt und geteilt, sodass es weniger Verwirrung und eine höhere Effizienz gibt.
Datennutzer müssen nicht mehr ihre eigenen Datensätze erstellen oder herausfinden, welcher für das jeweilige Szenario anwendbar ist, sondern können einfach Abfragen für ihr angegebenes Tag-Set und Metrik-Set durchführen.
Es reichte nicht aus, die Tags und Metriken explizit auf der semantischen Ebene zu definieren. Um ein standardisiertes Datenverarbeitungssystem aufzubauen, bestand unser nächstes Ziel darin, eine konsistente Definition von Tags und Metriken in der gesamten Datenverarbeitungspipeline sicherzustellen.
Aus diesem Grund haben wir die semantische Schicht zum Herzstück unseres Datenverwaltungssystems gemacht:
Wie funktioniert es?
Alle Rechenlogiken in TDW werden auf der semantischen Ebene in Form eines einzelnen Tags oder einer einzelnen Metrik definiert.
Die semantische Schicht empfängt logische Abfragen von der Anwendungsseite, wählt entsprechend eine Engine aus und generiert SQL. Anschließend wird der SQL-Befehl zur Ausführung an TDW gesendet. In der Zwischenzeit werden möglicherweise auch Konfigurations- und Datenerfassungsaufgaben an Doris gesendet und entschieden, welche Metriken und Tags beschleunigt werden sollen.
Auf diese Weise haben wir die Tags und Metriken übersichtlicher gemacht. Ein Wermutstropfen ist, dass wir Schwierigkeiten haben, die Generierung einer gültigen SQL-Anweisung für die Abfragen zu automatisieren, da jedes Tag und jede Metrik individuell definiert ist. Wenn Sie dazu eine Idee haben, können Sie gerne mit uns sprechen.
Wie Sie sehen, hat Apache Doris eine entscheidende Rolle bei unserer Lösung gespielt. Die Optimierung der Nutzung von Doris kann unsere Gesamteffizienz der Datenverarbeitung erheblich verbessern. In diesem Teil teilen wir Ihnen mit, was wir mit Doris tun, um die Datenaufnahme und -abfrage zu beschleunigen und die Kosten zu senken.
Was wir wollen?
Derzeit verfügen wir über mehr als 800 Tags und mehr als 1300 Metriken, die aus den mehr als 80 Quelltabellen in TDW abgeleitet sind. Durch den Datenimport von TDW nach Doris hoffen wir Folgendes zu erreichen:
Echtzeitverfügbarkeit : Zusätzlich zur herkömmlichen T+1-Offline-Datenerfassung benötigen wir Echtzeit-Tagging.
Teilweise Aktualisierung : Jede Quelltabelle generiert Daten über ihre eigene ETL-Aufgabe in unterschiedlichem Tempo und umfasst nur einen Teil der Tags und Metriken. Daher benötigen wir die Unterstützung für die teilweise Aktualisierung von Spalten.
Hohe Leistung : Wir benötigen eine Reaktionszeit von nur wenigen Sekunden in Gruppen-Targeting-, Analyse- und Berichtsszenarien.
Niedrige Kosten : Wir hoffen, die Kosten so weit wie möglich zu senken.
Generieren Sie flache Tabellen in Flink statt in TDW
Hohe Speicherkosten : TDW muss neben den über 80 diskreten Quelltabellen eine besonders flache Tabelle verwalten. Das ist eine enorme Redundanz.
Geringe Echtzeitgenauigkeit : Jede Verzögerung in den Quelltabellen wird verstärkt und verzögert die gesamte Datenverbindung.
Hohe Entwicklungskosten : Um Echtzeitgenauigkeit zu erreichen, wären zusätzliche Entwicklungsaufwände und Ressourcen erforderlich.
Im Gegenteil, die Generierung flacher Tabellen in Doris ist viel einfacher und kostengünstiger. Der Prozess ist wie folgt:
Dadurch können die Speicherkosten erheblich gesenkt werden, da TDW nicht mehr zwei Kopien der Daten verwalten muss und KafKa nur die neuen Daten bis zur Aufnahme speichern muss. Darüber hinaus können wir jede gewünschte ETL-Logik in Flink hinzufügen und viele Entwicklungslogiken für die Offline- und Echtzeit-Datenerfassung wiederverwenden.
Wie bereits erwähnt, ermöglicht das Aggregatmodell von Doris eine teilweise Aktualisierung von Spalten. Hier bieten wir Ihnen als Referenz eine einfache Einführung in andere Datenmodelle in Doris:
Einzigartiges Modell : Dies gilt für Szenarien, die die Eindeutigkeit des Primärschlüssels erfordern. Es werden nur die neuesten Daten derselben Primärschlüssel-ID gespeichert. (Soweit wir wissen, plant die Apache Doris-Community, auch eine teilweise Aktualisierung der Spalten in das Unique Model aufzunehmen.)
Duplikatmodell : Dieses Modell speichert alle Originaldaten genau so, wie sie sind, ohne Vorabaggregation oder Deduplizierung.
Nachdem wir das Datenmodell festgelegt hatten, mussten wir darüber nachdenken, wie wir die Spalten benennen. Die Verwendung der Tags oder Metriken als Spaltennamen war aus folgenden Gründen keine Option:
Ⅰ. Unsere internen Datenbenutzer müssen möglicherweise die Metriken oder Tags umbenennen, Doris 1.1.3 unterstützt jedoch keine Änderung von Spaltennamen.
Ⅱ. Tags können häufig online und offline verwendet werden. Wenn dazu das Hinzufügen und Löschen von Spalten erforderlich ist, ist dies nicht nur zeitaufwändig, sondern beeinträchtigt auch die Abfrageleistung. Stattdessen machen wir Folgendes:
Zur flexiblen Umbenennung von Tags und Metriken nutzen wir MySQL-Tabellen zur Speicherung der Metadaten (Name, global eindeutige ID, Status etc.). Jede Änderung der Namen erfolgt nur in den Metadaten, hat jedoch keine Auswirkungen auf das Tabellenschema in Doris. Wenn beispielsweise ein song_name
die ID 4 erhält, wird er mit dem Spaltennamen a4 in Doris gespeichert. Wenn dann der song_name
an einer Abfrage beteiligt ist, wird er in SQL in a4 konvertiert.
Für das Online- und Offline-Schalten von Tags sortieren wir die Tags danach, wie häufig sie verwendet werden. Die am wenigsten genutzten werden in ihren Metadaten mit einer Offline-Markierung versehen. Unter den Offline-Tags werden keine neuen Daten abgelegt, die vorhandenen Daten unter diesen Tags sind jedoch weiterhin verfügbar.
Für die Verfügbarkeit neu hinzugefügter Tags und Metriken in Echtzeit erstellen wir einige ID-Spalten in Doris-Tabellen basierend auf der Zuordnung von Namens-IDs. Diese reservierten ID-Spalten werden den neu hinzugefügten Tags und Metriken zugewiesen. Auf diese Weise können wir Tabellenschemaänderungen und den daraus resultierenden Mehraufwand vermeiden. Unsere Erfahrung zeigt, dass bereits 10 Minuten nach dem Hinzufügen der Tags und Metriken die darunter liegenden Daten verfügbar sein können.
Bemerkenswert ist, dass die kürzlich veröffentlichte Doris 1.2.0 Light Schema Change unterstützt, was bedeutet, dass Sie zum Hinzufügen oder Entfernen von Spalten nur die Metadaten in FE ändern müssen. Außerdem können Sie die Spalten in Datentabellen umbenennen, sofern Sie Light Schema Change für die Tabellen aktiviert haben. Das erspart uns viel Ärger.
Hier sind einige Vorgehensweisen, die unsere tägliche Offline-Datenaufnahmezeit um 75 % und unseren CUMU-Komprimierungswert von 600+ auf 100 reduziert haben.
Flink-Voraggregation: wie oben erwähnt.
Automatische Größenanpassung des Schreibstapels: Um die Ressourcennutzung von Flink zu reduzieren, ermöglichen wir das Schreiben der Daten in einem Kafka-Topic in verschiedene Doris-Tabellen und realisieren die automatische Änderung der Stapelgröße basierend auf der Datenmenge.
Optimierung des Schreibens von Doris-Daten: Feinabstimmung der Größen von Tablets und Buckets sowie der Verdichtungsparameter für jedes Szenario:
max_XXXX_compaction_thread max_cumulative_compaction_num_singleton_deltas
Optimierung der BE-Commit-Logik: Regelmäßiges Caching von BE-Listen durchführen, Batch für Batch an die BE-Knoten übergeben und eine feinere Lastausgleichsgranularität verwenden.
Verwenden Sie Dori-on-ES in Abfragen
Etwa 60 % unserer Datenabfragen beinhalten Gruppen-Targeting. Beim Gruppen-Targeting geht es darum, unsere Zieldaten mithilfe einer Reihe von Tags als Filter zu finden. Es stellt einige Anforderungen an unsere Datenverarbeitungsarchitektur:
Gruppen-Targeting im Zusammenhang mit APP-Benutzern kann eine sehr komplizierte Logik erfordern. Das bedeutet, dass das System Hunderte von Tags gleichzeitig als Filter unterstützen muss.
Die meisten Gruppen-Targeting-Szenarien erfordern nur die neuesten Tag-Daten. Metrikabfragen müssen jedoch historische Daten unterstützen.
Datenbenutzer müssen möglicherweise nach dem Gruppen-Targeting eine weitere aggregierte Analyse der Metrikdaten durchführen.
Datenbenutzer müssen möglicherweise nach dem Gruppen-Targeting auch detaillierte Abfragen zu Tags und Metriken durchführen.
Nach Überlegung haben wir uns für die Adoption von Doris-on-ES entschieden. In Doris speichern wir die Metrikdaten für jedes Szenario als Partitionstabelle, während Elasticsearch alle Tag-Daten speichert. Die Doris-on-ES-Lösung kombiniert die verteilte Abfrageplanungsfunktion von Doris und die Volltextsuchfunktion von Elasticsearch. Das Abfragemuster lautet wie folgt:
SELECT tag, agg(metric) FROM Doris WHERE id in (select id from Es where tagFilter) GROUP BY tag
Wie gezeigt, werden die in Elasticsearch gespeicherten ID-Daten in der Unterabfrage in Doris für die Metrikanalyse verwendet. In der Praxis stellen wir fest, dass die Antwortzeit der Anfrage mit der Größe der Zielgruppe zusammenhängt. Enthält die Zielgruppe mehr als eine Million Objekte, dauert die Abfrage bis zu 60 Sekunden. Wenn es noch größer ist, kann es zu einem Timeout-Fehler kommen. Nach einer Untersuchung haben wir unsere beiden größten Zeitfresser identifiziert:
I. Wenn Doris BE Daten aus Elasticsearch abruft (standardmäßig 1024 Zeilen gleichzeitig), kann der Netzwerk-E/A-Overhead für eine Zielgruppe von über einer Million Objekten enorm sein.
II. Nach dem Datenabruf muss Doris BE Join-Vorgänge mit lokalen Metriktabellen über SHUFFLE/BROADCAST durchführen, was viel kosten kann.
Daher nehmen wir folgende Optimierungen vor:
Fügen Sie eine Abfragesitzungsvariable es_optimize
hinzu, die angibt, ob die Optimierung aktiviert werden soll.
Fügen Sie beim Schreiben von Daten in ES eine BK-Spalte hinzu, um die Bucket-Nummer zu speichern, nachdem die Primärschlüssel-ID gehasht wurde. Der Algorithmus ist derselbe wie der Bucketing-Algorithmus in Doris (CRC32).
Verwenden Sie Doris BE, um einen Bucket-Join-Ausführungsplan zu generieren, senden Sie die Bucket-Nummer an BE ScanNode und übertragen Sie sie an ES.
Verwenden Sie ES, um die abgefragten Daten zu komprimieren. Verwandeln Sie mehrere Datenabrufe in einen und reduzieren Sie den Netzwerk-E/A-Overhead.
Stellen Sie sicher, dass Doris BE nur die Daten von Buckets abruft, die sich auf die lokalen Metriktabellen beziehen, und lokale Join-Vorgänge direkt durchführt, um eine Datenverschiebung zwischen Doris BEs zu vermeiden.
Dadurch reduzieren wir die Abfrageantwortzeit für das Targeting großer Gruppen von 60 Sekunden auf überraschende 3,7 Sekunden. Community-Informationen zeigen, dass Doris ab Version 2.0.0, die bald veröffentlicht wird, die invertierte Indexierung unterstützen wird. Mit dieser neuen Version können wir eine Volltextsuche nach Texttypen, Äquivalenz- oder Bereichsfilterung von Texten, Zahlen und Datum/Uhrzeit durchführen und beim Filtern bequem die Logik UND, ODER, NICHT kombinieren, da die invertierte Indizierung Array-Typen unterstützt. Es wird erwartet, dass diese neue Funktion von Doris bei derselben Aufgabe eine drei- bis fünfmal bessere Leistung als Elasticsearch liefert.
Doris‘ Fähigkeit zur Trennung kalter und heißer Daten bildet die Grundlage unserer Kostensenkungsstrategien in der Datenverarbeitung.
Basierend auf dem TTL-Mechanismus von Doris speichern wir nur Daten des aktuellen Jahres in Doris und legen die historischen Daten davor in TDW ab, um die Speicherkosten zu senken.
Wir variieren die Anzahl der Kopien für verschiedene Datenpartitionen. Beispielsweise legen wir drei Kopien für Daten der letzten drei Monate fest, die häufig verwendet werden, eine Kopie für Daten, die älter als sechs Monate sind, und zwei Kopien für Daten dazwischen.
Doris unterstützt die Umwandlung heißer Daten in kalte Daten, sodass wir nur die Daten der letzten sieben Tage auf der SSD speichern und ältere Daten zur kostengünstigeren Speicherung auf die Festplatte übertragen.
Vielen Dank, dass Sie bis hierher gescrollt und diese lange Lektüre beendet haben. Wir haben unseren Jubel und unsere Tränen, die gewonnenen Erkenntnisse und einige Praktiken geteilt, die für Sie während unseres Übergangs von ClickHouse zu Doris von Nutzen sein könnten. Wir wissen die Hilfe der Apache Doris-Community und des SelectDB-Teams sehr zu schätzen, werden ihnen aber möglicherweise noch eine Weile hinterherjagen, da wir versuchen, eine automatische Identifizierung kalter und heißer Daten, eine Vorberechnung häufig verwendeter Tags/Metriken zu realisieren. Vereinfachung der Codelogik mithilfe materialisierter Ansichten usw.