paint-brush
Cómo consumir mensajes de Kafka con NestJSpor@vdolzhenko
6,131 lecturas
6,131 lecturas

Cómo consumir mensajes de Kafka con NestJS

por Viktoria Dolzhenko7m2023/12/28
Read on Terminal Reader

Demasiado Largo; Para Leer

Kafka es un intermediario de mensajes en el que algunos servicios generan mensajes y otros los reciben. En este artículo escribiremos una pequeña aplicación para consumir mensajes de Kafka. Y por supuesto pruebas e2e. Primero necesitamos crear un controlador que procese los mensajes. Luego ejecute los microservicios en los servicios principales en el marco NestJs.
featured image - Cómo consumir mensajes de Kafka con NestJS
Viktoria Dolzhenko HackerNoon profile picture
0-item

Muchos usamos Kafka para publicar mensajes, pero ¿cómo los recibimos? En este artículo, escribiremos una pequeña aplicación para consumir mensajes de Kafka. Y, por supuesto, pruebas e2e.

Primero comprendamos cómo funciona Kafka y qué es.

Kafka es un intermediario de mensajes en el que algunos servicios generan mensajes y otros los reciben. Los corredores se utilizan principalmente en sistemas con arquitectura de microservicios para pasar mensajes entre servicios.


Los mensajes se almacenan en temas. Al enviar un mensaje, el productor indica el nombre del tema, así como el mensaje en sí, que consta de una clave y un valor. Y eso es; El trabajo del productor está terminado.


Entonces entran en juego los consumidores, se suscriben al tema deseado y empiezan a leer los mensajes. Cada aplicación tiene su propia cola, desde la cual el consumidor mueve el puntero de desplazamiento.



Las características distintivas de Kafka son:

  • Garantizar que todos los mensajes se ordenarán exactamente en la secuencia en la que llegaron al tema.


  • Las tiendas Kafka leen mensajes por un tiempo


  • Alto rendimiento


Ahora, trabajemos con Kafka usando el marco NestJs. Primero, necesitamos crear un controlador que procese los mensajes.


 @Controller() export class AppController{ constructor( private readonly appService: AppService, ) { } @EventPattern(config.get('kafka.topics.exampleTopic'), Transport.KAFKA) handleEvent( @Payload() payload: ExamplePayloadDto, ): Promise<void> { return this.appService.handleExampleEvent(payload.message); } }


Preste atención al atributo @EventPattern , que indica que nuestra función handleEvent() recibirá mensajes del tema especificado en el archivo de configuración config.get('kafka.topics.exampleTopic') . El atributo @Payload() ayuda a obtener el valor del mensaje del tema.


Para conectar su aplicación a los corredores de Kafka, debe hacer dos cosas. Para comenzar, conecte el microservicio en el archivo de inicio:


 app.connectMicroservice({ transport: Transport.KAFKA, options: { client: { clientId: config.get('kafka.clientId'), brokers: config.get('kafka.brokers'), retry: { retries: config.get('kafka.retryCount'), }, }, consumer: { groupId: config.get('kafka.consumer.groupId'), }, }, });


Y luego ejecute los microservicios en main.ts:


 async function bootstrap() { const app = await NestFactory.create(AppModule, { bufferLogs: true, }); appStartup(app); await app.startAllMicroservices(); await app.listen(config.get('app.port')); }; void bootstrap();


Para probar la aplicación, utilizo el paquete @testcontainers/kafka. Con la ayuda de esto, creé un contenedor zooKeeper y luego un contenedor Kafka:


 export async function kafkaSetup(): Promise<StartedTestContainer[]> { const network = await new Network().start(); const zooKeeperHost = "zookeeper"; const zooKeeperPort = 2181; const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:7.3.2") .withNetwork(network) .withNetworkAliases(zooKeeperHost) .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() }) .withExposedPorts(zooKeeperPort) .start(); const kafkaPort = 9093; const kafkaContainer = await new KafkaContainer() .withNetwork(network) .withZooKeeper(zooKeeperHost, zooKeeperPort) .withExposedPorts(kafkaPort) .start(); const externalPort = kafkaContainer.getMappedPort(kafkaPort); config.set('kafka.brokers', [`localhost:${externalPort}`]); return [ zookeeperContainer, kafkaContainer, ]; }


Tenga en cuenta que, en este archivo, anulé la dirección del corredor al contenedor recién creado.

En el archivo de prueba, en la función beforeAll , creo un cliente Kafka. Con el productor, también creo un tema y lanzo nuestra aplicación.


 beforeAll(async () => { kafkaContainers = await kafkaSetup(); kafka = new Kafka({ clientId: 'mock', brokers: config.get('kafka.brokers'), logLevel: logLevel.NOTHING, }); producer = kafka.producer(); await producer.connect(); const admin = kafka.admin(); await admin.connect(); await admin.createTopics({ topics: [{ topic: config.get('kafka.topics.exampleTopic') }], }); appService = mockDeep<AppService>(); const module: TestingModule = await Test.createTestingModule({ imports: [AppModule], }) .overrideProvider(AppService) .useValue(appService) .compile(); app = module.createNestApplication(); appStartup(app); await app.startAllMicroservices(); await app.init(); }, 30 * 1000);


Por supuesto, en la función afterAll , debes detener los contenedores:


 afterAll(async () => { await app.close(); await Promise.all(kafkaContainers.map(c => c.stop())); }, 15 * 1000);


Escribí una prueba que verifica que cuando llega un mensaje a un tema, nuestra función de controlador del controlador llama a la función de servicio necesaria. Para hacer esto, anulo la implementación de la función handleExampleEvent y espero a que se llame.


 describe('handleEvent', () => { it('should call appService', async () => { let resolve: (value: unknown) => void; const promise = new Promise((res) => { resolve = res; }); appService.handleExampleEvent.mockImplementation(async () => { resolve(0); }); const event: ExamplePayloadDto = { message: 'Hello World!', }; await producer.send({ topic: config.get('kafka.topics.exampleTopic'), messages: [{ key: 'key', value: JSON.stringify(event), }] }); await promise; await kafka.producer().disconnect(); }); });


Eso es todo. Trabajar con Kafka es increíblemente fácil si utiliza el marco NestJs. Espero que mi experiencia te sea de utilidad. Se puede ver una aplicación de ejemplo en https://github.com/waksund/kafka