
Vert.x and the async calls chain


Vert.x is a nice polyglot library for writing reactive applications. It is based on the multi-reactor pattern: the core concept of Vert.x is the event loop, which dispatches work and events to handlers. Each event loop is single-threaded, but you can have several of them (hence the multi-reactor pattern).

What you will read about in this post:
– A Classic “Hello World” example with a chain of asynchronous callbacks
– Composition with Futures
– The RxJava Combo

The Vert.x “Golden Rule” is « Don’t block the event loop ».
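To make the golden rule concrete, here is a minimal sketch in plain Java. It is not Vert.x code: a single-thread executor stands in for one event loop, and the class and handler names are made up for the illustration. It only shows why a handler that blocks delays every event queued behind it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model, NOT the Vert.x API: a single-thread executor plays the role of one event loop.
class BlockedLoopSketch {

    // Dispatches a blocking handler and then a fast one; returns what ran, in order.
    static List<String> run() {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        List<String> log = new CopyOnWriteArrayList<>();

        // This handler breaks the golden rule: it holds the loop's only thread for 200 ms.
        eventLoop.execute(() -> {
            log.add("slow handler: start");
            sleep(200);
            log.add("slow handler: end");
        });

        // An unrelated event queued right behind it cannot be handled until the loop is free.
        eventLoop.execute(() -> log.add("fast handler: run"));

        eventLoop.shutdown();
        try {
            eventLoop.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The fast handler always runs last, even though it has nothing to do with the slow one. On a real event loop, that "fast handler" could be every other request your application is serving.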

This means the work dispatched to handlers has to be performed asynchronously. That’s why you will often come across the following pattern in handlers:


operation(param1, param2, ar -> {
    // "ar" is the AsyncResult passed to the handler once the operation completes
    if (ar.succeeded()) {
      // do something with the result by calling ar.result()
    } else {
      // do something with the error, for instance by calling ar.cause()
    }
});

With this pattern, you can write asynchronous code without worrying about synchronization, as if it were single-threaded, because the handlers are always executed in the same event loop.
If you have long-running or blocking tasks to perform (e.g. blocking I/O), Vert.x offers APIs to execute them without blocking the event loop (see here).
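The "same event loop" guarantee is what spares you locks and atomics. Here is a small sketch of that idea in plain Java; again, this is a toy model and not the Vert.x API: a single-thread executor stands in for the event loop, and the class name is hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model, NOT the Vert.x API: a single-thread executor plays the role of one event loop.
class SameLoopCounterSketch {

    // Runs 10,000 "handlers" that all mutate the same unsynchronized state.
    static int run() {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        int[] counter = {0}; // plain state: no lock, no AtomicInteger

        try {
            for (int i = 0; i < 10_000; i++) {
                // Every handler runs on the loop's single thread, one after the other.
                eventLoop.execute(() -> counter[0]++);
            }
            // Read the state from the loop itself, once all increments have run.
            return eventLoop.submit(() -> counter[0]).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 10000
    }
}
```

No increment is lost because the handlers never run concurrently. With a pool of several threads instead of a single one, the same code would need synchronization.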

Sometimes, you want to chain these asynchronous calls. Let’s say you want to retrieve some data from a database, then perform some transformation and then call another service. In this case, you end up nesting each call inside the handler of the previous one, and the code drifts to the right to form the callback hell arrow… Fortunately, Vert.x offers nice solutions to handle that.

And now… the traditional “Hello World” example!

To illustrate this, let’s start with a “Hello World” example. We take a given name and concatenate it with “Hello ”. The resulting “Hello + name” is then concatenated with “!”, and that string is in turn prefixed with “I say: ”. All of this happens asynchronously (yes, nothing really exciting, but this is for demo purposes. Please don’t throw too many tomatoes at the author… ;)).

With Vert.x, we will have something like this:
– a handler that takes a name and prepends “Hello ”
– a handler that takes a string and appends “!”
– a handler that takes a string and prepends “I say: ”

The resulting code looks like this:


public void sayHello(JsonObject aName, Handler<AsyncResult<String>> aHandler) {
   String name = aName.getString("name");

   concatWithHello(name, hello -> {
      if (hello.succeeded()) {
         concatWithExclamation(hello.result(), exclamation -> {
            if (exclamation.succeeded()) {
               concatWithISay(exclamation.result(), aHandler);
            } else {
               // propagate the failure of the second step
               aHandler.handle(Future.failedFuture(exclamation.cause()));  // <-- see the arrow head?
            }
         });
      } else {
         // propagate the failure of the first step
         aHandler.handle(Future.failedFuture(hello.cause()));
      }
   });
}

private void concatWithHello(String aName, Handler<AsyncResult<String>> aHandler) {
   aHandler.handle(Future.succeededFuture("Hello " + aName));
}

private void concatWithExclamation(String aHello, Handler<AsyncResult<String>> aHandler) {
   aHandler.handle(Future.succeededFuture(aHello + "!"));
}

private void concatWithISay(String aString, Handler<AsyncResult<String>> aHandler) {
   aHandler.handle(Future.succeededFuture("I say: \"" + aString + "\""));
}

As you can see, we can dive into the callback hell arrow easily. But, fortunately, Vert.x offers three nicer ways to write this!

Way n°1: compose with futures

Besides AsyncResult, the Vert.x API offers the notion of a Future. Future is an interface that extends AsyncResult<T> and Handler<AsyncResult<T>>. It is quite similar to the JDK’s CompletableFuture. One of the advantages of Futures is that they are composable.

Here is how it works: you declare a Future, say future1. If future1 succeeds, you can compose its result with another Future (say future2) that deals with it. This second Future can in turn either succeed and asynchronously return its result, or fail. And so on.

Let’s apply this to our example.


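    // Written against the Vert.x 3.x Future API (Future.future(), setHandler(), completer())
    public void sayHelloWithFuture(JsonObject aName, Handler<AsyncResult<String>> aHandler) {
        String name = aName.getString("name");

        // final future of the chain: notifies the caller's handler on success or failure
        Future<String> future = Future.future();
        future.setHandler(aHandler);

        // first step: a Future is itself a Handler<AsyncResult<T>>
        Future<String> hello = Future.future();
        concatWithHello(name, hello);

        hello
            .compose(v -> {
                // second step, started only if the first one succeeded
                Future<String> exclamation = Future.future();
                concatWithExclamation(v, exclamation.completer());
                return exclamation;
            })
            .compose(v -> {
                // last step: completes the final future; failures are forwarded to it
                concatWithISay(v, future.completer());
            }, future);
    }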

That’s nicer, isn’t it? Instead of an ugly arrow of “if”s, you get a sequence of operations. (Future composition reminds me of function composition in maths, with g ∘ f :)). The succession of asynchronous calls can be read in order, which IMHO makes the code far more readable.

Note: the Future API also lets you run several Futures concurrently and wait until all of them complete (through CompositeFuture). You can find further information here.

Way n°2: RxJava combo


RxJava is a library that makes it easy to handle streams of asynchronous events. It is based on the Observer pattern, with Observable as its central type.

Vert.x provides an extension to work with RxJava (currently v1.x.x, but support for v2.x.x is coming soon!). The core of this extension is the RxHelper class, which helps wrap RxJava idioms into Vert.x handlers and vice versa.

Besides that, Vert.x also provides a way to generate RxJava-compliant code from a service interface that uses Vert.x handlers. All you need to do is add a @ProxyGen annotation (plus an annotation processor in your Maven pom / Gradle file, or declared in your IDE). We recommend reading the Vert.x service proxies introduction; it gives a clear overview and everything is well documented.

Based on our example, here is some code that uses RxJava. I’ve followed the Vert.x convention: RxJava methods are prefixed with “rx”.


    public void sayHelloWithRx(JsonObject aName, Handler<AsyncResult<String>> aHandler) {
        Single.just(aName)
              .map(n -> n.getString("name"))
              .flatMap(name -> rxConcatWithHello(name))
              .flatMap(hello -> rxConcatWithExclamation(hello))
              .flatMap(exclamation -> rxConcatWithISay(exclamation))
              // bridge back to Vert.x: the handler receives either the result or the error
              .subscribe(RxHelper.toSubscriber(aHandler));

        // Version with method references
//        Single.just(aName)
//              .map(n -> n.getString("name"))
//              .flatMap(this::rxConcatWithHello)
//              .flatMap(this::rxConcatWithExclamation)
//              .flatMap(this::rxConcatWithISay)
//              .subscribe(RxHelper.toSubscriber(aHandler));
    }

    private Single<String> rxConcatWithHello(final String aName) {
        return Single.just("Hello " + aName);
    }

    private Single<String> rxConcatWithExclamation(final String aHello) {
        return Single.just(aHello + "!");
    }

    private Single<String> rxConcatWithISay(final String aExclamation) {
        return Single.just("I say: \"" + aExclamation + "\"");
    }

In the example, the Rx methods have been hand-coded. Once again, if you use the Vert.x @ProxyGen to generate code, you get the rxXXX methods for free. One issue we’ve faced in the team arises when you wish to use Rx methods from other methods inside the implementation of a service generated via @ProxyGen. It’s a vicious circle: to do so, you would need the generated Rx wrapper of the very implementation you are writing…

To work around this issue, you can convert a Vert.x handler into an RxJava Single:


public void sayHelloWithRx2(JsonObject aName, Handler<AsyncResult<String>> aHandler) {
  Single.just(aName)
        .map(n -> n.getString("name"))
        .flatMap(name -> 
                   Single.create(
                           new SingleOnSubscribeAdapter<>(fut -> concatWithHello(name, fut))))
        .flatMap(hello -> 
                   Single.create(
                           new SingleOnSubscribeAdapter<>(fut -> concatWithExclamation(hello, fut))))
        .flatMap(exclamation -> 
                   Single.create(
                           new SingleOnSubscribeAdapter<>(fut -> concatWithISay(exclamation, fut))))
        .subscribe(RxHelper.toSubscriber(aHandler));
}

… or you can create an ObservableFuture, which can be converted to a Handler<AsyncResult<T>>, and pull the same trick:


public void sayHelloWithRx4(JsonObject aName, Handler<AsyncResult<String>> aHandler) {
    Single.just(aName)
          .map(n -> n.getString("name"))
          .flatMap(name -> {
              ObservableFuture<String> observableFuture = RxHelper.observableFuture();
              concatWithHello(name, observableFuture.toHandler());
              return observableFuture.toSingle();
          })
          .flatMap(hello -> {
              ObservableFuture<String> observableFuture = RxHelper.observableFuture();
              concatWithExclamation(hello, observableFuture.toHandler());
              return observableFuture.toSingle();
          })
          .flatMap(exclamation -> {
              ObservableFuture<String> observableFuture = RxHelper.observableFuture();              
              concatWithISay(exclamation, observableFuture.toHandler());
              return observableFuture.toSingle();
          })
          .subscribe(RxHelper.toSubscriber(aHandler));
}

… a little bit longer.

Right, these two variants are anecdotal (I mention them just in case…).

If you wish to chain async methods that don’t return a result, you can return a Completable instead of a Single. You can find a complete example here, but I guess you get the idea.

So, that’s it!

What to conclude? The RxJava syntax is also nice, and a bit more compact. At Streamdata.io, we’re quite fond of RxJava, so we have a preference for this one. But as with lots of things, it’s a matter of taste…

Wait! You promised three ways and presented only two! Where is the third one? Actually, you can also use CompletableFuture<>. But I haven’t explored this much, for several reasons:

– we prefer RxJava 😉
– I have discussed it with some Vert.x guys, and they advise using Future or RxJava for async compositions. So my laziness warmly listened to them 🙂
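For the curious, here is roughly what the chain could look like with the JDK’s CompletableFuture. This is a minimal sketch that I have not wired into Vert.x: it uses only java.util.concurrent, the helper methods mirror the handlers above but are rewritten here (hypothetically) to return a CompletableFuture, and the class name is made up.

```java
import java.util.concurrent.CompletableFuture;

// Sketch only: the same three steps, expressed with the JDK's CompletableFuture.
// These helpers are illustrative stand-ins, not part of the Vert.x API.
class HelloWithCompletableFuture {

    static CompletableFuture<String> concatWithHello(String aName) {
        return CompletableFuture.completedFuture("Hello " + aName);
    }

    static CompletableFuture<String> concatWithExclamation(String aHello) {
        return CompletableFuture.completedFuture(aHello + "!");
    }

    static CompletableFuture<String> concatWithISay(String aString) {
        return CompletableFuture.completedFuture("I say: \"" + aString + "\"");
    }

    // thenCompose plays the role of Future.compose / Single.flatMap:
    // each step starts only when the previous one has succeeded.
    static CompletableFuture<String> sayHello(String aName) {
        return concatWithHello(aName)
                .thenCompose(HelloWithCompletableFuture::concatWithExclamation)
                .thenCompose(HelloWithCompletableFuture::concatWithISay);
    }

    public static void main(String[] args) {
        // A failure in any step skips the remaining ones and ends up in "error".
        sayHello("World").whenComplete((result, error) -> {
            if (error == null) {
                System.out.println(result); // prints: I say: "Hello World!"
            } else {
                System.err.println("Failed: " + error);
            }
        });
    }
}
```

One thing to watch if you go down this road: the *Async variants (thenComposeAsync and friends) run on the common ForkJoinPool by default, not on the Vert.x event loop.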

I hope this helps a bit. The full example can be found on Github.

If you wish to dive a bit more into Vert.x, there is also a nice workshop, Vert.x – From zero to (micro)-hero, that explores some concepts of this toolkit (I recommend it :)). Besides the Vert.x website, you can find a great introduction to this library in the free ebook Building Reactive Microservices in Java by Clément Escoffier, dive into one of these talks on Microservices, or check out the presentation material.


Original source: streamdata.io blog