paint-brush
NestJS ile Kafka Mesajları Nasıl Kullanılır?ile@vdolzhenko
6,131 okumalar
6,131 okumalar

NestJS ile Kafka Mesajları Nasıl Kullanılır?

ile Viktoria Dolzhenko7m2023/12/28
Read on Terminal Reader

Çok uzun; Okumak

Kafka, bazı servislerin mesaj ürettiği, diğerlerinin ise aldığı bir mesaj aracısıdır. Bu yazımızda Kafka'dan mesaj tüketmek için küçük bir uygulama yazacağız. Ve tabii ki e2e testleri. Öncelikle mesajları işleyecek bir kontrolör oluşturmamız gerekiyor. Ardından mikro hizmetleri NestJs çerçevesindeki ana hizmetlerde çalıştırın.
featured image - NestJS ile Kafka Mesajları Nasıl Kullanılır?
Viktoria Dolzhenko HackerNoon profile picture
0-item

Birçoğumuz mesajları yayınlamak için Kafka'yı kullanıyoruz, ancak bunları nasıl alıyoruz? Bu yazımızda Kafka'dan mesaj tüketmek için küçük bir uygulama yazacağız. Ve elbette e2e testleri.

Önce Kafka'nın Nasıl Çalıştığını ve Ne Olduğunu Anlayalım.

Kafka, bazı servislerin mesaj ürettiği, diğerlerinin ise aldığı bir mesaj aracısıdır. Aracılar öncelikle mikro hizmet mimarisine sahip sistemlerde hizmetler arasında mesaj iletmek için kullanılır.


Mesajlar konularda saklanır. Bir mesaj gönderirken yapımcı, konunun adının yanı sıra bir anahtar ve değerden oluşan mesajın kendisini de belirtir. Ve bu kadar; Yapımcının işi bitti.


Daha sonra devreye tüketiciler giriyor, istedikleri konuya abone oluyor ve mesajları okumaya başlıyorlar. Her uygulamanın, tüketicinin ofset işaretçisini hareket ettirdiği kendi okuma kuyruğu vardır.



Kafka'nın ayırt edici özellikleri şunlardır:

  • Tüm mesajların tam olarak konuya geldikleri sıraya göre sıralanacağını garanti edin


  • Kafka bir süreliğine okunan mesajları depolar


  • Yüksek verim


Şimdi NestJs çerçevesini kullanarak Kafka ile çalışalım. Öncelikle mesajları işleyecek bir kontrolör oluşturmamız gerekiyor.


 @Controller() export class AppController{ constructor( private readonly appService: AppService, ) { } @EventPattern(config.get('kafka.topics.exampleTopic'), Transport.KAFKA) handleEvent( @Payload() payload: ExamplePayloadDto, ): Promise<void> { return this.appService.handleExampleEvent(payload.message); } }


handleEvent() fonksiyonumuzun config.get('kafka.topics.exampleTopic') yapılandırma dosyasında belirtilen konudan mesajlar alacağını belirten @EventPattern niteliğine dikkat edin. @Payload() özelliği, konu mesajından değerin alınmasına yardımcı olur.


Uygulamanızı Kafka aracılarına bağlamak için iki şey yapmanız gerekir. Başlamak için mikro hizmeti başlangıç dosyasına bağlayın:


 app.connectMicroservice({ transport: Transport.KAFKA, options: { client: { clientId: config.get('kafka.clientId'), brokers: config.get('kafka.brokers'), retry: { retries: config.get('kafka.retryCount'), }, }, consumer: { groupId: config.get('kafka.consumer.groupId'), }, }, });


Daha sonra mikro hizmetleri main.ts dosyasında çalıştırın:


 async function bootstrap() { const app = await NestFactory.create(AppModule, { bufferLogs: true, }); appStartup(app); await app.startAllMicroservices(); await app.listen(config.get('app.port')); }; void bootstrap();


Uygulamayı test etmek için @testcontainers/kafka paketini kullanıyorum. Bunun yardımıyla bir zooKeeper konteyneri ve ardından bir Kafka konteyneri oluşturdum:


 export async function kafkaSetup(): Promise<StartedTestContainer[]> { const network = await new Network().start(); const zooKeeperHost = "zookeeper"; const zooKeeperPort = 2181; const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:7.3.2") .withNetwork(network) .withNetworkAliases(zooKeeperHost) .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() }) .withExposedPorts(zooKeeperPort) .start(); const kafkaPort = 9093; const kafkaContainer = await new KafkaContainer() .withNetwork(network) .withZooKeeper(zooKeeperHost, zooKeeperPort) .withExposedPorts(kafkaPort) .start(); const externalPort = kafkaContainer.getMappedPort(kafkaPort); config.set('kafka.brokers', [`localhost:${externalPort}`]); return [ zookeeperContainer, kafkaContainer, ]; }


Lütfen bu dosyada aracı adresini yeni oluşturulan kapsayıcıya geçersiz kıldığımı unutmayın.

Test dosyasının kendisinde beforeAll işlevinde bir Kafka istemcisi oluşturuyorum. Yapımcıyla birlikte bir konu da oluşturup uygulamamızı başlatıyorum.


 beforeAll(async () => { kafkaContainers = await kafkaSetup(); kafka = new Kafka({ clientId: 'mock', brokers: config.get('kafka.brokers'), logLevel: logLevel.NOTHING, }); producer = kafka.producer(); await producer.connect(); const admin = kafka.admin(); await admin.connect(); await admin.createTopics({ topics: [{ topic: config.get('kafka.topics.exampleTopic') }], }); appService = mockDeep<AppService>(); const module: TestingModule = await Test.createTestingModule({ imports: [AppModule], }) .overrideProvider(AppService) .useValue(appService) .compile(); app = module.createNestApplication(); appStartup(app); await app.startAllMicroservices(); await app.init(); }, 30 * 1000);


Elbette afterAll işlevinde kapları durdurmanız gerekir:


 afterAll(async () => { await app.close(); await Promise.all(kafkaContainers.map(c => c.stop())); }, 15 * 1000);


Bir konuya mesaj geldiğinde denetleyicideki işleyici fonksiyonumuzun gerekli servis fonksiyonunu çağırdığını doğrulayan bir test yazdım. Bunu yapmak için, handleExampleEvent fonksiyonunun uygulamasını geçersiz kılıyorum ve çağrılmasını bekliyorum.


 describe('handleEvent', () => { it('should call appService', async () => { let resolve: (value: unknown) => void; const promise = new Promise((res) => { resolve = res; }); appService.handleExampleEvent.mockImplementation(async () => { resolve(0); }); const event: ExamplePayloadDto = { message: 'Hello World!', }; await producer.send({ topic: config.get('kafka.topics.exampleTopic'), messages: [{ key: 'key', value: JSON.stringify(event), }] }); await promise; await kafka.producer().disconnect(); }); });


Bu kadar. NestJs çerçevesini kullanırsanız Kafka ile çalışmak inanılmaz derecede kolaydır. Umarım deneyimim sizin için yararlı olacaktır. Örnek bir uygulama https://github.com/waksund/kafka adresinde görülebilir.