お気に入りのアプリがリアルタイム更新をどのように処理しているか疑問に思ったことはありませんか? ライブ スポーツのスコア、株式市場のティッカー、ソーシャル メディアの通知など、これらはすべてイベント駆動型アーキテクチャ (EDA) を利用してデータを瞬時に処理します。EDA は、新しい情報ごとに即座に応答が返される会話に似ています。これにより、アプリケーションはよりインタラクティブで応答性が高くなります。
このチュートリアルでは、Heroku で Apache Kafka を使用してシンプルなイベント駆動型アプリケーションを構築する手順を説明します。内容は次のとおりです。
Apache Kafka は、 EDA システムを構築するための強力なツールです。リアルタイム データ フィードを処理するために設計されたオープン ソース プラットフォームです。Apache Kafka on Herokuは、Kafka をサービスとして提供する Heroku アドオンです。Heroku を使用すると、アプリケーションのデプロイと管理が非常に簡単になり、最近はプロジェクトで Heroku を使用することが多くなりました。Kafka と Heroku を組み合わせると、イベント駆動型アプリケーションを実行する際のセットアップ プロセスが簡素化されます。
このガイドを読み終えると、Heroku 上の Apache Kafka を使用した EDA の威力を実証する実行可能なアプリケーションが完成します。さあ、始めましょう!
コードに進む前に、いくつかの基本的な概念を簡単に確認しましょう。これらを理解すれば、理解が簡単になります。
KafkaJSライブラリを使用して Node.js アプリケーションを構築します。アプリケーションの動作の概要は次のとおりです。
当社の気象センサー (プロデューサー) は、温度、湿度、気圧などのデータを定期的に生成し、これらのイベントを Apache Kafka に送信します。デモの目的で、データはランダムに生成されます。
トピックをリッスンするコンシューマーを用意します。新しいイベントを受信すると、そのデータがログに書き込まれます。
セットアップ全体を Heroku にデプロイし、発生したイベントを Heroku ログを使用して監視します。
始める前に、以下のものを用意してください。
このプロジェクト全体のコードベースは、このGitHub リポジトリで入手できます。自由にコードをクローンして、この投稿全体をフォローしてください。
基本を説明したので、Heroku に Kafka クラスターをセットアップして構築を開始しましょう。
Heroku ですべてをセットアップしましょう。これは非常に迅速かつ簡単なプロセスです。
~/project$ heroku login
~/project$ heroku create weather-eda
(私は Heroku アプリにweather-eda
という名前を付けましたが、アプリに固有の名前を選択することもできます。)
~/project$ heroku addons:create heroku-kafka:basic-0 Creating heroku-kafka:basic-0 on ⬢ weather-eda... ~$0.139/hour (max $100/month) The cluster should be available in a few minutes. Run `heroku kafka:wait` to wait until the cluster is ready. You can read more about managing Kafka at https://devcenter.heroku.com/articles/kafka-on-heroku#managing-kafka kafka-adjacent-07560 is being created in the background. The app will restart when complete... Use heroku addons:info kafka-adjacent-07560 to check creation progress Use heroku addons:docs heroku-kafka to view documentation
Apache Kafka on Heroku アドオンの詳細については、こちらを参照してください。デモでは、アドオンの Basic 0 層を追加しています。アドオンの料金は 0.139 ドル/時間です。このデモ アプリケーションの構築中に、アドオンを 1 時間未満使用し、その後スピンダウンしました。
Heroku が Kafka を起動して準備するまでには数分かかります。すぐに、次のような画面が表示されます。
~/project$ heroku addons:info kafka-adjacent-07560 === kafka-adjacent-07560 Attachments: weather-eda::KAFKA Installed at: Mon May 27 2024 11:44:37 GMT-0700 (Mountain Standard Time) Max Price: $100/month Owning app: weather-eda Plan: heroku-kafka:basic-0 Price: ~$0.139/hour State: created
Kafka クラスターを起動したら、認証情報やその他の設定を取得する必要があります。Heroku は、アプリケーション用にいくつかの設定変数を作成し、先ほど作成した Kafka クラスターからの情報を入力します。次のコマンドを実行すると、これらの設定変数をすべて確認できます。
~/project$ heroku config === weather-eda Config Vars KAFKA_CLIENT_CERT: -----BEGIN CERTIFICATE----- MIIDQzCCAiugAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1h ... -----END CERTIFICATE----- KAFKA_CLIENT_CERT_KEY: -----BEGIN RSA PRIVATE KEY----- MIIEowIBAAKCAQEAsgv1oBiF4Az/IQsepHSh5pceL0XLy0uEAokD7ety9J0PTjj3 ... -----END RSA PRIVATE KEY----- KAFKA_PREFIX: columbia-68051. KAFKA_TRUSTED_CERT: -----BEGIN CERTIFICATE----- MIIDfzCCAmegAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1h ... F+f3juViDqm4eLCZBAdoK/DnI4fFrNH3YzhAPdhoHOa8wi4= -----END CERTIFICATE----- KAFKA_URL: kafka+ssl://ec2-18-233-140-74.compute-1.amazonaws.com:9096,kafka+ssl://ec2-18-208-61-56.compute-1.amazonaws.com:9096...kafka+ssl://ec2-34-203-24-91.compute-1.amazonaws.com:9096
ご覧のとおり、いくつかの設定変数があります。プロジェクトのルート フォルダーに、これらすべての設定変数値を含む.env
というファイルが必要です。これを行うには、次のコマンドを実行するだけです。
~/project$ heroku config --shell > .env
.env
ファイルは次のようになります。
KAFKA_CLIENT_CERT="-----BEGIN CERTIFICATE----- ... -----END CERTIFICATE-----" KAFKA_CLIENT_CERT_KEY="-----BEGIN RSA PRIVATE KEY----- ... -----END RSA PRIVATE KEY-----" KAFKA_PREFIX="columbia-68051." KAFKA_TRUSTED_CERT="-----BEGIN CERTIFICATE----- ... -----END CERTIFICATE-----" KAFKA_URL="kafka+ssl://ec2-18-233-140-74.compute-1.amazonaws.com:9096,kafka+ssl://ec2-18-208-61-56.compute-1.amazonaws.com:9096...kafka+ssl://ec2-34-203-24-91.compute-1.amazonaws.com:9096"
また、.gitignore ファイルに .env を追加するようにします。この機密データをリポジトリにコミットすることは望ましくありません。
Heroku CLI には、すぐに使用できる Kafka 関連のコマンドは付属していません。Kafka を使用しているため、 CLI プラグインをインストールする必要があります。
~/project$ heroku plugins:install heroku-kafka Installing plugin heroku-kafka... installed v2.12.0
これで、CLI から Kafka クラスターを管理できるようになりました。
~/project$ heroku kafka:info === KAFKA_URL Plan: heroku-kafka:basic-0 Status: available Version: 2.8.2 Created: 2024-05-27T18:44:38.023+00:00 Topics: [··········] 0 / 40 topics, see heroku kafka:topics Prefix: columbia-68051. Partitions: [··········] 0 / 240 partition replicas (partitions × replication factor) Messages: 0 messages/s Traffic: 0 bytes/s in / 0 bytes/s out Data Size: [··········] 0 bytes / 4.00 GB (0.00%) Add-on: kafka-adjacent-07560 ~/project$ heroku kafka:topics === Kafka Topics on KAFKA_URL No topics found on this Kafka cluster. Use heroku kafka:topics:create to create a topic (limit 40)
健全性チェックとして、Kafka クラスターを試してみましょう。まずはトピックを作成します。
~/project$ heroku kafka:topics:create test-topic-01 Creating topic test-topic-01 with compaction disabled and retention time 1 day on kafka-adjacent-07560... done Use `heroku kafka:topics:info test-topic-01` to monitor your topic. Your topic is using the prefix columbia-68051.. ~/project$ heroku kafka:topics:info test-topic-01 ▸ topic test-topic-01 is not available yet
約 1 分以内に、トピックが利用可能になります。
~/project$ heroku kafka:topics:info test-topic-01 === kafka-adjacent-07560 :: test-topic-01 Topic Prefix: columbia-68051. Producers: 0 messages/second (0 bytes/second) total Consumers: 0 bytes/second total Partitions: 8 partitions Replication Factor: 3 Compaction: Compaction is disabled for test-topic-01 Retention: 24 hours
次に、このターミナル ウィンドウで、コンシューマーとして動作し、このトピックを追跡してリッスンします。
~/project$ heroku kafka:topics:tail test-topic-01
ここから、ターミナルはトピックで公開されるイベントを待機するだけです。
別のターミナル ウィンドウで、プロデューサーとして機能し、トピックに関するいくつかのメッセージを公開します。
~/project$ heroku kafka:topics:write test-topic-01 "hello world!"
消費者のターミナル ウィンドウに戻ると、次のようになります。
~/project$ heroku kafka:topics:tail test-topic-01 test-topic-01 0 0 12 hello world!
素晴らしい! Kafka クラスターのトピックにイベントを生成して消費することに成功しました。Node.js アプリケーションに進む準備ができました。プレイグラウンドを整理するために、このテスト トピックを破棄しましょう。
~/project$ heroku kafka:topics:destroy test-topic-01 ▸ This command will affect the cluster: kafka-adjacent-07560, which is on weather-eda ▸ To proceed, type weather-eda or re-run this command with --confirm weather-eda > weather-eda Deleting topic test-topic-01... done Your topic has been marked for deletion, and will be removed from the cluster shortly ~/project$ heroku kafka:topics === Kafka Topics on KAFKA_URL No topics found on this Kafka cluster. Use heroku kafka:topics:create to create a topic (limit 40).
アプリケーションで Kafka を使用する準備をするには、トピックとコンシューマー グループの 2 つを作成する必要があります。
アプリケーションが使用するトピックを作成しましょう。
~/project$ heroku kafka:topics:create weather-data
次に、アプリケーションのコンシューマーが所属するコンシューマー グループを作成します。
~/project$ heroku kafka:consumer-groups:create weather-consumers
Node.js アプリケーションを構築する準備ができました。
新しいプロジェクトを初期化し、依存関係をインストールしましょう。
~/project$ npm init -y ~/project$ npm install kafkajs dotenv @faker-js/faker pino pino-pretty
私たちのプロジェクトでは、次の 2 つのプロセスが実行されます。
consumer.js
はトピックにサブスクライブされ、公開されたすべてのイベントをログに記録します。
producer.js
は、トピックに関するランダムな天気データを数秒ごとに公開します。
これら両方のプロセスでは、Kafka クラスターに接続するために KafkaJS を使用する必要があるため、コードをモジュール化して再利用できるようにします。
プロジェクトのsrc
フォルダーに、 kafka.js
というファイルを作成します。次のようになります。
const { Kafka } = require('kafkajs'); const BROKER_URLS = process.env.KAFKA_URL.split(',').map(uri => uri.replace('kafka+ssl://','' )) const TOPIC = `${process.env.KAFKA_PREFIX}weather-data` const CONSUMER_GROUP = `${process.env.KAFKA_PREFIX}weather-consumers` const kafka = new Kafka({ clientId: 'weather-eda-app-nodejs-client', brokers: BROKER_URLS, ssl: { rejectUnauthorized: false, ca: process.env.KAFKA_TRUSTED_CERT, key: process.env.KAFKA_CLIENT_CERT_KEY, cert: process.env.KAFKA_CLIENT_CERT, }, }) const producer = async () => { const p = kafka.producer() await p.connect() return p; } const consumer = async () => { const c = kafka.consumer({ groupId: CONSUMER_GROUP, sessionTimeout: 30000 }) await c.connect() await c.subscribe({ topics: [TOPIC] }); return c; } module.exports = { producer, consumer, topic: TOPIC, groupId: CONSUMER_GROUP };
このファイルでは、まず新しい Kafka クライアントを作成します。これには Kafka ブローカーの URL が必要ですが、これは.env
ファイルのKAFKA_URL
変数 (元々は heroku config の呼び出しから取得) から解析できます。接続試行を認証するには、 KAFKA_TRUSTED_CERT
、 KAFKA_CLIENT_CERT_KEY
、およびKAFKA_CLIENT_CERT
を提供する必要があります。
次に、Kafka クライアントから、 producer
とconsumer
を作成し、コンシューマーがweather-data
トピックにサブスクライブされるようにします。
kafka.js
で、トピックとコンシューマー グループ名の前にKAFKA_PREFIX
追加していることに注意してください。ここでは、マルチテナント Kafka プランである、Apache Kafka on Heroku の Basic 0 プランを使用しています。つまり、 KAFKA_PREFIX
を使用します。トピックにweather-data
、コンシューマー グループにweather-consumers
という名前を付けましたが、マルチテナント Kafka クラスター内の実際の名前には、 KAFKA_PREFIX
追加する必要があります (一意であることを保証するため)。
したがって、技術的には、このデモでは、実際のトピック名はweather-data
columbia-68051.weather-data
です。(コンシューマー グループ名も同様です。)
次に、気象センサーのプロデューサーとして機能するバックグラウンド プロセスを作成しましょう。プロジェクトのルート フォルダーに、 producer.js
というファイルがあります。次のようになります。
require('dotenv').config(); const kafka = require('./src/kafka.js'); const { faker } = require('@faker-js/faker'); const SENSORS = ['sensor01','sensor02','sensor03','sensor04','sensor05']; const MAX_DELAY_MS = 20000; const READINGS = ['temperature','humidity','barometric_pressure']; const MAX_TEMP = 130; const MIN_PRESSURE = 2910; const PRESSURE_RANGE = 160; const getRandom = (arr) => arr[faker.number.int(arr.length - 1)]; const getRandomReading = { temperature: () => faker.number.int(MAX_TEMP) + (faker.number.int(100) / 100), humidity: () => faker.number.int(100) / 100, barometric_pressure: () => (MIN_PRESSURE + faker.number.int(PRESSURE_RANGE)) / 100 }; const sleep = (ms) => { return new Promise((resolve) => { setTimeout(resolve, ms); }); }; (async () => { const producer = await kafka.producer() while(true) { const sensor = getRandom(SENSORS) const reading = getRandom(READINGS) const value = getRandomReading[reading]() const data = { reading, value } await producer.send({ topic: kafka.topic, messages: [{ key: sensor, value: JSON.stringify(data) }] }) await sleep(faker.number.int(MAX_DELAY_MS)) } })()
ファイル内のコードの多くはランダムな値の生成に関係しています。重要な部分を強調します。
SENSORS
にあります。
temperature
、 humidity
、 barometric_pressure
の 3 つの測定値のうち 1 つに対して値を送信 (公開) します。getRandomReading getRandomReading
には、これらの測定値ごとに、対応する適切な値を生成する関数があります。
while
ループを持つasync
関数として実行されます。
while
ループ内では、次の処理が行われます。
sensor
ランダムに選択します。reading
選択してください。value
を生成します。producer.send
を呼び出します。 sensor
イベントのkey
として機能し、 reading
とvalue
イベント メッセージを形成します。consumer.js
のバックグラウンド プロセスはかなりシンプルです。
require('dotenv').config(); const logger = require('./src/logger.js'); const kafka = require('./src/kafka.js'); (async () => { const consumer = await kafka.consumer() await consumer.run({ eachMessage: async ({ topic, partition, message }) => { const sensorId = message.key.toString() const messageObj = JSON.parse(message.value.toString()) const logMessage = { sensorId } logMessage[messageObj.reading] = messageObj.value logger.info(logMessage) } }) })()
私たちのconsumer
既にweather-data
トピックにサブスクライブされています。 consumer.run
を呼び出し、 eachMessage
のハンドラーを設定します。Kafka がメッセージをconsumer
に通知するたびに、メッセージがログに記録されます。これですべてです。
Procfile
package.json
ファイルでは、プロデューサーとコンシューマーのバックグラウンド プロセスを起動するscripts
いくつか追加する必要があります。ファイルには、次の内容が含まれている必要があります。
... "scripts": { "start": "echo 'do nothing'", "start:consumer": "node consumer.js", "start:producer": "node producer.js" }, ...
重要なのはstart:consumer
とstart:producer
です。ただし、Heroku ビルダーは start が存在することを期待しているため、 start
ファイルに残します (意味のあることは何もしませんが)。
次に、Heroku アプリに必要なさまざまなワーカーを起動する方法を Heroku に指示するProcfile
を作成します。プロジェクトのルート フォルダーでは、 Procfile
次のようになります。
consumer_worker: npm run start:consumer producer_worker: npm run start:producer
とてもシンプルですよね? consumer_worker
というバックグラウンド プロセス ワーカーと、 producer_worker
という別のワーカーがあります。Web アプリケーションのProcfile
でよく見かけるweb
ワーカーがないことに気がつくでしょう。Heroku アプリに必要なのは、2 つのバックグラウンド ワーカーだけです。Web web
必要ありません。
これで、すべてのコードが設定されました。すべてのコードをリポジトリにコミットし、デプロイする準備が整いました。
~/project$ git push heroku main … remote: -----> Build succeeded! … remote: -----> Compressing... remote: Done: 48.6M remote: -----> Launching... … remote: Verifying deploy... done
デプロイした後、dyno が適切にスケーリングされていることを確認します。Web プロセスには dyno は必要ありませんが、 consumer_worker
とproducer_worker
の両方に dyno が必要になります。次のコマンドを実行して、ニーズに基づいてこれらのプロセスを設定します。
~/project$ heroku ps:scale web=0 consumer_worker=1 producer_worker=1 Scaling dynos... done, now running producer_worker at 1:Eco, consumer_worker at 1:Eco, web at 0:Eco
これで、すべてが稼働するはずです。舞台裏では、 producer_worker
が Kafka クラスターに接続し、数秒ごとに気象センサー データを公開し始めます。次に、 consumer_worker
Kafka クラスターに接続し、サブスクライブしているトピックから受信したメッセージをログに記録します。
consumer_worker
が何をしているかを確認するには、Heroku ログを確認します。
~/project$ heroku logs --tail … heroku[producer_worker.1]: Starting process with command `npm run start:producer` heroku[producer_worker.1]: State changed from starting to up app[producer_worker.1]: app[producer_worker.1]: > [email protected] start:producer app[producer_worker.1]: > node producer.js app[producer_worker.1]: … heroku[consumer_worker.1]: Starting process with command `npm run start:consumer` heroku[consumer_worker.1]: State changed from starting to up app[consumer_worker.1]: app[consumer_worker.1]: > [email protected] start:consumer app[consumer_worker.1]: > node consumer.js app[consumer_worker.1]: app[consumer_worker.1]: {"level":"INFO","timestamp":"2024-05-28T02:31:20.660Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"columbia-68051.weather-consumers"} app[consumer_worker.1]: {"level":"INFO","timestamp":"2024-05-28T02:31:23.702Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"columbia-68051.weather-consumers","memberId":"weather-eda-app-nodejs-client-3ee5d1fa-eba9-4b59-826c-d3b924a6e4e4","leaderId":"weather-eda-app-nodejs-client-3ee5d1fa-eba9-4b59-826c-d3b924a6e4e4","isLeader":true,"memberAssignment":{"columbia-68051.test-topic-1":[0,1,2,3,4,5,6,7]},"groupProtocol":"RoundRobinAssigner","duration":3041} app[consumer_worker.1]: [2024-05-28 02:31:23.755 +0000] INFO (21): {"sensorId":"sensor01","temperature":87.84} app[consumer_worker.1]: [2024-05-28 02:31:23.764 +0000] INFO (21): {"sensorId":"sensor01","humidity":0.3} app[consumer_worker.1]: [2024-05-28 02:31:23.777 +0000] INFO (21): {"sensorId":"sensor03","temperature":22.11} app[consumer_worker.1]: [2024-05-28 02:31:37.773 +0000] INFO (21): {"sensorId":"sensor01","barometric_pressure":29.71} app[consumer_worker.1]: [2024-05-28 02:31:54.495 +0000] INFO (21): {"sensorId":"sensor05","barometric_pressure":29.55} app[consumer_worker.1]: [2024-05-28 02:32:02.629 +0000] INFO (21): {"sensorId":"sensor04","temperature":90.58} app[consumer_worker.1]: [2024-05-28 02:32:03.995 +0000] INFO (21): {"sensorId":"sensor02","barometric_pressure":29.25} app[consumer_worker.1]: [2024-05-28 02:32:12.688 +0000] INFO (21): {"sensorId":"sensor04","humidity":0.1} app[consumer_worker.1]: [2024-05-28 02:32:32.127 +0000] INFO (21): {"sensorId":"sensor01","humidity":0.34} app[consumer_worker.1]: [2024-05-28 02:32:32.851 +0000] INFO (21): {"sensorId":"sensor02","humidity":0.61} app[consumer_worker.1]: [2024-05-28 02:32:37.200 +0000] INFO (21): {"sensorId":"sensor01","barometric_pressure":30.36} app[consumer_worker.1]: [2024-05-28 02:32:50.388 +0000] INFO (21): {"sensorId":"sensor03","temperature":104.55}
うまくいきました!コンシューマーがメッセージを受信してログに記録しているので、プロデューサーが定期的に Kafka にメッセージを公開していることがわかります。
もちろん、より大規模な EDA アプリでは、すべてのセンサーがプロデューサーになります。さまざまな目的で複数のトピックにパブリッシュすることも、すべてが同じトピックにパブリッシュすることもできます。また、コンシューマーは複数のトピックにサブスクライブできます。また、デモ アプリでは、コンシューマーはeachMessage
で大量のメッセージを送信しましたが、EDA アプリケーションでは、コンシューマーはサードパーティ API を呼び出したり、SMS 通知を送信したり、データベースを照会したりして応答する場合があります。
イベント、トピック、プロデューサー、コンシューマーの基本を理解し、Kafka の操作方法を理解できたので、より複雑なビジネス ユース ケースを満たす独自の EDA アプリケーションの設計と構築を開始できます。
EDA は非常に強力です。システムを分離しながら、簡単なスケーラビリティやリアルタイム データ処理などの主要機能を利用できます。EDA にとって、Kafka は重要なツールであり、高スループットのデータ ストリームを簡単に処理するのに役立ちます。Heroku で Apache Kafka を使用すると、すぐに使い始めることができます。マネージド サービスであるため、Kafka クラスター管理の複雑な部分を心配する必要はありません。アプリの構築に集中できます。
ここからは、実験とプロトタイプの作成です。EDA に適したユースケースを特定します。実際に使用して Heroku でテストし、素晴らしいものを構築します。コーディングを楽しんでください!