Many of us use Kafka to publish messages, but how do we receive them? In this article, we will write a small application that consumes messages from Kafka and, of course, cover it with e2e tests.
Kafka is a message broker in which some services generate messages and others receive them. Brokers are primarily used in systems with a microservice architecture to pass messages between services.
Messages are stored in topics. When sending a message, the producer indicates the name of the topic, as well as the message itself, which consists of a key and a value. And that’s it; the producer’s work is finished.
Then consumers come into play: they subscribe to the desired topic and start reading messages. Each application (more precisely, each consumer group) keeps its own position in the topic, and reading messages moves that offset pointer forward.
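To make the offset idea more concrete, here is a toy in-memory model of a single-partition topic. It is not the Kafka API: the class and method names are hypothetical, and the only point is to show that the producer just appends to a log while every consumer group advances its own offset independently.

```typescript
// Toy in-memory model of a single-partition topic. Not the Kafka API:
// all names here are hypothetical and exist only for illustration.
interface TopicMessage {
  key: string;
  value: string;
}

class InMemoryTopic {
  private readonly log: TopicMessage[] = [];
  // One stored offset per consumer group.
  private readonly offsets = new Map<string, number>();

  // The producer only appends; it does not know who will read the message.
  publish(message: TopicMessage): number {
    this.log.push(message);
    return this.log.length - 1; // offset of the appended message
  }

  // Return the next unread message for the group and advance its offset.
  poll(groupId: string): TopicMessage | undefined {
    const offset = this.offsets.get(groupId) ?? 0;
    const message = this.log[offset];
    if (message !== undefined) {
      this.offsets.set(groupId, offset + 1);
    }
    return message;
  }
}

const topic = new InMemoryTopic();
topic.publish({ key: 'user-1', value: 'created' });
topic.publish({ key: 'user-1', value: 'updated' });

// Two groups read the same log independently of each other.
const firstForBilling = topic.poll('billing');
const secondForBilling = topic.poll('billing');
const firstForAudit = topic.poll('audit');
```

Note that reading does not remove anything from the log: the "audit" group still starts from the first message even though "billing" has already read both.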
The distinctive features of Kafka are:
Now, let's work with Kafka using the NestJS framework. First, we need to create a controller that will process messages.
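import { Controller } from '@nestjs/common';
import { EventPattern, Payload, Transport } from '@nestjs/microservices';
// config, AppService and ExamplePayloadDto are imported from the project's own files.

@Controller()
export class AppController {
  constructor(
    private readonly appService: AppService,
  ) {}

  // Subscribe this handler to the topic named in the configuration.
  @EventPattern(config.get('kafka.topics.exampleTopic'), Transport.KAFKA)
  handleEvent(
    @Payload() payload: ExamplePayloadDto,
  ): Promise<void> {
    return this.appService.handleExampleEvent(payload.message);
  }
}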
Pay attention to the @EventPattern decorator, which indicates that our handleEvent() function will receive messages from the topic specified in the configuration file under config.get('kafka.topics.exampleTopic'). The @Payload() decorator extracts the value from the topic message.
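The article does not show ExamplePayloadDto itself, so here is a minimal sketch of what the handler ends up working with. It assumes the DTO has a single string field named message (which is what the test event later in the article suggests) and that the message value travels as a JSON string. The isExamplePayload and parseExamplePayload helpers are hypothetical; they are not part of NestJS.

```typescript
// Hypothetical shape of the DTO; the article only shows that it has a `message` field.
interface ExamplePayloadDto {
  message: string;
}

// Runtime check that an unknown value matches the DTO shape.
function isExamplePayload(value: unknown): value is ExamplePayloadDto {
  return (
    typeof value === 'object' &&
    value !== null &&
    typeof (value as { message?: unknown }).message === 'string'
  );
}

// Parse a raw message value (a JSON string) into the DTO, or throw.
function parseExamplePayload(raw: string): ExamplePayloadDto {
  const parsed: unknown = JSON.parse(raw);
  if (!isExamplePayload(parsed)) {
    throw new Error('Unexpected payload shape');
  }
  return parsed;
}

const payload = parseExamplePayload(JSON.stringify({ message: 'Hello World!' }));
```

A check like this can be worth having because a TypeScript type on the handler parameter is erased at runtime: a message with a different shape would still reach handleEvent().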
To connect your application to Kafka brokers, you need to do two things. To begin, connect the microservice in the startup file:
app.connectMicroservice({
  transport: Transport.KAFKA,
  options: {
    client: {
      clientId: config.get('kafka.clientId'),
      brokers: config.get('kafka.brokers'),
      retry: {
        retries: config.get('kafka.retryCount'),
      },
    },
    consumer: {
      groupId: config.get('kafka.consumer.groupId'),
    },
  },
});
Then start the microservices in main.ts:
async function bootstrap() {
  const app = await NestFactory.create(AppModule, {
    bufferLogs: true,
  });
  appStartup(app);
  await app.startAllMicroservices();
  await app.listen(config.get('app.port'));
}

void bootstrap();
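The snippets above read all their settings through config.get(...), but the article does not show the configuration itself. As a rough sketch, the keys used so far imply a structure like the one below. Only the key names come from the article; every value is a placeholder, and getSetting is a hypothetical stand-in for whatever dotted-path lookup the real config library provides.

```typescript
// Placeholder values; only the key names come from the article's config.get() calls.
const settings = {
  app: { port: 3000 },
  kafka: {
    clientId: 'example-app',
    brokers: ['localhost:9092'],
    retryCount: 5,
    consumer: { groupId: 'example-group' },
    topics: { exampleTopic: 'example-topic' },
  },
};

// Minimal stand-in for a config library's dotted-path lookup (hypothetical helper).
function getSetting<T>(path: string): T {
  let current: unknown = settings;
  for (const segment of path.split('.')) {
    if (typeof current !== 'object' || current === null || !(segment in current)) {
      throw new Error(`Missing configuration key: ${path}`);
    }
    current = (current as Record<string, unknown>)[segment];
  }
  return current as T;
}

const brokers = getSetting<string[]>('kafka.brokers');
const topicName = getSetting<string>('kafka.topics.exampleTopic');
```

Failing loudly on a missing key is a deliberate choice in this sketch: a misspelled topic or group name is easier to diagnose at startup than as a consumer that silently receives nothing.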
To test the application, I use the @testcontainers/kafka package. With its help, I create a ZooKeeper container and then a Kafka container:
import { GenericContainer, Network, StartedTestContainer } from 'testcontainers';
import { KafkaContainer } from '@testcontainers/kafka';

export async function kafkaSetup(): Promise<StartedTestContainer[]> {
  // Both containers share a network so Kafka can reach ZooKeeper by its alias.
  const network = await new Network().start();

  const zooKeeperHost = "zookeeper";
  const zooKeeperPort = 2181;
  const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:7.3.2")
    .withNetwork(network)
    .withNetworkAliases(zooKeeperHost)
    .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() })
    .withExposedPorts(zooKeeperPort)
    .start();

  const kafkaPort = 9093;
  const kafkaContainer = await new KafkaContainer()
    .withNetwork(network)
    .withZooKeeper(zooKeeperHost, zooKeeperPort)
    .withExposedPorts(kafkaPort)
    .start();

  // Point the application at the port that Docker mapped on the host.
  const externalPort = kafkaContainer.getMappedPort(kafkaPort);
  config.set('kafka.brokers', [`localhost:${externalPort}`]);

  return [
    zookeeperContainer,
    kafkaContainer,
  ];
}
Please note that, in this file, I override the broker address so that it points to the newly created container.
In the test file itself, in the beforeAll function, I create a Kafka client and a producer, create the topic through the admin client, and launch our application.
beforeAll(async () => {
  kafkaContainers = await kafkaSetup();

  // Kafka and logLevel come from the kafkajs package.
  kafka = new Kafka({
    clientId: 'mock',
    brokers: config.get('kafka.brokers'),
    logLevel: logLevel.NOTHING,
  });
  producer = kafka.producer();
  await producer.connect();

  // Create the topic up front so the consumer can subscribe to it.
  const admin = kafka.admin();
  await admin.connect();
  await admin.createTopics({
    topics: [{ topic: config.get('kafka.topics.exampleTopic') }],
  });
  await admin.disconnect(); // the admin client is not needed after this point

  appService = mockDeep<AppService>();
  const module: TestingModule = await Test.createTestingModule({
    imports: [AppModule],
  })
    .overrideProvider(AppService)
    .useValue(appService)
    .compile();

  app = module.createNestApplication();
  appStartup(app);
  await app.startAllMicroservices();
  await app.init();
}, 30 * 1000);
Of course, in the afterAll function, you need to stop the containers:
afterAll(async () => {
  await app.close();
  await Promise.all(kafkaContainers.map(c => c.stop()));
}, 15 * 1000);
I wrote a test that verifies that, when a message arrives in the topic, the handler in our controller calls the corresponding service function. To do this, I override the implementation of the handleExampleEvent function and wait for it to be called.
describe('handleEvent', () => {
  it('should call appService', async () => {
    // This promise resolves as soon as the mocked service method is called.
    let resolve: (value: unknown) => void;
    const promise = new Promise((res) => {
      resolve = res;
    });
    appService.handleExampleEvent.mockImplementation(async () => {
      resolve(0);
    });

    const event: ExamplePayloadDto = {
      message: 'Hello World!',
    };
    await producer.send({
      topic: config.get('kafka.topics.exampleTopic'),
      messages: [{
        key: 'key',
        value: JSON.stringify(event),
      }]
    });

    await promise;
    // Disconnect the producer that was connected in beforeAll,
    // rather than a newly created one.
    await producer.disconnect();
  });
});
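One weakness of waiting on a bare promise is that, if the message never arrives, the test only fails when the test runner's own timeout fires. As a possible refinement, here is a small sketch of a deferred promise that rejects with a clear error after a given time. The createDeferred helper is hypothetical (it is not part of Jest, NestJS, or kafkajs), and fakeHandler merely stands in for the mocked service method.

```typescript
// Hypothetical helper: a promise that can be resolved from outside
// and that rejects if nothing resolves it within `timeoutMs`.
interface Deferred<T> {
  promise: Promise<T>;
  resolve: (value: T) => void;
}

function createDeferred<T>(timeoutMs: number): Deferred<T> {
  let resolve!: (value: T) => void;
  const promise = new Promise<T>((res, rej) => {
    const timer = setTimeout(
      () => rej(new Error(`Not resolved within ${timeoutMs} ms`)),
      timeoutMs,
    );
    resolve = (value: T) => {
      clearTimeout(timer); // cancel the pending rejection
      res(value);
    };
  });
  return { promise, resolve };
}

// Usage sketch: fakeHandler plays the role of the mocked service method.
const called = createDeferred<string>(1000);
const fakeHandler = async (message: string): Promise<void> => {
  called.resolve(message);
};

// Simulate the message arriving, then wait for the handler to report it.
const received = fakeHandler('Hello World!').then(() => called.promise);
```

In the real test, called.resolve would be invoked from mockImplementation, and the test would await called.promise after producer.send().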
That's all. Working with Kafka is incredibly easy if you use the NestJS framework. I hope my experience will be useful to you. An example application is available at https://github.com/waksund/kafka