Asynchronous Programming in Java

Hey folks,

today we’re going to look at the different ways in which you can write Asynchronous code in Java using the different API’s available.

We’re going to be covering:

  • Java 1 — Runnable’s
  • Java 5 — Executors and Futures
  • Java 8 — Completable Futures / Completion Stages

But before we get into it, let’s give ourselves a quick refresh of some definitions around sync / async and tasks.

Synchronous Programming:

  • The main thread executes each task before it moves onto the next

Simple to think and reason about, the steps in which you write your code will be executed in that order.

Asynchronous Programming:

  • A task will be executed at some point in the future

Asynchronous programming can be particularly beneficial to I/O tasks such as reading files from disk or network calls.

These type of operations are not CPU bound which means the main thread does not have to wait for these tasks to complete and can use that valuable CPU time elsewhere.

Tasks:

Tasks in Java are represented by objects in which you can define the steps you want that task to carry out.

Now we’re up to speed let’s dive in!

Creating Tasks in Java:

We can define tasks in Java be creating Runnable’s and Callable’s ( we’ll also touch on Suppliers a little later…)

Runnable Interface ( Java 1 )

  • A function which does not take any parameters and does not return anything

The steps this task carries out will be defined when the run method is overridden.

Creating a Runnable the good old fashioned way:

Creating a Runnable using lambda syntax ( Java 8 onwards )

As we’ll see later in the CompletableFuture API, Lambdas can be used in a variety of methods representing different underlying Objects.

This can be slightly confusing when you see it for the first time, however, having to write less code overall is so satisfying!

Callable Interface ( Java 5 )

Think of a Runnable but it can return a value and it throws checked exceptions:

Creating a Callable:

and again alternatively using Lambda expressions…

Great now we’ve covered how we can create tasks in Java but how can be execute these asynchronously?

Java 1 — Threads

The first method we’re going to cover involves launching tasks in the main Thread:

  • We create a thread object and pass the runnable to the constructor
  • We can tell that thread to run the task by calling the start method

Disadvantages of this approach:

  • obsolete
  • you have to manage the life cycle of the thread yourself
  • thread creation / deletion is expensive
  • you can’t return anything — starting to delve more into the realm of concurrency

Java 5 - Executors / Futures

The Java.util.concurrent API adds Executors.

You can use multiple factory methods of the Executors class to create an Executor Service:

You can think of an Executor service as an abstraction over a pool of threads and a task queue which you can submit your Runnable’s and Callable’s to.

When a thread becomes available it will pick up the next task in the queue to execute and a Future Object will be returned to represent the state of this task.

Futures

What can we do with the Future object returned?

Not too much.. we can

  • cancel the execution of the task
  • check if a task is cancelled
  • check if a task has finished completion
  • get the value returned by the Future

This get() call is blocking so an alternative method would be to poll using the isDone() method, both options here aren’t great.

Let’s see an example:

  • I’ve created a Executor Service using the newSingleThreadExecutor() factory method of the Executors class ( which like the name suggests only creates one Thread within the thread pool of the Executor Service )
  • A Runnable and Callable are both submitted to the executor with the return value of the Callable being captured

One final point to note here is that the executor service needs to be shutdown which can be achieved using the shutdown() or shutdownNow() methods.

Completable Futures

You can think of completable futures as a better Future, it has more methods around chaining new tasks off the completion of earlier tasks ( e.g. callbacks ) making it easier to create pipelines of tasks to be carried out asynchronously.

There are also a number of new methods around Exception handling allowing you to recover throw Exceptions within your pipeline.

This API is pretty similar to Promises within NodeJS.

Hierarchy

The Completable Future class implements both the Future interface previously seen but also a CompletionStage interface and this is were a lot of the newer methods around chaining can be found.

This API can be broken down into four main parts:

  1. Creating your initial Asynchronous task
  2. Chaining off this and creating your pipelines — your callbacks
  3. Completion
  4. Exception Handling

Creating your initial Asynchronous task

RunAsync()

Takes a Runnable as a argument and the type of the Completable Future returned is Void — no value is returned

SupplyAsync()

  • Uses a Supplier instead of a Callable
  • Suppliers are similar to Callable’s but they do not throw checked exceptions

The type of the CompletableFuture returned by the supplyAsync() method will be the same type as the Supplier used as the argument supplied to the method

  • that’s quite a mouthful but hopefully the example clears it up!

You may have noticed in the above examples, we’re not defining which thread or executor service to run these tasks in, do where are they running?

Well it turns out from Java 8 onwards, when the JVM starts up a common fork-join pool is created.

  • this is an executor service
  • it is the default pool of thread used to execute tasks by Completable Futures

Both of these methods have overloaded versions which allow you to specify a specific executor to use instead of the common fork-join one:

Chaining

Let’s now move on to how we can chain tasks of this initial stage and create pipelines.

There are four main methods:

  • thenRun()
  • thenAccept()
  • thenApply()
  • thenCompose()

These all sound really similar so we’re going to create an example pipeline of tasks to dive into each one soon.

There are also async versions of these:

  • thenRunAsync()
  • thenAcceptAsync()
  • thenApplyAsync()
  • thenComposeAsync()

The non-async versions will run synchronously in the thread the previous stage was run in.

The async versions can take an executor as an extra argument allowing you to run different stages in different thread pools ( there is a good amount of flexibility in the API here for better or worse! )

Let’s explore these methods using an example:

Imagine we have two services:

  • A pizza base service with it’s own DB
  • A pizza topping service with it’s own DB

Our pipeline of tasks we want to carry out asynchronously is represented by this diagram

Stage 1: — Get Pizza Bases using PizzaBase Client

Stage 1a: Log that we have the Pizza Bases

To achieve this we’re going to use the thenRun() method.

  • In this part of the pipeline we don’t need the Pizza Bases as an argument and we’re also not returning anything.
  • The thenRun() takes a Runnable as an argument so this fits our needs perfectly

Stage 1b: Map Pizza Bases to Pizza ID’s

Imagine that a Pizza Base is a POJO with an ID — let’s just go with it! 😅

To achieve this we’re going to use the thenApply() method which you can imagine as a Map function ( transform input of PizzaBases to PizzaIds ).

This method takes as Function as an argument e.g.

  • Input of previous stage
  • Output for the next stage

One thing to note here is that we’re now capturing the CompletableFuture returned by this stage.

We use this new CompletableFuture object to chain our next steps in the pipeline off

Stage 2a: Log Pizza ID’s

thenAccept()

  • Takes a consumer as an argument
  • Receives input of the previous stage
  • Does not return anything

Stage 2b: Get Pizza Toppings ( the last part of our pipeline )

thenCompose() — think of as a flatMap instead of a Map

  • We want this stage to return a value of type CompletableFuture<PizzaToppings>
  • However, we’re using the PizzaTopping service client, which is asynchronous and returns a CompletableFuture<V>
  • Therefore instead of having a CompletableFuture<CompletableFuture<V>> type returned, thenCompose() will unbox this to a CompletableFuture<V>

🎉

All together:

Completion — you’ve almost completed this blog post…

We’re going to cover just a few of the completion methods within the API

  • join()
  • getNow(T valueIfAbsent)
  • completeExceptionally(Throwable T)

join()

  • Blocking
  • Similar to get() in Future but checked exceptions are not thrown

This following example would block on the main Thread and print hello to the console:

getNow()

Use an alternative value if Completable Future has not yet completed

In this example:

  • “hello” will be printed to the console if the CompletableFuture has completed before getNow() is executed ( likely scenario here )
  • “Using this instead” will be printed to the console if the CompletableFuture has not completed before getNow() is executed — this would likely be executed if we put a sleep() statement within the Supplier before returning “hello”

completeExceptionally(Throwable T)

  • If not already completed, causes invocations of get() and related methods to throw the given exception
  • Returns true if this invocation caused this CompletableFuture to transition to a completed state, else false

Exception Handling — Almost at the finish line!

Another improvement over the Future API is the state of a CompletableFuture can be one of the following:

  • Running
  • Completed
  • CompletedExceptionally

Any Exception which occurs within a task ( which is not caught in that task ) will transition a CompletableFuture’s state to CompletedExceptionally.

This exception will then propagate downstream to all dependent tasks if nothing is done to prevent this.

Let’s explore some of the options available to us within the API to deal with such a scenario:

  • Exceptionally()
  • Handle()
  • WhenComplete()

Exceptionally()

  • Allows you to recover from an exception in the pipeline
  • You can supply an alternate value, or propagate exception further if you wish to do so:

In this example, the supplyAsync() task will be completed exceptionally, however, the exceptionally() method chaining of this will catch it and supply the value “Robert” instead.

This allows the thenAccept() task to complete successfully.

Handle()

This method takes a BiFunction as an argument:

  • Result and Exception are parameters ( one is null )
  • This method will always be executed regardless of whether an exception occurs or not allowing you to recover or throw an exception downstream

And finally…..

WhenComplete()

This method takes a BiConsumer as an argument:

  • Result and Exception are parameters ( one is null )

This is typically used at the end of the pipeline to determine if all steps completed successfully at which point you could log or update metrics in a service.

There are many more methods within the CompletableFuture API but hopefully this blog post gave a brief insight into the different ways to write asynchronous code using the Java API’s!

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store