Nowadays, we can rely on different streaming systems to transmit and collect our data. Some of the systems we use require extremely high availability. For others, availability can be traded off against reliability: sometimes making a system slower leaves room to increase the reliability of the data being transmitted. What is fascinating about this is that the choices are almost endless. To stream our data we can, for example, use push or pull mechanisms, and it is very important that we are aware of the distinction between them.
A pull mechanism is one where the consuming process repeatedly polls a channel or buffer to see what’s in there. A push mechanism sends the data to a place where a process is waiting for it. These two mechanisms are the main difference between using Kafka and RabbitMQ, respectively. Kafka has its origins at LinkedIn in 2010, and RabbitMQ was developed on February 1, 2007, by a company with a similar name: Rabbit Technologies Ltd. What’s important to notice is that RabbitMQ implements AMQP (Advanced Message Queuing Protocol), which is why RabbitMQ is also referred to as an AMQP broker.
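To make the distinction concrete, here is a minimal in-memory sketch in Python. It does not use Kafka or RabbitMQ at all; the names `pull_consume` and `PushChannel` are hypothetical and only illustrate who takes the initiative in each model.

```python
import queue
from typing import Callable, List


def pull_consume(buffer: "queue.Queue[str]", max_polls: int) -> List[str]:
    """Pull: the consumer polls the buffer itself and takes what it finds."""
    received = []
    for _ in range(max_polls):
        try:
            received.append(buffer.get_nowait())
        except queue.Empty:
            continue  # nothing there yet, poll again
    return received


class PushChannel:
    """Push: the channel delivers each message to callbacks that are waiting."""

    def __init__(self) -> None:
        self._subscribers: List[Callable[[str], None]] = []

    def subscribe(self, callback: Callable[[str], None]) -> None:
        self._subscribers.append(callback)

    def publish(self, message: str) -> None:
        for callback in self._subscribers:
            callback(message)


def demo_pull() -> List[str]:
    buffer: "queue.Queue[str]" = queue.Queue()
    buffer.put("check-in")
    buffer.put("check-out")
    return pull_consume(buffer, max_polls=5)


def demo_push() -> List[str]:
    received: List[str] = []
    channel = PushChannel()
    channel.subscribe(received.append)
    channel.publish("check-in")
    channel.publish("check-out")
    return received
```

Both demos end up with the same two messages. The difference is that in the pull case the consumer decides when to look, while in the push case the channel decides when to deliver.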
Later on, Kafka opted to develop stream buses. These two systems are the main focus of this article. We are also going to go through two other data transmission protocols mostly used in IoT (Internet of Things): MQTT (Message Queuing Telemetry Transport) and CoAP (Constrained Application Protocol). Let’s begin by looking at the history of the protocols we are going to investigate in this article. Please note that this article will be subject to revisions. Given its complexity and the number of moving parts described in it, it is highly unlikely that every single bit of the code will be explained. Regardless, our focus is on MQTT, CoAP, RabbitMQ, and Kafka. The code is an added value that will be extended along the way.
In 1999, two engineers, Andy Stanford-Clark from IBM and Arlen Nipper from Eurotech, designed MQTT. The problem they were trying to solve was the unreliable satellite connection to different oil pipelines. The idea was to create a lightweight protocol that would occupy a very narrow bandwidth. The software had to support any sort of data and multiple levels of QoS (Quality of Service). This protocol was designed to be used in an M2M (Machine to Machine) fashion, and it’s currently widely used in IoT.
RabbitMQ is mostly used to send messages via AMQP (Advanced Message Queuing Protocol). This protocol was invented in 2003 by John O’Hara at JPMorgan Chase in London. The goal was to create a language-neutral network protocol for enterprise messaging. By achieving this, John O’Hara created a market for different AMQP implementations. The most widely known are ActiveMQ, Apache Qpid, and RabbitMQ.
RabbitMQ was developed on February 1, 2007, by Rabbit Technologies Ltd. RabbitMQ is a broker implementation of AMQP.
CoAP (Constrained Application Protocol) is also meant to be used in IoT in an M2M fashion. The first draft of this protocol appeared in December 2009. Years later, in 2014, it was approved and published as RFC 7252, with C. Bormann among its authors. Although it’s being used in this article, please note that CoAP is an Internet application protocol designed for constrained devices. Constrained devices are end nodes that serve a specific purpose in an IoT architecture. They are usually connected to sensors. At the same time, it is also a service layer protocol that allows connectivity between devices with constrained resources.
In 2010, Jay Kreps, Jun Rao, and Neha Narkhede created Kafka at LinkedIn HQ. The problem they were trying to solve was the ingestion of an ever-growing rate of messages. Systems that implemented AMQP, like RabbitMQ and ActiveMQ, provide extended possibilities in regard to transaction support, message tracing, and protocol mediation. These are all heavy operations that provide high reliability. However, these real-time systems were not suited to keep all the data from the overgrowth of message exchanges that were being generated at LinkedIn. At the same time, none of them was storing data. Instead, they were just passing data around. For every read, every message was deleted from the queues.
In order to keep the data, extra design time, effort, and processing were necessary to transfer the data to a persistent storage mechanism. What the developers at LinkedIn realized is that most of these messages didn’t actually require all the message handling provided by RabbitMQ, and therefore there was just too much overhead being generated. Ideally, there should be a system that would provide an out-of-the-box persistence mechanism and avoid deleting messages on the fly.
With the absence of a delete operation, there could potentially be an increase in performance. Furthermore, they realized that the data being used needed to be replayed on some occasions and that it could be removed at precise intervals. This is because LinkedIn needed, for various reasons, including marketing, customer-specific services, and all sorts of data analysis, to keep the data somewhere. That data was not to be analyzed by people. This went hand in hand with the explosion of AI (Artificial Intelligence) and ML (Machine Learning) developments. LinkedIn wanted data and, with it, to make a platform that would reach all corners of the world with the best possible performance.
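The contrast between deleting on read and keeping messages for replay can be sketched with two small in-memory classes. This is only an illustration under simplifying assumptions: `DeleteOnReadQueue` and `RetainedLog` are hypothetical names, and neither reflects how RabbitMQ or Kafka are actually implemented.

```python
from collections import deque
from typing import Deque, List


class DeleteOnReadQueue:
    """Classic queue: a message disappears once it has been read."""

    def __init__(self) -> None:
        self._messages: Deque[str] = deque()

    def publish(self, message: str) -> None:
        self._messages.append(message)

    def read_all(self) -> List[str]:
        drained = list(self._messages)
        self._messages.clear()
        return drained


class RetainedLog:
    """Append-only log: reads never delete, clean-up happens separately."""

    def __init__(self) -> None:
        self._entries: List[str] = []
        self._base_offset = 0  # offset of the first entry still retained

    def append(self, message: str) -> int:
        self._entries.append(message)
        return self._base_offset + len(self._entries) - 1

    def read_from(self, offset: int) -> List[str]:
        start = max(offset - self._base_offset, 0)
        return self._entries[start:]

    def truncate_before(self, offset: int) -> None:
        """Simulates the interval-based removal of old entries."""
        drop = max(min(offset - self._base_offset, len(self._entries)), 0)
        self._entries = self._entries[drop:]
        self._base_offset += drop
```

Reading the log twice from offset 0 returns the same entries both times, which is the replay behaviour described above. Reading the queue twice returns the messages once and then nothing.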
Since around 1993 there has been a dramatic increase in developments surrounding AI, and in 2011 these automated developments took another shift into Big Data, Deep Learning, and AI. This means that Kafka was being developed around the time that companies understood the business value of applying AI and ML to data. Data became business and science material, and currently data, data collecting, and data analysis have become very important commodities in our day-to-day lives. Both RabbitMQ and Kafka are currently very mature and reliable systems. Kafka can be chosen as an alternative to RabbitMQ mainly when we want to store the data being received and when we want to provide multiple replays of the same data.
In this article, we are going to have a look at how AMQP and RabbitMQ technologies work with message exchanges and how we can take advantage of them to collect data. We are also going to look at how Kafka can be used for the same purpose, and we will see how Apache Spark can work together with Kafka to collect data. Furthermore, we are going to see how we can federate RabbitMQ queues. Finally, we will briefly have a look at CoAP and MQTT as lightweight, low-bandwidth protocols, and their usage for IoT. RabbitMQ and Kafka also provide ways to make direct connections using these protocols. However, these are deliberately left out of this article in order to prevent it from becoming too complex. It is also important to understand the chronology of the inception of these protocols.
To complete our study, we are going to play a spy game in which we check the databases to find the clues needed for our investigation.
To proceed with the analysis of this article, we need to first understand that in this exercise we will go through a pretty wide range of technologies. Here is a list of what we’ll need:
We are going to implement three major players. One is a train, the other is a bridge for the train, and finally the central services. The bridge for the train is fixed and there are no opening times. The main goal is to supervise what comes in and out of a bridge and at what times, and potentially to provide data for other important investigations. In this project (GitHub), we’ll find an investigation game where we need to find the identity of a spy. Nothing too complicated. It is only a way to understand all the fundamental players of these architectures and how they work together. This is the general picture of our scenario:
Let’s break this up. We’ll now briefly describe what these services are supposed to do.
The central service is responsible for keeping all our data. In this case, we have a few concerns. The central service needs to have some sort of persistence mechanism to keep the data. Further, it needs to be able in some way to receive lots and lots of data coming from external services.
Our train services are responsible for 3 important functions. They need to send periodic data about the merchandise on the train. They need to send periodic data about the people on the train. They also need to send check-in and check-out messages when crossing the bridge.
In our bridge server, we’ll need to implement 3 important functionalities. We will have a sensor service. It will detect check-ins and check-outs from trains. It will not register anything related to the identity of the train, only the fact that there has been a train over the bridge. This information will be triangulated with the data coming from the train server’s sensor service module. Two IoT-related elements will also be implemented. One that reads temperatures and another that reads humidity values. The train will use the bridge_01 federation.
Let’s start by thinking about what we want for persistence. First, we look at the data we want to receive from IoT, and this would be the temperature and the humidity. These, in a real scenario, would represent a tremendous amount of data. In this case, we would be looking at some sort of big data mechanism. However, Big Data is a whole paradigm on its own.
For this example, our big data mechanism will be Cassandra. Another thing we need to think about is the passenger information. We want to send the whole passenger data at periodic intervals. Namely, we are interested in registering their weight. It’s important to monitor that accurately as described above. This also represents a lot of information. In the same way as the meters’ information, it is also time-series data. We really aren’t that worried if some data is lost in one of the periods. We repeat sending the data of all the passengers periodically with a relatively high frequency and that information should not change that much. For this reason, we will also register the passenger information in a Big Data fashion.
Now, it’s time to think about other data that we will receive with less frequency and where we actually don’t want to miss any message. Or at least, we just want more reliability. For this case, we do want to benefit mainly from transaction handling and message handling. Protocol mediation will still happen anyway, given that our messages need to go through AMQP, but this also ensures their reliability.
In this article, we won’t discuss the creation of a website, but given that this would be the ultimate goal in a real-case scenario, we are going to keep that in mind. For a website, it can be better to have an ER database model implemented. For this reason, we choose PostgreSQL. To get the data to Cassandra we are going to use Apache Spark for the data reader implementation. This way we can connect Spark to our Kafka streams. We will have Spark processes running in the central streaming service and we will have Kafka processes running at the central. We will have RabbitMQ brokers running in the bridge and the train to get the merchandise data through to reach PostgreSQL. The data will be fetched from a RabbitMQ exchange federated stream located at central. This is what this architecture looks like: In the following, we’ll go through all the relevant implementations:
It is important to notice that although the implementation and the project overview look very extensive and complicated, the actual coding is quite far from being complicated. We will now take a dive into the code and go through every topic one by one.
One of the major technological players in this architecture is RabbitMQ. We are using this technology to get the merchandise information and the sensor data. The merchandise information is of crucial importance and the rate of transmission is very low. For sensor data the rate of transmission is even lower, because it only happens when trains cross the bridge. We also know that we won’t need to replay any of these messages. The reason is that, although we need to be sure that the load in the train gets entirely delivered at its destination, the mechanisms to prevent stealing are already in place. Containers are sealed and passengers cannot board or cross the merchandise carriages. We only need the merchandise information to have an idea of how its weight affects the bridge. In regard to the sensors, we also do not want to store the check-in/check-out data. One lost check-in/check-out record won’t make a difference among hundreds of them in the course of the day. Therefore, none of this information is actually vital, nor is its throughput very high.
Before configuring this, it’s important to know that we are not going to go very deep into the configuration of RabbitMQ. In spite of multiple possible configurations, we are going to keep it on a 1 to 1 basis as much as possible. Let’s begin. In bl-central-server/bl-central-streaming we find a Dockerfile with the following definition:
FROM rabbitmq:3.9-management
WORKDIR /root
ENV LANG=C.UTF-8
RUN apt -y update
RUN apt install curl -y
RUN apt install python3 -y
RUN apt -y upgrade
COPY entrypoint.sh /root
ENTRYPOINT ["/root/entrypoint.sh"]
# RabbitMQ
EXPOSE 5672 15672
We start off with the rabbitmq:3.9-management image, which contains a plain installation of RabbitMQ together with the management plugin. We are going to enable something called federation. Simply put, this is a way of connecting two RabbitMQ brokers together. They can be close together or physically located very far apart. The idea behind this is that they end up working as a single broker.
We need this in order to connect the RabbitMQ brokers running in our train and bridge to our central server. Finally, in order to be able to visualize our webpage and allow other containers to find our broker in the central streaming service, we need to make two essential ports available. These are ports 5672 and 15672. These are the RabbitMQ server port and the Web GUI (Graphical User Interface) port, respectively. The convention in RabbitMQ is that the difference between these ports is 10000. In our example, if we say that our server port is 5672, then we are already implying that our GUI port is 10000 + 5672 = 15672. Let’s now have a look at our entrypoint.sh. Our entry point file is very large, and therefore it’s probably better to look at it in sections. First, we start our server:
#!/usr/bin/env bash
# Start the broker in the background and wait until it has fully booted
rabbitmq-server -detached
sleep 1
rabbitmqctl await_startup
# Make sure the RabbitMQ application is running on the node
rabbitmqctl start_app
rabbitmqctl await_startup
# Enable the management and federation plugins on the running node
rabbitmq-plugins enable rabbitmq_management
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management
After starting our service we still need to download a tool called rabbitmqadmin. This tool allows us to configure virtual hosts, queues, and exchanges, amongst other features, via the command line.
curl -S http://localhost:15672/cli/rabbitmqadmin > /usr/local/bin/rabbitmqadmin
chmod +x /usr/local/bin/rabbitmqadmin
We now need to think about general configurations for our RabbitMQ server. We only need to create one user. We add a test user with test as password. Then we give our user the administrator tag. Finally, we grant our newly created user full configure, write, and read permissions on the default virtual host (/).
rabbitmqctl add_user test test
rabbitmqctl set_user_tags test administrator
rabbitmqctl set_permissions -p / test ".*" ".*" ".*"
We will need to create federated queues. The order in which we create them doesn’t really matter. They do, however, need to be running at the same time in order to be accessible. As we have seen in the general overview, we need to create 3 federations. Since we create them all in the same way, we can benefit from a bash function that creates each federation for us. In it, we first create a virtual host. Then we give our test user all permissions on it. RabbitMQ needs an exchange to receive messages.
Exchanges in RabbitMQ are message routers that distribute messages to queues via bindings and routing keys. There are several types of exchanges. For our example, we will use fanout. With this type of exchange, no routing key is used or needed: if more queues are bound to the exchange, each of them gets a copy of every message delivered to it. Now, we can create our queue. Then we bind the queue to the exchange. Afterward, we federate this queue with the upstream in the remote service. Finally, we set up the federation policy.
federate(){
rabbitmqctl add_vhost bl_$1_vh
rabbitmqctl set_permissions -p bl_$1_vh test ".*" ".*" ".*"
rabbitmqadmin -u test -p test -V bl_$1_vh declare exchange name=bl_$1_exchange type=fanout
rabbitmqadmin -u test -p test -V bl_$1_vh declare queue name=bl_$1_queue
rabbitmqadmin -u test -p test -V bl_$1_vh declare binding source=bl_$1_exchange destination=bl_$1_queue
rabbitmqctl set_parameter -p bl_$1_vh federation-upstream bl_$1_upstream '{"uri":"amqp://test:test@bl_'$2'_server:5672/bl_'$1'_vh","expires":3600000}'
rabbitmqctl set_policy -p bl_$1_vh --apply-to all bl_$1_policy ".*$1.*" '{"federation-upstream-set":"all"}'
}
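The fanout behaviour that this function relies on can be modelled in a few lines of Python. This is a toy in-memory stand-in, not a RabbitMQ client: the class `FanoutExchange` below is hypothetical and only shows that the routing key plays no role and that every bound queue receives its own copy.

```python
from typing import Dict, List


class FanoutExchange:
    """In-memory stand-in for a fanout exchange: routing keys are ignored."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._queues: Dict[str, List[str]] = {}

    def bind(self, queue_name: str) -> None:
        # Binding a queue twice is harmless; it keeps its messages.
        self._queues.setdefault(queue_name, [])

    def publish(self, body: str, routing_key: str = "") -> None:
        # The routing key is accepted but never consulted.
        for messages in self._queues.values():
            messages.append(body)

    def messages(self, queue_name: str) -> List[str]:
        return list(self._queues[queue_name])


exchange = FanoutExchange("bl_bridge_01_sensor_exchange")
exchange.bind("bl_bridge_01_sensor_queue")
exchange.bind("audit_queue")  # hypothetical second queue, for illustration
exchange.publish("check-in", routing_key="ignored")
exchange.publish("check-out")
```

After these two publishes, both queues hold the same two messages in the same order.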
Finally, we call our function for every federation we want to create. In this case, we create 3 virtual hosts, 3 exchanges, 3 queues, 3 federation upstreams, and 3 federation policies.
federate train_01_merchandise train_01
federate train_01_sensor train_01
federate bridge_01_sensor bridge_01
tail -f /dev/null
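To see what these three calls produce, we can mirror the string substitutions of the federate function in Python. This sketch only computes names; it does not talk to RabbitMQ. The helper `federation_names` is a hypothetical name, while the patterns are copied from the bash function above.

```python
from typing import Dict, List, Tuple


def federation_names(name: str, upstream_host: str) -> Dict[str, str]:
    """Mirror the $1 / $2 substitutions done by the bash federate function."""
    return {
        "vhost": f"bl_{name}_vh",
        "exchange": f"bl_{name}_exchange",
        "queue": f"bl_{name}_queue",
        "upstream": f"bl_{name}_upstream",
        "policy": f"bl_{name}_policy",
        "upstream_uri": (
            f"amqp://test:test@bl_{upstream_host}_server:5672/bl_{name}_vh"
        ),
    }


# The same three (name, upstream host) pairs used in entrypoint.sh
FEDERATIONS: List[Tuple[str, str]] = [
    ("train_01_merchandise", "train_01"),
    ("train_01_sensor", "train_01"),
    ("bridge_01_sensor", "bridge_01"),
]

for fed_name, host in FEDERATIONS:
    names = federation_names(fed_name, host)
    print(names["vhost"], "<-", names["upstream_uri"])
```

Note that both train federations point at the same upstream host, bl_train_01_server, but at different virtual hosts on it.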
The creation of virtual hosts, exchanges, and queues is the same for the train and bridge servers. The only difference is that in these last 2, no federation is created. Therefore, no federation policy needs to be created either. This is how it looks for one of these federations. In this case, this is a federation for the bridge service to get the data from the presence sensor to register check-in and check-out:
In our Kafka setup, we have discussed that we want to get all the meter data and the passenger data. These are high volumes of information, with very high throughput, and are considered to be of critical value. The meter data is critical because we want to evaluate how the weather conditions affect the bridge, and the passenger data is also critical because we want to make sure we keep track of movements in the train. The latter is critical to keep records for investigation purposes or to understand how the complete weight per carriage affects the carriage itself and the train composition.
Just as in the case of our previous RabbitMQ implementation, our Kafka implementation doesn’t differ that much between the train server and the bridge server. These are the only servers that contain running Kafka buses. The RabbitMQ implementation is exactly the same as we discussed above. We can find it in bl-bridge-server/rabbitmq. To implement the sensors, we first need to consider the special Mosquitto case. In our example, we run a plain Mosquitto service located at bl-bridge-server/mosquitto. This is the MQTT broker. The image definition is practically empty:
FROM eclipse-mosquitto:2.0.11
ENV LANG=C.UTF-8
COPY mosquitto.config /mosquitto/config/mosquitto.conf
#MQTT
EXPOSE 1883
We can see mosquitto.conf being copied. This is because, by default, Mosquitto 2.x does not accept anonymous connections from remote clients and does not expose port 1883 beyond the local interface. The file content is this:
listener 1883
allow_anonymous true
Finally, we can implement the image for our sensors, located in bl-bridge-server:
FROM node:current-alpine3.14
WORKDIR /usr/local/bin
ENV LANG=C.UTF-8
COPY bl-bridge-temperature-coap/dist /usr/local/bin/bl-bridge-temperature-coap/dist
COPY bl-bridge-temperature-coap/node_modules /usr/local/bin/bl-bridge-temperature-coap/node_modules
COPY bl-bridge-humidity-mqtt/dist /usr/local/bin/bl-bridge-humidity-mqtt/dist
COPY bl-bridge-humidity-mqtt/node_modules /usr/local/bin/bl-bridge-humidity-mqtt/node_modules
COPY entrypoint.sh /usr/local/bin
#CoAP
EXPOSE 5683
ENTRYPOINT ["/usr/local/bin/entrypoint.sh"]
In these steps, we are making copies of files needed to run the humidity and temperature meter services. We’ll talk about this further in this article. This is the breakdown of all the ports being used:
5674: the communication port used to access the RabbitMQ server
15674: the GUI port
5683: the CoAP port. This is the entry point to a small service we are using to receive messages using the CoAP protocol
1883: the MQTT port. On our server, we have an MQTT service. The implementation we are using is Mosquitto
We have now concluded our Dockerfile setup. Let’s now go through our entrypoint.sh file. For the train-server, the beginning of this file doesn’t differ that much from the central stream service file, as described before. Therefore, let’s now look at how we start all our installed services:
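#!/bin/sh
# Count for 30 seconds so that Kafka at central has time to start
for counter in $(seq 1 30); do
    echo -ne "Starting app in $counter seconds ...\r"
    sleep 1
done
# Start the MQTT humidity and CoAP temperature services in the background
node bl-bridge-humidity-mqtt/dist/app.js bl_central_kafka_server &
node bl-bridge-temperature-coap/dist/app.js bl_central_kafka_server &
# Keep the container alive
tail -f /dev/null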
The whole Docker startup demands a lot of processing, so it is likely that processes will compete for resources. They will all get them in the end, but excessive delays can occur because we are running everything locally. This is the reason why we first wait 30 seconds before starting the CoAP and the MQTT services. This should give Kafka at central enough time to start.
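A fixed delay is the approach used in this project. As an alternative idea, and not what the project does, a startup script could poll the broker port until it accepts connections. The sketch below shows that idea in Python with the standard library only; `wait_for_port` is a hypothetical helper, and a successful TCP connection only tells us that something is listening, not that Kafka is fully ready.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0,
                  interval: float = 0.5) -> bool:
    """Return True as soon as host:port accepts a TCP connection.

    Gives up and returns False once `timeout` seconds have passed.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Not reachable yet (refused, unresolved, timed out): retry later
            time.sleep(interval)
    return False
```

In this setup, a call such as `wait_for_port("bl_central_kafka_server", 9092)` would be the equivalent of the 30-second countdown, assuming the host name passed to the node services and the first Kafka port exposed by the central image.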
In this case, we already have Kafka and Zookeeper installed for us. In our Kafka implementation in central, in the folder bl-central-server/kafka, we find our Dockerfile:
FROM confluentinc/cp-kafka
WORKDIR /usr/local/bin
ENV LANG=C.UTF-8
COPY startKafka.sh /usr/local/bin/
COPY *.properties /opt/kafka/config/
COPY entrypoint.sh /usr/local/bin/
#Kafka
EXPOSE 9092 9093
ENTRYPOINT ["/usr/local/bin/entrypoint.sh"]
What we are doing now is just copying the configuration files for Kafka and Zookeeper to their respective setup folders. Finally, we copy the startup scripts to the entry point folder.
#!/usr/bin/env sh
rm -rf /home/appuser/kafka-logs/*
./startKafka.sh &
tail -f /dev/null
This just means that we remove any leftover Kafka logs and then launch the startKafka.sh script in the background. That script starts Zookeeper with its configuration file, which we’ll look into afterward. Now, let’s see what is happening in the startKafka.sh script:
#!/usr/bin/env bash
/usr/bin/zookeeper-server-start /etc/kafka/zookeeper.properties &
sleep 4
/usr/bin/kafka-server-start /opt/kafka/config/server0.properties &
/usr/bin/kafka-server-start /opt/kafka/config/server1.properties &
sleep 4
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic TEMPERATURE
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic HUMIDITY
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic WINDSPEED
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic WINDDIRECTION
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic PASSENGER
In the first part of this script, we start Zookeeper and then our 2 brokers, which we defined as necessary for sending temperature and humidity data. We then sleep for 4 seconds to allow the Kafka brokers to start. Once they are started, we can create our topics using the already started Zookeeper service. We create 5: TEMPERATURE, HUMIDITY, WINDSPEED, WINDDIRECTION, and PASSENGER.
For this article, we will only use the first two and the last one. In the zookeeper folder, we find the file log4j.properties. This is an optional configuration file, only needed if we want to see and examine Zookeeper logs:
log4j.rootCategory=WARN, zklog, INFO
log4j.appender.zklog=org.apache.log4j.RollingFileAppender
log4j.appender.zklog.File=/usr/local/var/log/zookeeper/zookeeper.log
log4j.appender.zklog.Append=true
log4j.appender.zklog.layout=org.apache.log4j.PatternLayout
log4j.appender.zklog.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n
In our Kafka image, Zookeeper runs by default on port 2181.
Let’s now have a look at one of the Kafka broker configuration files. Please note that I’m only showing the changed and important lines of the file and not the complete file:
broker.id=0
port=9092
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
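Here is a breakdown of these properties:
broker.id: the unique identifier of this broker within the cluster
port: the port on which this broker listens for connections
zookeeper.connect: the host and port of the Zookeeper instance the broker connects to
zookeeper.connection.timeout.ms: the maximum time, in milliseconds, that the broker waits to establish a connection to Zookeeper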
In our implementation and for demo purposes, we are only using one client per Kafka cluster. Our Kafka clients are the Apache Spark processes, whose main goal is to collect data. The important thing to notice in this last diagram is that, within the same consumer group, a partition can only be assigned to one client. In our example, we only have one consumer. Imagine that we had more Spark replicas running at the same time. Kafka would then assign the partitions, in an evenly balanced fashion, to the consumers of the same group. A partition can have multiple consumers assigned to it, just as long as they don’t belong to the same consumer group. Our consumer group in our example is group 0.
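The balancing idea can be sketched as a simple round-robin over the consumers of one group. This is a simplified illustration under our own assumptions: Kafka ships several assignment strategies that are more involved than this, and `assign_partitions` and the consumer names are hypothetical.

```python
from typing import Dict, List


def assign_partitions(partitions: List[int],
                      consumers: List[str]) -> Dict[str, List[int]]:
    """Spread partitions over the consumers of ONE group, round-robin.

    Every partition ends up with exactly one owner inside the group.
    """
    if not consumers:
        raise ValueError("a consumer group needs at least one consumer")
    assignment: Dict[str, List[int]] = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(sorted(partitions)):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# The PASSENGER topic was created with 4 partitions
passenger_partitions = [0, 1, 2, 3]
one_consumer = assign_partitions(passenger_partitions, ["spark-1"])
two_consumers = assign_partitions(passenger_partitions, ["spark-1", "spark-2"])
```

With a single consumer, as in our example, that consumer owns all four partitions. With two consumers in the same group, each one owns two. A fifth consumer in the same group would stay idle, because there are only four partitions to hand out.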
Now that we have a good understanding of how we send messages around through our architecture, it’s time to think about what sort of data format we want for our data exchanges. Our ER model is relatively complicated, but it’s still important to have a good grasp of it.
What we need to know about this model is a few important things:
For our big data model, we are taking a much simpler approach, because we just want to store data as fast as possible:
There is a lot of code involved in the implementation of the Apache Spark Data Collector processes and the implementation of the Spring Boot Data Collector processes. At this point, I am assuming that you know enough about Spring Boot and Spring. Based on this, we will go through the code implementation related to collecting the data via Apache Spark and RabbitMQ. We call the Spring-based RabbitMQ processes collectors, and the Apache Spark processes readers.
Let’s look at the project bl-sensor-data-collector and its dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bl-central-server</artifactId>
<groupId>org.jesperancinha.logistics</groupId>
<version>0.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>bl-sensor-data-collector</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.jesperancinha.logistics</groupId>
<artifactId>bl-domain-repository</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>com.opentable.components</groupId>
<artifactId>otj-pg-embedded</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>${h2.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<version>2.2.0.RELEASE</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<version>${spring-boot-starter-test.version}</version>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<version>${spring-rabbit-test.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.2.6.RELEASE</version>
<configuration>
<jvmArguments>--enable-preview</jvmArguments>
<mainClass>org.jesperancinha.logistics.sensor.collector.DataCollectorLauncher</mainClass>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<compilerArgs>--enable-preview</compilerArgs>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>--enable-preview</argLine>
</configuration>
</plugin>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<argLine>--enable-preview</argLine>
</configuration>
</plugin>
</plugins>
</build>
</project>
Here, we see a very important dependency we need. This is spring-boot-starter-amqp. With this dependency, we can create configurations for RabbitMQ. It provides a seamless way to configure one RabbitMQ virtual host. In our case, we are using 3. To get around this problem, let’s first create an abstract class common to all of these configurations and name it CollectorConfiguration.
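import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Value;

public abstract class CollectorConfiguration {

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.host}")
    private String host;

    // Builds a listener container with its own connection factory,
    // so that each configuration can target a different virtual host.
    protected SimpleMessageListenerContainer getSimpleMessageListenerContainer(MessageListenerAdapter listenerAdapter, String vHost, String queueName) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vHost);
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }
}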
In this class, we’ll find the common properties for all our virtual hosts. These are the username, the password, and the host. The virtual host and the queue name are not shared, so each implementation passes them in. When this is done, we can then create our implementation. This is the TrainSensorCollectorConfiguration:
@Configuration
@ConditionalOnProperty(name = "bridge.logistics.train.sensor.active",
        matchIfMissing = true)
public class TrainSensorCollectorConfiguration extends CollectorConfiguration {
    private static final String BL_TRAIN_01_SENSOR_EXCHANGE = "bl-train-01-sensor-exchange";
    private static final String BL_TRAIN_01_SENSOR_QUEUE = "bl-train-01-sensor-queue";
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${bridge.logistics.train.sensor.vhost}")
    private String vHost;

    @Bean(name = "TrainQueue")
    Queue queue() {
        return new Queue(BL_TRAIN_01_SENSOR_QUEUE, true);
    }

    @Bean(name = "TrainExchange")
    FanoutExchange exchange() {
        return new FanoutExchange(BL_TRAIN_01_SENSOR_EXCHANGE, true, false);
    }

    @Bean(name = "TrainBinding")
    Binding binding(
            @Qualifier("TrainQueue")
                    Queue queue,
            @Qualifier("TrainExchange")
                    FanoutExchange exchange) {
        return BindingBuilder.bind(queue)
                .to(exchange);
    }

    @Bean(name = "TrainContainer")
    SimpleMessageListenerContainer container(
            @Qualifier("TrainListener")
                    MessageListenerAdapter listenerAdapter) {
        return getSimpleMessageListenerContainer(listenerAdapter, vHost, BL_TRAIN_01_SENSOR_QUEUE);
    }

    @Bean(name = "TrainListener")
    MessageListenerAdapter listenerAdapter(TrainSensorReceiver trainSensorReceiver) {
        return new MessageListenerAdapter(trainSensorReceiver, "receiveMessage");
    }
}
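The string "receiveMessage" passed to MessageListenerAdapter names the method that the adapter calls on the delegate for each incoming message. As a rough illustration of what such a delegate could look like, here is a minimal sketch in plain Java, without Spring. The class name SketchTrainSensorReceiver, its in-memory list, and the sample payload are assumptions made for this example; this is not the project's TrainSensorReceiver. The two overloads reflect that, depending on the message's content type, the adapter may hand over the body as text or as raw bytes.

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a message delegate; not the project's TrainSensorReceiver.
class SketchTrainSensorReceiver {

    // Keeps what was received so that the behaviour can be inspected (illustration only)
    private final List<String> received = new ArrayList<>();

    // Raw-bytes variant: decode as UTF-8 and delegate to the text variant
    public void receiveMessage(byte[] body) {
        receiveMessage(new String(body, StandardCharsets.UTF_8));
    }

    // Text variant: record and print the payload
    public void receiveMessage(String payload) {
        received.add(payload);
        System.out.println("Received <" + payload + ">");
    }

    // Returns a copy of everything received so far
    public List<String> received() {
        return new ArrayList<>(received);
    }

    public static void main(String[] args) throws Exception {
        SketchTrainSensorReceiver receiver = new SketchTrainSensorReceiver();
        // Look the method up by name, similar in spirit to what a listener adapter does
        Method method = SketchTrainSensorReceiver.class.getMethod("receiveMessage", byte[].class);
        // "sample-reading" is a made-up payload
        method.invoke(receiver, (Object) "sample-reading".getBytes(StandardCharsets.UTF_8));
    }
}
```

Keeping the delegate a plain object like this means it can be exercised without a broker, which is handy when debugging the collectors locally.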
The actual demo is implemented with a handful of Python scripts. The images at the bottom give us a representation of how this works in practice. The demo simulates an environment where a train is crossing one bridge and a bus is crossing another.
Then, if we have Docker Desktop installed and Docker running, we should be able to start our docker-compose environment by running make docker. We should then get this result:
Now, let’s have a look at how everything will play out when we start running our Python demo scripts:
At this point, we already know that Cassandra won’t be running in our docker-compose environment. We also know why. This is also the reason why we have to run our Spark process from the outside. At this point, at least 2 minutes should have already passed. In order to start the Spark processes, let’s run make start-readers. This will start our Spark collectors.
Alternatively, we can start these projects via IntelliJ. It’s easier to debug this way, so if we want to understand everything that’s happening in more detail, this could be a better option. Let’s now wait for these collectors to connect to our Kafka brokers. We know this happens when Spark lets us know that the group has joined and that the partitions have been assigned:
Let’s check that everything is running correctly. First, we go to: http://localhost:15672/#/ This is the homepage of the RabbitMQ server located in the central streaming service. We log in with username test and password test. We should get a screen like this:
If we go to Admin, we can check the status of the federations:
Now, let’s try one of the other services. Let’s go to the train service on: http://localhost:15673/#/queues
As we can see, the queues are federated! This means that all messages coming into these queues will be automatically sent to the central server. Now, let’s have a look at what is happening to our Spark cluster. Let’s check the Spark GUI on: http://localhost:4040/jobs/ Because we actually have two Spark processes running, which process gets which port will vary.
One GUI will be allocated on port 4040 and another one on port 4041: http://localhost:4041/jobs/ Either way, when the jobs are running and we are retrieving our data, we should be getting the following:
We are finally ready to start up our simulation. This is the time to run the make demo script. We let it run until it finishes:
In the end, we are prompted to enter the spy’s name. I will explain how to do this afterward.
In order to get a good feel of the technologies involved in this project, I’ve created a small game that tells a spy story. You are responsible for the fate of classified information being taken by a special agent. The agent sits on a train bound for its destination. When the train crosses the bridge, a secret spy takes the special agent’s suitcase and goes to the toilet to try to escape through the window. The special agent tries to chase the spy, only to see the spy jump out of the window into the river.
The spy, however, safely lands on a boat using a parachute. The boat disappears into the horizon. At the root of the project, we’ll find a folder called bl-simulation-data. As we have seen before, this folder contains all the necessary simulation data. In this folder, we find another one called passengers. This is where the material needed to generate the spy case is located. These are the files:
First, we go to our ER database; in other words, we need to go to our PostgreSQL database. With IntelliJ we can easily connect to the database using the following configuration:
Now we look into the train_log table. There we filter our results by: check_in_out='CHECKIN' or check_in_out='CHECKOUT'
Let’s keep in mind that carriage 3 has a toilet and carriage 4 doesn’t. As we can see, carriage 3, with a toilet, has a passenger weight of 1072 Kg during CHECKIN. During CHECKOUT the weight goes down to 817 Kg. This means that there has been a difference of 255 Kg, calculated as 1072 – 817 = 255 Kg. Let’s now have a look at the increase in weight of carriage 4. This would be 434 – 341 = 93 Kg. Immediately there is a discrepancy of weight.
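The weight bookkeeping can be sketched in a few lines of Java. This is only an illustration, assuming (as the subtractions 1072 – 817 and 434 – 341 imply) that 817 Kg is the CHECKOUT weight of carriage 3 and that 341 Kg and 434 Kg are the CHECKIN and CHECKOUT weights of carriage 4. The LogRow class is a hypothetical, simplified view of a train_log row and not the project's actual entity.

```java
import java.util.List;

// Sketch only: recomputes the weight discrepancy from the figures quoted in the text.
class CarriageWeightCheck {

    // Hypothetical, simplified view of one train_log row
    static final class LogRow {
        final int carriage;
        final String checkInOut; // "CHECKIN" or "CHECKOUT"
        final int weightKg;

        LogRow(int carriage, String checkInOut, int weightKg) {
            this.carriage = carriage;
            this.checkInOut = checkInOut;
            this.weightKg = weightKg;
        }
    }

    // Figures quoted in the text; their assignment to carriages is an assumption
    static final List<LogRow> ROWS = List.of(
            new LogRow(3, "CHECKIN", 1072),
            new LogRow(3, "CHECKOUT", 817),
            new LogRow(4, "CHECKIN", 341),
            new LogRow(4, "CHECKOUT", 434));

    // Total passenger weight logged for one carriage and one event type
    static int weightAt(List<LogRow> rows, int carriage, String checkInOut) {
        return rows.stream()
                .filter(r -> r.carriage == carriage && r.checkInOut.equals(checkInOut))
                .mapToInt(r -> r.weightKg)
                .sum();
    }

    // Weight a carriage lost between CHECKIN and CHECKOUT (negative if it gained weight)
    static int weightLost(List<LogRow> rows, int carriage) {
        return weightAt(rows, carriage, "CHECKIN") - weightAt(rows, carriage, "CHECKOUT");
    }

    public static void main(String[] args) {
        int lostInCarriage3 = weightLost(ROWS, 3);    // 1072 - 817
        int gainedInCarriage4 = -weightLost(ROWS, 4); // 434 - 341
        int discrepancy = lostInCarriage3 - gainedInCarriage4;
        System.out.println("Lost in carriage 3:   " + lostInCarriage3 + " Kg");
        System.out.println("Gained in carriage 4: " + gainedInCarriage4 + " Kg");
        System.out.println("Discrepancy:          " + discrepancy + " Kg");
    }
}
```

With these figures, carriage 3 loses 255 Kg and carriage 4 gains 93 Kg, which leaves 162 Kg unaccounted for.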
If we lose weight in one carriage and don’t gain the same weight in the other, then the difference might just be our spy’s weight. The missing weight is thus 255 – 93 = 162 Kg. This means that the spy must have thrown the victim out the window while crossing the bridge. The weight of the spy is thus 162 Kg!
Let’s now look at our Cassandra database. In there, all passengers are being registered with their respective weights upon entering the train. First, we need to configure our client:
Let’s now check the passengers table in the readings keyspace. If we look for passengers with a weight of 162 Kg, we find:
After investigating the suspect further, a long history and connections tying the suspect to the spy end up providing evidence that the suspect in the list is indeed the spy. If we type that in:
If we fail (Note that I ran the simulation again):
I hope you had success in finding the spy and if you didn’t, then try again. Maybe next time you will save the world!
In this article, we have seen how we can work with four fundamental protocols in the worlds of IoT and streaming: MQTT, CoAP, RabbitMQ, and Kafka.
We have mastered basic skills on how to get data streaming in the direction we want. We have seen the difference between push and pull mechanisms in action. During our tests, we have seen how RabbitMQ can efficiently rebound by the use of a federation in case of connection breakups. In the same way, we have seen how Kafka and Spark also very efficiently rebound in such cases.
We have gone through the different ways we can set up Kafka to our advantage in different cases of data load. In extreme cases, there can be low data throughput with quite large data chunks, or very high throughput with quite small data chunks.
I have placed all the source code of this application on GitHub.
I hope that you have enjoyed this article as much as I enjoyed writing it. Note that this article will be subject to frequent reviews, given how big it has become. If you would like to know more about the code I’ve implemented please let me know in the comments below. The full code explanation will be a reality in future revisions. At the moment only the core of this article is explained along with a portion of the code. Thanks in advance for your help, and thank you for reading!