How to run asynchronous task in Java and don't get into trouble.

How to run asynchronous task in Java and don't get into trouble.

CompletableFuture etc.

·

9 min read

I would like to turn Your attention to running asynchronous tasks in Java today and I will use a real example from my job for that. Using CompletableFuture class is one of convenient way to do that in Java so it's nothing extraordinary that I tried to use it in my feature which I was working on not so long ago. That was a part of streaming system built on top of Apache Pulsar. Service was processing tens of thousands messages per second. I tried to measure and record some metrics for each message without affecting the performance of whole functionality. I was using also callback methods for acknowledging processed messages or reporting failures. Everything was working really good until I deployed it to the kubernetes cluster. It dramatically slowed down on that environment. I ran few tests locally and it looked that the code performed well but for some reason it behaves much worse in the cloud. After short investigation, I realized that it's because of CompletableFuture.runAsync(Runnable). The problem was not in the processing the task but with the passing it to the execution. When I took a look on the source code I realized what's the deal. Btw, the answer was of course also in the documentation. So there are two versions of method runAsync. One which accepts Runnable as a parameter and the second one which accepts also Executor. It will be nothing surprising when I will say that the executor in the second version is used to run the submitted task. So how will be ran the task in the first version of runAsync method?

  public static CompletableFuture<Void> runAsync(Runnable var0) {
        return asyncRunStage(asyncPool, var0);
    }

There will be invoked some another method named asyncRunStage with our task and some default executor passed as an arguments. I will not paste the code of that function. The clue is that this function will run our task using that executor. So what kind of executor it is? Let's take a look for the initialization of that field and find it out.

asyncPool = (Executor)(useCommonPool ? ForkJoinPool.commonPool() : new CompletableFuture.ThreadPerTaskExecutor());

There are two possible implementations of executor which can be assigned to that field. Dependent to the useCommonPool field it can be commonPool from ForkJoinPool or some internal implementation of executor named ThreadPerTaskExecutor.

So what is the value of useCommonPool field then?

private static final boolean useCommonPool = ForkJoinPool.getCommonPoolParallelism() > 1;

It's related to the commonPoolParallelism property of ForkJoinPool and means the number of available CPU's. On the Kubernetes the pod had only one CPU available so there was ThreadPerTaskExecutor used to run my tasks. As the name of that executor suggests it will create the new thread for each submitted task. It's really expensive operation. That's why the solution worked well locally and much worse in the cloud.

So it's the time to write some code and check what's the difference between running asynchronous tasks in various ways.

We will make simple Docker image to check how it will perform depending on the number of CPU's.

We will use alpine with the Java 8 as the base for our image.

FROM openjdk:8-alpine

CMD java -cp /app/java/main dev.code_case.asynchronous_execution.AsynchronousExecution

We will check few ways to run the asynchronous task.

package dev.code_case.asynchronous_execution;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;

public class AsynchronousExecution {
    private static final ThreadFactory factory = runnable -> {
        Thread thread = new Thread();
        thread.setDaemon(true);
        return thread;
    };

    public static void main(String[] args) {
        runCompletableFuture();
        runCompletableFutureWithExecutor();
        runSingleThreadPool();
        runCachedThreadPool();
    }

    private static void runCachedThreadPool() {
        ExecutorService executorService = Executors.newCachedThreadPool(factory);
        runAsyncTasks("CachedThreadPool", executorService::execute);
    }

    private static void runSingleThreadPool() {
        ExecutorService executorService = Executors.newSingleThreadExecutor(factory);
        runAsyncTasks("SingleThreadExecutor", executorService::execute);
    }

    private static void runCompletableFutureWithExecutor() {
        ExecutorService executorService = Executors.newFixedThreadPool(1, factory);
        runAsyncTasks("CompletableFuture with Executor", runnable -> CompletableFuture.runAsync(runnable, executorService));
    }

    private static void runCompletableFuture() {
        runAsyncTasks("CompletableFuture", CompletableFuture::runAsync);
    }

    private static void runAsyncTasks(String consumerStrategy, Consumer<Runnable> consumer) {
        long start = System.currentTimeMillis();
        for (int i = 0; i < 100000; i++) {
            consumer.accept(() -> {
                return;
            });
        }
        System.out.println(String.format("Execution time for %s is %d ms", consumerStrategy, (System.currentTimeMillis() - start)));
    }
}

We can build our image now.

docker build src/main/resources/ -t asyncRun

My Dockerfile is placed in the src/main/resources

So the moment of truth has come.

docker run -d --mount type=bind,source="$(pwd)"/build/classes,target=/app --cpus 1 asyncRun:latest | xargs sh -c 'docker attach $0;docker inspect $0 | grep NanoCpus'

I used Gradle so I need to bind the /build/classes directory as a volume to provide the class for execution. At the first run I will give one CPU for container running our image. I will run it detached and then grep the NanoCpus value from inspect command for the running container to be sure that there is proper CPU's number assigned and attach to the running container to read results of our code.

Execution time for CompletableFuture is 47312 ms
Execution time for CompletableFuture with Executor is 70 ms
Execution time for SingleThreadExecutor is 14 ms
Execution time for CachedThreadPool is 46874 ms
            "NanoCpus": 1000000000

As You can see, executing CompletableFuture.runAsync(Runnable) is 675 times slower than CompletableFuture.runAsync(Runnable, Executor) for 100000 tasks. There is another interesting fact worth to mention. Running that tasks by CachedThreadPool is also much slower than for example SingleThreadExecutor. It shouldn't be surprising when we will remind that this executor is starting a new thread each time when we try to run some task and there is no free worker.

OK. So let's try to run that with 4 CPU's available for the container.

docker run -d --mount type=bind,source="$(pwd)"/build/classes,target=/app --cpus 4 asyncRun:latest | xargs sh -c 'docker attach $0;docker inspect $0 | grep NanoCpus'

And here is the result:

Execution time for CompletableFuture is 44 ms
Execution time for CompletableFuture with Executor is 28 ms
Execution time for SingleThreadExecutor is 19 ms
Execution time for CachedThreadPool is 4948 ms
            "NanoCpus": 4000000000,

It's much better, CompletableFuture is using ForkJoinPool to run our tasks when we are using CompletableFuture.runAsync(Runnable) in that case. It's still a little bit slower than approach with supplying e.g. FixedThreadPool as an executor to runAsync but it has at least the same complexity. Using the CachedThreadPool is still much slower of course as it still creates many threads.

I'm not saying that there is one and only right way to run asynchronous tasks, but I think that it's worth to know such details about that how various implementations can behave.

Thank You for reading this post and I hope that You enjoy it.