One very important thing in the software development process that is often overlooked in the early stages of a project is API documentation. Very often there are situations when developers do not describe the current API services due to tight deadlines or insufficient motivation, they plan to take up this task in the future. As a result, as the project grows, a large number of methods, parameters, and data formats appear, the description of which requires a huge amount of time and effort.
This problem is especially critical when the development team is divided into small teams, such as front-end and back-end developers. In this case, without documentation, many problems can arise during the implementation process and a lot of time is spent on meetings and discussions of already implemented methods. Knowledge is passed orally from developer to developer.
One of the solutions to this problem is frameworks for the automatic generation of documentation. For example, web applications often use swagger or its derivatives to describe REST APIs. This allows you to document the interfaces by which the front-end and back-end services interact.
Swagger is a really good tool, but the interaction between different parts of the system is not limited to the REST API. In the case of dividing the project into microservices and using the Event-driven architecture, the interaction between services is built using events shared through the message broker. And each microservice is handled by a separate development team. It's reasonable to consider automatically documenting the events that are exchanged between services, just as swagger describes the REST API.
In this article, we will look at how to enable automatic event documentation for services implemented using Spring Boot with interaction through Apache Kafka.
Apache Kafka is a distributed streaming platform that allows you to process trillions of events per day. Kafka guarantees low latency, and high throughput provides fault-tolerant publish/subscribe pipelines and allows you to process event streams.
To solve our problem, there is AsyncApi. AsyncAPI is an open source initiative that seeks to improve the current state of Event-Driven Architectures (EDA). AsyncApi has several Java tools that allow you to generate documentation from code. Springwolf is my choice because it provides a UI similar to springfox.
To get started, we need to add dependencies to gradle.build
implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka:3.2.2'
implementation 'javax.json:javax.json-api:1.1.4'
implementation 'org.glassfish:javax.json:1.1.4'
compileOnly 'org.projectlombok:lombok'
// Provides the documentation API
implementation 'io.github.springwolf:springwolf-kafka:0.6.1'
runtimeOnly 'io.github.springwolf:springwolf-ui:0.4.0'
First, we need to define the events that we will send or expect from the topic. I will create an abstract DomainEvent class that will be extended with specific events and define serialization and deserialization rules in it using Jackson annotations.
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor
@Getter
@Setter(AccessLevel.PROTECTED)
@EqualsAndHashCode
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.EXISTING_PROPERTY,
property = "type",
visible = true,
defaultImpl = EntityChangedEvent.class
)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = EntityChangedEvent.type, value = EntityChangedEvent.class),
@JsonSubTypes.Type(name = EntityDeletedEvent.type, value = EntityDeletedEvent.class),
@JsonSubTypes.Type(value = EntityChangedEvent.class)
})
@JsonIgnoreProperties(ignoreUnknown = true)
public abstract class DomainEvent {
private String id;
@JsonSerialize(using = LocalDateTimeSerializer.class)
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime occuredOn = LocalDateTime.now();
public abstract String getType();
}
Change event class.
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Getter
@Setter(AccessLevel.PRIVATE)
@EqualsAndHashCode(callSuper = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class EntityChangedEvent extends DomainEvent {
public static final String type = "ENTITY_CHANGED_EVENT";
private String title;
private String description;
private String code;
@NonNull
private String entityId;
@Builder
public EntityChangedEvent(
String id,
@NonNull String entityId,
@NonNull String code,
String title,
String description
) {
super(id, LocalDateTime.now());
if(StringUtils.isAllBlank(title, description)) {
throw new IllegalStateException("changes is none");
}
this.entityId = entityId;
this.code = code;
this.title = title;
this.description = description;
}
@Override
public String getType() {
return type;
}
}
Delete event class
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Getter
@Setter(AccessLevel.PRIVATE)
@EqualsAndHashCode(callSuper = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class EntityDeletedEvent extends DomainEvent {
public static final String type = "ENTITY_DELETED_EVENT";
@NonNull
private String entityId;
@Builder
public EntityDeletedEvent(
String id,
@NonNull String valueId
) {
super(id, LocalDateTime.now());
this.valueId = valueId;
}
@Override
public String getType() {
return type;
}
}
After describing the event classes, we can move on to the listener configuration. To do this, we will get the configurations class KafkaConsumerConfig, in which we will initiate Beans for the ConcurrentKafkaListenerContainerFactory. We will also define the component KafkaListeners for our factory. If there are several factories, for example, if we have several families of events that need to be divided, then for each it is necessary to create a similar class.
/**
Consumer configuration class for initiate factories for event families
*/
@Configuration
public class KafkaConsumerConfig {
private final String SERVER;
private final String SERVER_PORT;
private final String GROUP;
public KafkaConsumerConfig(
@Value("${spring.cloud.stream.bindings.service-out-0.group}") String group,
@Value("${spring.cloud.stream.kafka.binder.brokers}") String server,
@Value("${spring.cloud.stream.kafka.binder.defaultBrokerPort}") String port
) {
this.GROUP = group;
this.SERVER = server;
this.SERVER_PORT = port;
}
/*
each key will be serialized to String and each event will be serialized to JSON
*/
private <T> ConsumerFactory<String, T> typeConsumerFactory(Class<T> clazz) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.format("%s:%s", SERVER, SERVER_PORT));
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP);
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
new JsonDeserializer<>(clazz));
}
private <T> ConcurrentKafkaListenerContainerFactory<String, T> initFactory(Class<T> clazz) {
ConcurrentKafkaListenerContainerFactory<String, T> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(typeConsumerFactory(clazz));
return factory;
}
/*
init bean factory for abstract DomainEvent. This factory accept all subclasses of DomainEvent
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, DomainEvent> domainEventKafkaListenerContainerFactory() {
return initFactory(DomainEvent.class);
}
}
KafkaListener class must be define for each factory.
@Component
@Log
@KafkaListener(
topics = "topic-test",
groupId="group-test",
containerFactory="domainEventKafkaListenerContainerFactory")
public class KafkaListeners {
@KafkaHandler
void entityChangedKafkaListener(@Payload EntityChangedEvent message) {
log.info(String.format("KafkaHandler[EntityChanged] %s %s %s", message.getType(), message.getEntityId(), message.getId()));
}
@KafkaHandler
void entityDeletedKafkaListener(@Payload EntityDeletedEvent message) {
log.info(String.format("KafkaHandler[EntityDeleted] %s %s %s", message.getType(), message.getEntityId(), message.getId()));
}
}
Finally, we can move on to the springwolf configuration. We need to set up information about the application, producers, and consumers. Consumers are configured automatically based on the KafkaListener and KafkaHandler annotations, however, producers must be enumerated manually. The whole config will look something like this.
@Configuration
@EnableAsyncApi
public class AsyncApiConfiguration {
private final String SERVER;
private final String SERVER_PORT;
private final String CONSUMERS_BASE_PACKAGE = "consumers.package";
private final String CHANNEL_NAME;
public AsyncApiConfiguration(
@Value("${spring.cloud.stream.bindings.service-out-0.destination}") String channelName,
@Value("${spring.cloud.stream.kafka.binder.brokers}") String server,
@Value("${spring.cloud.stream.kafka.binder.defaultBrokerPort}") String port
) {
this.CHANNEL_NAME = channelName;
this.SERVER = server;
this.SERVER_PORT = port;
}
@Bean
public AsyncApiDocket asyncApiDocket() {
//application info
Info info = Info.builder()
.version("api version")
.title("service name")
.description("description")
.build();
//kafka server info
Server kafkaServer = Server.builder()
.protocol("kafka")
.url(String.format("%s:%s", SERVER, SERVER_PORT))
.build();
//init producer builder
KafkaProducerData.ProducerDataBuilder producerBuilder = ProducerData.builder()
.channelName(CHANNEL_NAME)
.channelBinding(Map.of("kafka", new KafkaChannelBinding()))
.operationBinding(Map.of("kafka", new KafkaOperationBinding()));
//list of event types for documentation
List<Class<?>> events = List.of(
EntityChangedEvent.class,
EntityDeletedEvent.class
);
//generate producers
List<ProducerData> producers = events.stream()
.map(c -> producerBuilder
.payloadType(c)
.build()
).collect(Collectors.toList());
//build AsyncApiDocket
return AsyncApiDocket.builder()
.basePackage(CONSUMERS_BASE_PACKAGE) //package for search KafkaListeners
.info(info) //application info
.producers(producers) //producers
.server("kafka", kafkaServer) //kafka srever info
.build();
}
}
The interface will be available at /springwolf/asyncapi-ui.html. Documentation in JSON format can be found at /springwolf/docs
In this article, we looked at how you can set up auto-generation of documentation for an Event-Driven architecture. Do not forget to take care of this at the start of your project to reduce costs in the future.
https://www.asyncapi.com/docs/tools https://github.com/springwolf/springwolf-core https://springwolf.github.io/docs/quickstart/