우리 중 많은 사람들이 Kafka를 사용하여 메시지를 게시하지만 메시지를 어떻게 받나요? 이 기사에서는 Kafka의 메시지를 소비하기 위한 작은 애플리케이션을 작성합니다. 물론 e2e 테스트도 마찬가지입니다.
Kafka는 일부 서비스가 메시지를 생성하고 다른 서비스는 이를 수신하는 메시지 브로커입니다. 브로커는 서비스 간에 메시지를 전달하기 위해 마이크로서비스 아키텍처를 갖춘 시스템에서 주로 사용됩니다.
메시지는 주제에 저장됩니다. 메시지를 보낼 때 생산자는 주제의 이름과 키와 값으로 구성된 메시지 자체를 나타냅니다. 그리고 그게 다야; 프로듀서의 작업이 완료되었습니다.
그런 다음 소비자가 참여하여 원하는 주제를 구독하고 메시지를 읽기 시작합니다. 각 애플리케이션에는 소비자가 오프셋 포인터를 이동하는 대기열을 읽는 자체 대기열이 있습니다.
카프카의 특징은 다음과 같습니다.
이제 NestJs 프레임워크를 사용하여 Kafka로 작업해 보겠습니다. 먼저 메시지를 처리할 컨트롤러를 만들어야 합니다.
@Controller() export class AppController{ constructor( private readonly appService: AppService, ) { } @EventPattern(config.get('kafka.topics.exampleTopic'), Transport.KAFKA) handleEvent( @Payload() payload: ExamplePayloadDto, ): Promise<void> { return this.appService.handleExampleEvent(payload.message); } }
handleEvent()
함수가 구성 파일 config.get('kafka.topics.exampleTopic')
에 지정된 주제로부터 메시지를 수신함을 나타내는 @EventPattern
속성에 주의하세요. @Payload()
속성은 주제 메시지에서 값을 가져오는 데 도움이 됩니다.
애플리케이션을 Kafka 브로커에 연결하려면 두 가지 작업을 수행해야 합니다. 시작하려면 시작 파일에서 마이크로서비스를 연결합니다.
app.connectMicroservice({ transport: Transport.KAFKA, options: { client: { clientId: config.get('kafka.clientId'), brokers: config.get('kafka.brokers'), retry: { retries: config.get('kafka.retryCount'), }, }, consumer: { groupId: config.get('kafka.consumer.groupId'), }, }, });
그런 다음 main.ts에서 마이크로서비스를 실행합니다.
async function bootstrap() { const app = await NestFactory.create(AppModule, { bufferLogs: true, }); appStartup(app); await app.startAllMicroservices(); await app.listen(config.get('app.port')); }; void bootstrap();
애플리케이션을 테스트하기 위해 @testcontainers/kafka 패키지를 사용합니다. 이것의 도움으로 나는 ZooKeeper 컨테이너를 만든 다음 Kafka 컨테이너를 만들었습니다.
export async function kafkaSetup(): Promise<StartedTestContainer[]> { const network = await new Network().start(); const zooKeeperHost = "zookeeper"; const zooKeeperPort = 2181; const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:7.3.2") .withNetwork(network) .withNetworkAliases(zooKeeperHost) .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() }) .withExposedPorts(zooKeeperPort) .start(); const kafkaPort = 9093; const kafkaContainer = await new KafkaContainer() .withNetwork(network) .withZooKeeper(zooKeeperHost, zooKeeperPort) .withExposedPorts(kafkaPort) .start(); const externalPort = kafkaContainer.getMappedPort(kafkaPort); config.set('kafka.brokers', [`localhost:${externalPort}`]); return [ zookeeperContainer, kafkaContainer, ]; }
이 파일에서는 새로 생성된 컨테이너의 브로커 주소를 재정의했습니다.
테스트 파일 자체의 beforeAll
함수에서 Kafka 클라이언트를 생성합니다. 프로듀서와 함께 주제를 생성하고 애플리케이션을 시작하기도 합니다.
beforeAll(async () => { kafkaContainers = await kafkaSetup(); kafka = new Kafka({ clientId: 'mock', brokers: config.get('kafka.brokers'), logLevel: logLevel.NOTHING, }); producer = kafka.producer(); await producer.connect(); const admin = kafka.admin(); await admin.connect(); await admin.createTopics({ topics: [{ topic: config.get('kafka.topics.exampleTopic') }], }); appService = mockDeep<AppService>(); const module: TestingModule = await Test.createTestingModule({ imports: [AppModule], }) .overrideProvider(AppService) .useValue(appService) .compile(); app = module.createNestApplication(); appStartup(app); await app.startAllMicroservices(); await app.init(); }, 30 * 1000);
물론 afterAll
함수에서는 컨테이너를 중지해야 합니다.
afterAll(async () => { await app.close(); await Promise.all(kafkaContainers.map(c => c.stop())); }, 15 * 1000);
메시지가 주제에 도착하면 컨트롤러의 핸들러 함수가 필요한 서비스 함수를 호출하는지 확인하는 테스트를 작성했습니다. 이를 위해, handleExampleEvent
함수의 구현을 재정의하고 호출될 때까지 기다립니다.
describe('handleEvent', () => { it('should call appService', async () => { let resolve: (value: unknown) => void; const promise = new Promise((res) => { resolve = res; }); appService.handleExampleEvent.mockImplementation(async () => { resolve(0); }); const event: ExamplePayloadDto = { message: 'Hello World!', }; await producer.send({ topic: config.get('kafka.topics.exampleTopic'), messages: [{ key: 'key', value: JSON.stringify(event), }] }); await promise; await kafka.producer().disconnect(); }); });
그게 다야. NestJs 프레임워크를 사용하면 Kafka로 작업하는 것이 매우 쉽습니다. 내 경험이 당신에게 도움이 되기를 바랍니다. 예제 애플리케이션은 https://github.com/waksund/kafka 에서 볼 수 있습니다.