How to implement an event bus using dependency injection?

I am upset every time I see a service calling another service when their business logic is not related to each other. Think about creating an entry in a security audit log or sending an email notification to the user once someone leaves a comment to their blog post. Calling these methods directly from a comment service would lead to a tightly coupled code which is hard to maintain and refactor.

A well-known solution to this problem is using aspect-oriented programming (AOP). However, you are usually limited to the original function arguments or its return value at most, having no access to the intermediate calculations. Before you try to refactor the code, introducing a new method just for a more suitable join point, let me show you another way first.

There is no consensus on the definition of an event bus. Like in hardware, things on a bus are distributed across the whole system, which is what we want to get a loosely coupled code. The message to be distributed is usually packed inside an instance of an event class. One might call it a message bus, but the term “message” usually implies a message broker, which is not the case here. It’s also like the observer pattern, but all the observers subscribe at system startup and unsubscribe at system shutdown – there is no way to change your mind at runtime. In the example above, an event might look like this:

public class CommentEvent {
    private String content;
    private User author;
    private Instant time;
    // ...

    // getters and setters omitted
}

Once an event is created, there are various ways to distribute it. If you are already using Spring, ApplicationEventPublisher might be a good idea:

@Autowired
private ApplicationEventPublisher eventPublisher;

@Transactional
public void publishComment(...) {
    // ...
    CommentEvent event = new CommentEvent(...);
    eventPublisher.publishEvent(event);
    // ...
}

Then, the EventListener annotation can be used to subscribe:

@EventListener
public void onCommentEvent(CommentEvent event) {
    System.out.println("Comment posted");
}

ApplicationEventPublisher supports both synchronous and asynchronous events and as far as I know it works really well in a lot of projects. Even if you’re not using Spring, other libraries, such as Guava, have similar concepts. On the other hand, you usually cannot get the return values of the handlers and, as a consistence, use it with Spring WebFlux. There is also some overhead if performance matters.

The way I prefer to implement an event bus is by calling the subscribers directly. A subscriber of this event might look like this:

public interface CommentEventListener {
    void onCommentEvent(CommentEvent event);
    // or void onCommentEvent(User user, ...);
}

As the famous joke says, naming is one of the two hard problems in computer science (another one is cache invalidation). I’ve tried multiple names such as Callback, Handler, Observer, Subscriber and even Hook, but Listener appears to be most natural to me.

Now, we need a simple way to get the list of subscribers and here is where dependency injection comes into play. Most DI-frameworks, including Spring, Guice and Dagger, support so-called multibindings – the possibility to inject multiple definitions as a single collection. Here is how it works in Spring:

@Autowired
private List<CommentEventListener> commentListeners;

@Transactional
public void publishComment(...) {
    // ...
    CommentEvent event = new CommentEvent(...);
    commentListeners.stream().map(i -> i.onCommentEvent(event));
    // ...
}

Pretty easy, isn’t it? Here is what a simple subscriber might look like:

@Service
public class DemoListener implements CommentEventListener {
    public void onCommentEvent(CommentEvent event) {
        System.out.println("Comment posted");
    }
}

That’s it! What are the benefits? Firstly, you can handle the return values. For instance, one of the subscribers could block the further execution by returning false (or the other way around):

if (commentListeners.stream().anyMatch(i -> !i.onCommentEvent(event))) {
    System.err.println("Execution failed");
}

Of course, the same could be archived using exceptions, but what about a voting behavior, where individual subscribers can contribute to the final decision, returning positive and negative numbers?

if (commentListeners.stream().reduce(0, (res, i) -> i.onCommentEvent(event) + res) < 0) {
    System.err.println("Execution failed");
}

If you’re using Spring WebFlux, you can benefit from returning reactive streams too. Rewrite the subscriber interface as follows:

import reactor.core.publisher.Mono;

public interface CommentEventListener {
    Mono<Void> onCommentEvent(CommentEvent event);
}

Now, call it from the service:

public Mono<Void> publishComment(...) {
    // ...
    CommentEvent event = new CommentEvent(...);
    return Mono.when(
        commentListeners
            .stream()
            .map(i -> i.onCommentEvent(event))
            .collect(Collectors.toList()));
}

The only disadvantage is, it doesn’t work with third-party code. For instance, you cannot receive Spring Security events without an adapter.