The event loop pattern has always fascinated me. I find it interesting, useful, and compatible with object-oriented programming. Many of us know it thanks to Node.js.
Recently I have been looking for a chance to implement it and experiment with it, and I finally got the opportunity to do so at work.
An event loop doesn’t do much. We can ask it to start and to stop. Once started, it waits for events and reacts to them. An event could signal a new message in a queue, an expired timeout, and so on. When an event occurs, it triggers an action or a script.
With this in mind, we can already define a few interfaces that capture the behaviors described above:
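import java.time.Instant;
import java.util.Optional;
import java.util.function.Consumer;
// JsonObject comes from whichever JSON library is in use.

interface EventLoop {
    void start();
    void stop();
}

interface Events {
    Optional<Event> next();
}

interface Event {
    void trigger(Script script);
}

interface Script {
    void run(JsonObject properties, Consumer<Instant> onSuccess, Consumer<Throwable> onFailure);
}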
The Script interface deserves more attention. We can ask it to run with given properties and two callbacks: one for success and one for failure. The success callback receives the script's end time; the failure callback receives the exception that occurred.
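To make the two callbacks concrete, here is a minimal sketch of a script. It is not taken from the original code: MapScript is a stand-in for Script that uses a plain Map instead of JsonObject so the example has no external dependencies, and GreetingScript and the "name" property are hypothetical.

```java
import java.time.Instant;
import java.util.Map;
import java.util.function.Consumer;

// Stand-in for the Script interface; Map replaces JsonObject (assumption)
// only to keep this sketch free of external dependencies.
interface MapScript {
    void run(Map<String, Object> properties, Consumer<Instant> onSuccess, Consumer<Throwable> onFailure);
}

// Hypothetical script: greets the "name" property, then reports its end time.
final class GreetingScript implements MapScript {
    @Override
    public void run(final Map<String, Object> properties,
                    final Consumer<Instant> onSuccess,
                    final Consumer<Throwable> onFailure) {
        final Object name = properties.get("name");
        if (name == null) {
            // Failure path: hand the exception to the caller instead of throwing it.
            onFailure.accept(new IllegalArgumentException("missing property: name"));
            return;
        }
        System.out.println("Hello, " + name);
        // Success path: send the script end time.
        onSuccess.accept(Instant.now());
    }
}
```

Exactly one of the two callbacks is invoked per run, which lets the caller decide what to do next without catching exceptions itself.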
The simplest event loop that comes to mind uses busy waiting:
import java.util.concurrent.atomic.AtomicBoolean;

final class BusyWaitingEventLoop implements EventLoop {
    private final Events events;
    private final Script script;
    private final AtomicBoolean alive;

    BusyWaitingEventLoop(final Events events, final Script script) {
        this(events, script, new AtomicBoolean(true));
    }

    BusyWaitingEventLoop(final Events events, final Script script, final AtomicBoolean alive) {
        this.events = events;
        this.script = script;
        this.alive = alive;
    }

    @Override
    public void start() {
        // Spin until stop() flips the flag, polling for the next event.
        while (alive.get()) {
            events.next().ifPresent(event -> event.trigger(script));
        }
    }

    @Override
    public void stop() {
        alive.set(false);
    }
}
Once started, it repeatedly checks for a new event. When it finds one, the event triggers the script according to its own rules.
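The polling behavior can be seen end to end in a small sketch. Everything here is an assumption made for illustration: the Simple* interfaces are reduced stand-ins whose script takes a plain String, QueueEvents is a hypothetical in-memory event source, and SpinLoop mirrors the busy-waiting loop.

```java
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Reduced stand-ins for Script, Event and Events (assumption).
interface SimpleScript { void run(String payload); }
interface SimpleEvent { void trigger(SimpleScript script); }
interface SimpleEvents { Optional<SimpleEvent> next(); }

// Hypothetical event source backed by a thread-safe in-memory queue.
final class QueueEvents implements SimpleEvents {
    private final Queue<String> payloads = new ConcurrentLinkedQueue<>();

    void publish(final String payload) { payloads.add(payload); }

    @Override
    public Optional<SimpleEvent> next() {
        final String payload = payloads.poll(); // null when the queue is empty
        return Optional.ofNullable(payload).<SimpleEvent>map(p -> script -> script.run(p));
    }
}

// Same shape as the busy-waiting loop: poll, trigger, repeat.
final class SpinLoop {
    private final SimpleEvents events;
    private final SimpleScript script;
    private final AtomicBoolean alive = new AtomicBoolean(true);

    SpinLoop(final SimpleEvents events, final SimpleScript script) {
        this.events = events;
        this.script = script;
    }

    void start() {
        while (alive.get()) {
            events.next().ifPresent(event -> event.trigger(script));
        }
    }

    void stop() { alive.set(false); }
}

final class BusyWaitingDemo {
    // Runs the loop on its own thread, publishes three events, then stops it.
    static List<String> run() {
        final QueueEvents events = new QueueEvents();
        final List<String> handled = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(3);
        final SpinLoop loop = new SpinLoop(events, payload -> {
            handled.add(payload);
            done.countDown();
        });
        final Thread thread = new Thread(loop::start);
        thread.start();
        events.publish("a");
        events.publish("b");
        events.publish("c");
        try {
            done.await(5, TimeUnit.SECONDS); // wait until all three were handled
            loop.stop();
            thread.join();
        } catch (InterruptedException e) {
            loop.stop();
            Thread.currentThread().interrupt();
        }
        return handled;
    }
}
```

While the queue is empty, next() returns an empty Optional and the loop keeps spinning, which is what makes this a busy wait: the thread stays busy even when there is nothing to handle.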
But this approach has a major drawback: event handling blocks the event loop, so we can handle only one event at a time.
One way to resolve this is to start the event loop two or more times, each time in a different thread.
The responsibility of an ExecutorService is to run tasks. It can run them in different threads or in a single thread; Java provides both kinds of implementation through factory methods in the Executors class.
So, here is a potential multithreaded event loop implementation that uses an ExecutorService:
import java.util.concurrent.ExecutorService;

final class MultithreadedEventLoop implements EventLoop {
    private final EventLoop origin;
    private final int nThreads;
    private final ExecutorService executorService;

    MultithreadedEventLoop(final EventLoop origin, final int nThreads, final ExecutorService executorService) {
        this.origin = origin;
        this.nThreads = nThreads;
        this.executorService = executorService;
    }

    @Override
    public void start() {
        // Submit the same loop nThreads times; each submission occupies one thread.
        for (var i = 0; i < nThreads; i++) {
            executorService.execute(origin::start);
        }
    }

    @Override
    public void stop() {
        // Stop the loops first so their tasks return, then release the threads.
        origin.stop();
        shutdownExecutorService();
    }

    private void shutdownExecutorService() {
        // Java specific code
    }
}
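The body of shutdownExecutorService is left out above. One common way to fill it in, following the shutdown pattern described in the ExecutorService documentation, is sketched below. The helper name and the five-second grace period are assumptions, not the original implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class ExecutorShutdown {
    // Hypothetical helper; the 5-second grace period is an arbitrary choice.
    static void shutdownGracefully(final ExecutorService executorService) {
        executorService.shutdown(); // reject new tasks, let submitted ones finish
        try {
            if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
                executorService.shutdownNow(); // interrupt tasks that are still running
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }
}
```

Because stop() calls origin.stop() first, the loops should leave their while loop on their own, and the grace period only matters for an event that is still being handled.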
We can start a MultithreadedEventLoop with eight threads in this way:
import java.util.concurrent.Executors;

public final class Main {
    public static void main(String[] args) {
        var nThreads = 8;
        new MultithreadedEventLoop(
            new BusyWaitingEventLoop(
                anEventsObject,
                aScriptObject
            ),
            nThreads,
            Executors.newFixedThreadPool(nThreads)
        ).start();
    }
}
Now we can handle eight events in parallel. This is an improvement, but we can do even better once we resolve the main issue: the event loop and the script run in the same thread.
The solution is to run scripts and event loops in different threads. In this way, we can have threads dedicated to the scripts and threads dedicated to the event loop.
We already have a multithreaded event loop but we are missing the other piece: an asynchronous script.
Here is an implementation that uses the ExecutorService again:
import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

final class AsyncScript implements Script {
    private final Script origin;
    private final ExecutorService executorService;

    AsyncScript(final Script origin, final ExecutorService executorService) {
        this.origin = origin;
        this.executorService = executorService;
    }

    @Override
    public void run(final JsonObject properties, final Consumer<Instant> onSuccess, final Consumer<Throwable> onFailure) {
        // Skip the submission once the executor has been shut down.
        if (!executorService.isShutdown()) {
            executorService.execute(() -> origin.run(properties, onSuccess, onFailure));
        }
    }
}
AsyncScript is a Script decorator. Using it, we can turn a synchronous Script into an asynchronous one.
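The effect of the decorator can be observed by asking a script which thread it runs on. The sketch below is an illustration under assumptions: TextScript is a reduced stand-in for Script that uses String for both input and output, and AsyncTextScript repeats the decorator for that stand-in.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Reduced stand-in for Script (assumption): String in, String out.
interface TextScript {
    void run(String in, Consumer<String> onSuccess, Consumer<Throwable> onFailure);
}

// The same decorator idea, applied to the stand-in interface.
final class AsyncTextScript implements TextScript {
    private final TextScript origin;
    private final ExecutorService executorService;

    AsyncTextScript(final TextScript origin, final ExecutorService executorService) {
        this.origin = origin;
        this.executorService = executorService;
    }

    @Override
    public void run(final String in, final Consumer<String> onSuccess, final Consumer<Throwable> onFailure) {
        if (!executorService.isShutdown()) {
            executorService.execute(() -> origin.run(in, onSuccess, onFailure));
        }
    }
}

final class AsyncScriptDemo {
    // Returns { caller thread name, thread name seen by the script }.
    static String[] run() {
        final ExecutorService pool = Executors.newSingleThreadExecutor();
        // Synchronous script that reports the thread it is running on.
        final TextScript reportThread =
            (in, onSuccess, onFailure) -> onSuccess.accept(Thread.currentThread().getName());
        final CompletableFuture<String> scriptThread = new CompletableFuture<>();
        new AsyncTextScript(reportThread, pool)
            .run("ignored", scriptThread::complete, scriptThread::completeExceptionally);
        final String caller = Thread.currentThread().getName();
        final String worker = scriptThread.join(); // wait for the callback
        pool.shutdown();
        return new String[] {caller, worker};
    }
}
```

The call to run returns right after submitting the task, and the two names differ because the script body executes on a pool thread. The callbacks also run on that pool thread, so whatever they touch has to be thread-safe.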
Indeed, we can easily integrate AsyncScript into the previous example:
public final class Main {
    public static void main(String[] args) {
        var eventLoopThreads = 2;
        var scriptThreads = 14;
        var executorService = Executors.newFixedThreadPool(eventLoopThreads + scriptThreads);
        new MultithreadedEventLoop(
            new BusyWaitingEventLoop(
                anEventsObject,
                new AsyncScript(
                    aScriptObject,
                    executorService
                )
            ),
            eventLoopThreads,
            executorService
        ).start();
    }
}
Now we have an ExecutorService with sixteen threads. Two threads are assigned to the event loop, leaving fourteen available to the scripts. So the event loop receives new events in two threads, while the scripts run in the other fourteen.
A possible improvement specific to Java is to use generics. Something like:
interface EventLoop {
    void start();
    void stop();
}

interface Events<I, O> {
    Optional<Event<I, O>> next();
}

interface Event<I, O> {
    void trigger(Script<I, O> script);
}

interface Script<I, O> {
    void run(I in, Consumer<O> onSuccess, Consumer<Throwable> onFailure);
}
In this way we can handle generic input/output in the Script.
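As a small illustration of the generic version, here is a hypothetical script with a String input and an Integer output. ParseIntScript is invented for this example; only the Script<I, O> interface comes from the text above.

```java
import java.util.function.Consumer;

interface Script<I, O> {
    void run(I in, Consumer<O> onSuccess, Consumer<Throwable> onFailure);
}

// Hypothetical script: parses a String into an Integer.
final class ParseIntScript implements Script<String, Integer> {
    @Override
    public void run(final String in, final Consumer<Integer> onSuccess, final Consumer<Throwable> onFailure) {
        final int value;
        try {
            value = Integer.parseInt(in.trim());
        } catch (NumberFormatException e) {
            onFailure.accept(e); // report the parsing error to the caller
            return;
        }
        onSuccess.accept(value); // report the typed result
    }
}
```

The type parameters move the input and output types into the signature, so a mismatch between an event and the script it triggers becomes a compile-time error.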
AggregateEvents aggregates Events objects; in my case, it aggregates multiple Azure Queues Events that emit AzureQueueMessage objects. These messages trigger the scripts with the message body as properties and, on completion, remove themselves from the queue.
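The code of AggregateEvents is not shown here, so the following is only a guess at what such an aggregate could look like. It uses a reduced Events<E> interface with a single type parameter (an assumption) and returns the first event available among its sources.

```java
import java.util.List;
import java.util.Optional;

// Reduced stand-in for Events (assumption): one type parameter for the event itself.
interface Events<E> {
    Optional<E> next();
}

// Hypothetical aggregate: asks each source in order and returns the first event found.
final class AggregateEvents<E> implements Events<E> {
    private final List<Events<E>> sources;

    AggregateEvents(final List<Events<E>> sources) {
        this.sources = List.copyOf(sources);
    }

    @Override
    public Optional<E> next() {
        for (final Events<E> source : sources) {
            final Optional<E> event = source.next();
            if (event.isPresent()) {
                return event;
            }
        }
        return Optional.empty(); // no source had an event this time
    }
}
```

Polling the sources in a fixed order favors the earlier ones. If the first queue is always busy, the later ones may starve, so a round-robin starting point could be a fairer choice.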
We can improve the presented implementation in various ways. For example, we can add more capabilities to the event loop, such as restarting. But, as always, keep the modeled domain in mind. There may be further opportunities too: for example, I can imagine a prioritised Events implementation, or event loops with different ExecutorService implementations. In this way, we could have finer-grained control over threads or priorities.
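A prioritised Events implementation could be sketched as follows. This is only one possible reading of the idea: the reduced Events<E> interface, the PrioritisedEvents class and its publish method are all assumptions, and the ordering is delegated to a Comparator supplied by the caller.

```java
import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.PriorityBlockingQueue;

// Reduced stand-in for Events (assumption): one type parameter for the event itself.
interface Events<E> {
    Optional<E> next();
}

// Hypothetical source that always hands out the highest-priority pending event.
final class PrioritisedEvents<E> implements Events<E> {
    private final PriorityBlockingQueue<E> queue;

    PrioritisedEvents(final Comparator<? super E> order) {
        // 11 is the default initial capacity of PriorityBlockingQueue.
        this.queue = new PriorityBlockingQueue<>(11, order);
    }

    void publish(final E event) {
        queue.add(event);
    }

    @Override
    public Optional<E> next() {
        // poll() does not block: it returns null when nothing is pending.
        return Optional.ofNullable(queue.poll());
    }
}
```

Since the queue is thread-safe, several event loop threads can call next() on the same instance, and the smallest element according to the comparator is handed out first.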
Currently, I’m investigating a broader and more consistent way to apply this pattern.
So, stay tuned and happy looping to you all!