In a recent project I was working on, I had to create a data pipeline that reads flight data from a REST API (the OpenSky Network API) and sends it continuously to an S3 bucket. To accomplish this, I decided to use Kafka, a powerful and scalable streaming platform. This choice not only helped me achieve my project goals but also provided an excellent opportunity to learn Kafka and enhance my skill set.
The API did not offer any webhooks or streaming through WebSockets, so I had to find a way to get the data into Kafka and simulate streaming behavior. Polling the API was the only option available: the idea is to send requests to the API repeatedly at a given frequency and then check whether the data has changed.
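To get a feel for what a single poll looks like, here is a one-off request against the OpenSky states endpoint (the bounding box parameters are only an illustration and are not used in the connector configuration later):
# One manual poll, limited to an example bounding box roughly covering Switzerland
curl -s "https://opensky-network.org/api/states/all?lamin=45.83&lomin=5.99&lamax=47.82&lomax=10.52"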
Two options existed for implementing API polling and sending the results to a Kafka topic. The first was to incorporate the polling logic directly into a Kafka producer application. However, this approach lacked scalability and fault tolerance, and would require significant coding effort. The second was to use Kafka Connect, which offers scalability and fault tolerance while minimizing the amount of code we need to write.
Kafka Connect is a framework that provides integration with different data sources and sinks. It runs workers either in a distributed fashion as a cluster or in standalone mode for testing and small applications. Each worker runs tasks defined by connectors: a connector defines how to get data from a given source (source connector) or how to send it to a destination (sink connector), along with a set of transformations and converters. We will not cover how connectors work in detail, as that is out of the scope of this article, and Confluent has excellent, in-depth tutorials on Kafka Connect.
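As a quick illustration of this architecture, once a worker is running (we will start one at the end of this tutorial) you can inspect it through the Kafka Connect REST interface, which listens on port 8083 by default:
# Connector plugins installed on the worker
curl -s http://localhost:8083/connector-plugins
# Connectors currently running on the worker
curl -s http://localhost:8083/connectors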
Connectors for many common data sources and destinations have already been developed and shared by other developers. You can find ready-to-use connectors on Confluent Hub or on GitHub.
In this tutorial, we will use a REST API connector developed by Lenny Löfberg and available on GitHub. This connector can poll data from a REST API and send the results to a Kafka topic. We will look at how to install and configure it to stream data about flights over a given region using the public and free OpenSky Network API.
This tutorial assumes that you have already installed Java 11 and the latest version of Kafka (3.5.0 at the time of writing).
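If you want to double-check your setup, the following commands, run from the Kafka installation directory, should report the expected versions:
java -version
./bin/kafka-topics.sh --version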
Before using Kafka Connect or installing a connector, let’s create a “connectors” folder inside the Kafka directory. It will hold the binaries and dependencies of this connector as well as any others you might use in the future. We also need to create a “connectors” folder inside the config folder of Kafka, which will contain the connector configuration file and the worker configuration (the commands to create both follow the listing below). Your Kafka folder structure should resemble the following:
kafka_3.5.0/
- bin/
- connectors/
- config/
  - ...
  - connectors/
- libs/
- licenses/
- logs/
- ...
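Assuming your current working directory is the root of the Kafka installation, the two new folders can be created with:
mkdir connectors
mkdir config/connectors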
Once the structure is in place, we can proceed to install the REST Kafka connector. Navigate to the connectors folder (kafka/connectors/) and clone the GitHub repository:
git clone https://github.com/llofberg/kafka-connect-rest.git
You should now have a kafka-connect-rest folder with all the code and dependencies required to install the connector. To use a Kafka connector, we have to build the main JAR and its dependencies (also as JAR files) and link them to our Kafka Connect cluster through a configuration file.
Let’s start by building the connector using Maven. To do so, run the following commands from the kafka-connect-rest folder:
mvn clean install
mkdir jars
cp kafka-connect-rest-plugin/target/kafka-connect-rest-plugin-*-shaded.jar jars/
cp kafka-connect-transform-from-json/kafka-connect-transform-from-json-plugin/target/kafka-connect-transform-from-json-plugin-*-shaded.jar jars/
cp kafka-connect-transform-add-headers/target/kafka-connect-transform-add-headers-*-shaded.jar jars/
cp kafka-connect-transform-velocity-eval/target/kafka-connect-transform-velocity-eval-*-shaded.jar jars/
These commands build the whole project with Maven based on the pom.xml file, which generates the JARs for each component; they then create a jars folder in the connector directory and copy all the built JAR files into it. From now on, we only need to point the Kafka Connect worker to these JAR files.
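You can quickly verify that the build produced the expected artifacts by listing the new folder (the exact file names depend on the connector version):
ls jars/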
Now that the connector is installed, we have to configure the worker instance and the connector itself. Let’s start with the worker: create a worker.properties configuration file in the config/connectors folder with the following content:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=$KAFKA_HOME/connectors/
max.request.size=11085880
You will need to change these values based on your own Kafka environment. More precisely, you will need to edit the bootstrap server address and set the plugin.path option to the absolute path of the connectors folder (replace $KAFKA_HOME with the actual path; for example, if Kafka is installed under /usr/local/bin/kafka, as in the CLASSPATH below, the value would be /usr/local/bin/kafka/connectors/). The plugin.path option is used by the worker to look for connector JARs, and it expects a directory of directories.
Another step you might need is to add the connector JAR files to the CLASSPATH of your Java runtime so they can be found and used by the Kafka Connect worker:
sudo nano /etc/environment
Then edit the CLASSPATH variable in the opened file:
CLASSPATH=".:/usr/local/bin/kafka/connectors/kafka-connect-rest/jars"
export CLASSPATH
Make sure to adjust the path depending on your installation.
We are almost done configuring the Kafka connector; now we only need to write the connector properties file. To do so, create a rest-source-connector.properties configuration file in the config/connectors folder and put the following configuration in it:
name=rest-source
key.converter = org.apache.kafka.connect.storage.StringConverter
value.converter = org.apache.kafka.connect.storage.StringConverter
connector.class = com.tm.kafka.connect.rest.RestSourceConnector
tasks.max = 1
rest.source.poll.interval.ms = 10000
rest.source.method = GET
rest.source.url = https://opensky-network.org/api/states/all
rest.source.headers = Content-Type:application/json,Accept:application/json
rest.source.topic.selector = com.tm.kafka.connect.rest.selector.SimpleTopicSelector
rest.source.destination.topics = flights
You will need to change the following options based on your needs:
- rest.source.poll.interval.ms: the polling frequency in milliseconds; in this example, a request is sent every 10 seconds (10,000 ms).
- rest.source.method: the HTTP method used to query the API.
- rest.source.url: the endpoint you wish to call; make sure to include all the parameters in the case of a GET request.
- rest.source.destination.topics: the topic where records will be sent.
You are also free to change the headers and the number of tasks if you are running Kafka Connect in distributed mode.
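Note that, depending on your broker configuration (auto.create.topics.enable), the flights topic may not be created automatically. With the broker running, you can create it manually before starting the connector:
./bin/kafka-topics.sh --create --topic flights --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1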
Before running the connector, make sure that the ZooKeeper and Kafka servers are running, then start Kafka Connect in standalone mode using the connect-standalone.sh script that ships with Kafka:
./bin/connect-standalone.sh config/connectors/worker.properties config/connectors/rest-source-connector.properties
The command takes as parameters the worker configuration file and one or more connector configurations. Once it is running, it will poll the API endpoint at the configured frequency and send the records to the specified Kafka topic.
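To verify that records are actually flowing, you can check the connector status through the worker’s REST interface and read a message from the topic with the console consumer:
curl -s http://localhost:8083/connectors/rest-source/status
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flights --from-beginning --max-messages 1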
In this tutorial, we have seen how to stream data from an API into a Kafka topic, which is a very common use case. You can also learn more about using Kafka Connect with Docker and installing Kafka on EC2 to set up a cloud-based data streaming pipeline. Kafka Connect is a very powerful tool that offers both scalability and fault tolerance for your data integrations.