[Java - Synchronization] ExecutorService and Futures



1) Overview

While it is easy to create one or two threads and run them, it becomes a problem when your application needs 20 or 30 threads to run tasks concurrently. It is no exaggeration to say that large multi-threaded applications can have hundreds, if not thousands, of threads running simultaneously. So it makes sense to separate thread creation and management from the rest of the application.

ExecutorService is an interface in the JDK's java.util.concurrent package (part of the Executor framework) that simplifies running tasks asynchronously. Generally speaking, an ExecutorService provides a pool of threads and an API for assigning tasks to it.

The Executor framework helps you with:
  • Thread creation: it provides methods for creating threads, more specifically a pool of threads, that your application can use to run tasks concurrently.
  • Thread management: it manages the life cycle of the threads in the pool. You don't need to worry about whether the threads in the pool are active, busy, or dead before submitting a task for execution.
  • Task submission and execution: it provides methods for submitting tasks to the thread pool and lets you decide when they will be executed. For example, you can submit a task to be executed now, schedule it to run later, or have it run periodically.

1.1) Implementation

Using factory methods is the easiest way to instantiate an ExecutorService.

1.1.1) newFixedThreadPool()

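An executor that reuses a fixed number of threads (here, 10) operating off a shared unbounded queue. If all threads are busy, additional tasks wait in the queue until a thread becomes available.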
ExecutorService executor = Executors.newFixedThreadPool(10);
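To see the limit in action, here is a minimal sketch (the class and method names are made up for illustration) that submits 20 short tasks to a pool of 3 threads and records which thread ran each one. However many tasks you submit, at most 3 distinct thread names should appear.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FixedPoolDemo {
    // Hypothetical helper: runs `taskCount` tasks on a pool of `poolSize` threads
    // and returns the names of the threads that actually executed them.
    static Set<String> threadsUsed(int poolSize, int taskCount) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set
        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> names.add(Thread.currentThread().getName()));
        }
        executor.shutdown();                            // no new tasks; queued ones still run
        executor.awaitTermination(5, TimeUnit.SECONDS); // wait for all tasks to finish
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> names = threadsUsed(3, 20);
        System.out.println(names.size() + " threads used: " + names);
    }
}
```

The extra tasks simply wait in the queue until one of the 3 threads is free.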

1.1.2) newSingleThreadExecutor()

An executor with a single worker thread.
ExecutorService executor = Executors.newSingleThreadExecutor();
You might ask: why bother using an executor if there is only one thread? Creating a thread has a cost. A single-thread executor keeps its thread alive and reuses it for every task you submit, so you only pay that cost once. It also guarantees that tasks run sequentially, with no more than one task active at any given time.
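A minimal sketch of that behaviour, with made-up class and method names: every task records its task number and the name of the thread that ran it. All entries should show the same thread, and in practice they appear in submission order, since the single worker drains a FIFO queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SingleThreadDemo {
    // Hypothetical helper: each task appends "<task#>@<thread name>" to a shared log.
    static List<String> runInOrder(int taskCount) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        for (int i = 1; i <= taskCount; i++) {
            final int taskId = i; // lambdas need an effectively final copy
            executor.execute(() -> log.add(taskId + "@" + Thread.currentThread().getName()));
        }
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        // Prints something like [1@pool-1-thread-1, 2@pool-1-thread-1, ...]
        System.out.println(runInOrder(5));
    }
}
```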

1.1.3) newCachedThreadPool()

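An executor that creates new threads as needed, reuses previously created threads when they are available, and removes threads that have been idle for 60 seconds.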
ExecutorService executor = Executors.newCachedThreadPool();

1.1.4) ThreadPoolExecutor

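Directly instantiating a ThreadPoolExecutor with a core and maximum pool size of 10 threads, a keepAliveTime of 0 milliseconds, and an unbounded LinkedBlockingQueue. This is the same configuration that Executors.newFixedThreadPool(10) uses.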
ExecutorService executorService = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

1.2) Usage

ExecutorService can execute Runnable and Callable tasks.

Tasks can be assigned to an ExecutorService using several methods: execute(), submit(), invokeAny(), and invokeAll().

  • execute(): takes a Runnable and returns void, so it gives you no way to obtain the task's result or check its status (whether it is running or finished).
executorService.execute(runnableTask);

  • submit(): submits a Callable or a Runnable task to an ExecutorService and returns a Future representing the pending result.
Future<String> future =
  executorService.submit(callableTask);
  • invokeAny(): takes a collection of Callable objects. It does not return a Future; instead, it blocks and returns the result of one of the Callables that completed successfully (without throwing an exception). There is no guarantee which Callable's result you get.
    • Once one task completes successfully, the tasks that have not completed yet are cancelled. A task that throws an exception does not count as a result; if every task fails, invokeAny() throws an ExecutionException.
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class InvokeAnyExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Set<Callable<String>> callables = new HashSet<>();
        callables.add(() -> "Task 1");
        callables.add(() -> "Task 2");
        callables.add(() -> "Task 3");

        try {
            // Blocks until one task completes successfully, then returns its result
            String result = executorService.invokeAny(callables);
            System.out.println("result = " + result);
        } finally {
            executorService.shutdown();
        }
    }
}
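    • This code example prints the result returned by one of the Callables in the given collection. Running it a few times gives different results: sometimes "Task 1", sometimes "Task 2", and so on. A likely reason is that HashSet does not guarantee an iteration order, so the order in which the tasks are handed to the executor can change from run to run.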
  • invokeAll(): executes all the Callable objects in the given collection and returns a list of Future objects, in the same order as the collection, from which you can obtain each Callable's result.
    • Keep in mind that a task might finish by throwing an exception, so it may not have "succeeded". isDone() returns true in both cases; the difference only shows up when you call get(), which throws an ExecutionException wrapping the task's exception.
List<Future<String>> futures = executorService.invokeAll(callableTasks);
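Here is a minimal, self-contained sketch of invokeAll() in which one of the tasks deliberately throws. The class name, the runAll() helper, and the task contents are illustrative. The call waits for all three tasks, and the failure only surfaces when get() is called on the corresponding Future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllDemo {
    // Hypothetical helper: returns one line per task describing how it ended.
    static List<String> runAll() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        List<Callable<String>> callableTasks = List.of(
                () -> "Task 1",
                () -> { throw new IllegalStateException("Task 2 failed"); },
                () -> "Task 3");
        List<String> outcomes = new ArrayList<>();
        try {
            // Blocks until every task has finished, normally or exceptionally
            List<Future<String>> futures = executorService.invokeAll(callableTasks);
            for (Future<String> future : futures) {
                // isDone() is true for every future here, including the failed one
                try {
                    outcomes.add("ok: " + future.get());
                } catch (ExecutionException e) {
                    outcomes.add("failed: " + e.getCause().getMessage());
                }
            }
        } finally {
            executorService.shutdown();
        }
        return outcomes;
    }

    public static void main(String[] args) throws InterruptedException {
        runAll().forEach(System.out::println);
    }
}
```

Because the futures come back in the same order as the tasks, this prints "ok: Task 1", "failed: Task 2 failed", and "ok: Task 3" on separate lines.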

1.3) Shutdown

In general, an ExecutorService is not destroyed automatically when there are no tasks to process. It stays alive and waits for new work. In some cases this is very helpful, for example if an app needs to process tasks that arrive irregularly, or whose number is not known at compile time.
On the other hand, an app could reach the end of its main method and still not exit, because the ExecutorService's idle worker threads (non-daemon threads, by default) keep the JVM running.
To properly shut down an ExecutorService, use the shutdown() and shutdownNow() APIs.

  • shutdown() doesn't destroy the ExecutorService immediately. It makes the ExecutorService stop accepting new tasks and shut down once all previously submitted tasks, including those still waiting in the queue, have finished. The call itself returns immediately; it does not wait for those tasks to complete.
executorService.shutdown();
  • shutdownNow() attempts to stop the ExecutorService immediately: it interrupts the threads running tasks and stops processing waiting tasks. Stopping is best effort only; a task that does not respond to interruption may never terminate.
    • This method returns a list of the tasks that were waiting to be processed. It is up to the developer to decide what to do with these tasks.
List<Runnable> notExecutedTasks = executorService.shutdownNow();
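  • awaitTermination() blocks until all tasks have finished after a shutdown request, the timeout expires, or the current thread is interrupted, whichever happens first. The ExecutorService Javadoc recommends combining it with the two methods above:
    • first call shutdown() to stop accepting new tasks, then wait up to a specified time with awaitTermination() for existing tasks to complete. If that time expires, call shutdownNow() to cancel the tasks that are still running.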

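executorService.shutdown(); // stop accepting new tasks
try {
    // Wait up to 800 ms for running and queued tasks to finish
    if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
        executorService.shutdownNow(); // timed out: interrupt whatever is still running
    }
} catch (InterruptedException e) {
    // The waiting thread was interrupted: cancel the tasks and restore the interrupt flag
    executorService.shutdownNow();
    Thread.currentThread().interrupt();
}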
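To make the difference between running and waiting tasks concrete, here is a sketch (class, method, and field names are illustrative) that keeps a single-thread executor busy with one long task and queues two more behind it. shutdownNow() interrupts the running task and hands back the two tasks that never started.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownNowDemo {
    // Set by the long-running task when shutdownNow() interrupts it
    static final AtomicBoolean interrupted = new AtomicBoolean(false);

    // Hypothetical helper: returns how many queued tasks shutdownNow() handed back.
    static int queuedTasksReturned() throws InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);

        executorService.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000); // long-running work that responds to interrupts
            } catch (InterruptedException e) {
                interrupted.set(true); // shutdownNow() interrupted this task
            }
        });
        executorService.execute(() -> System.out.println("never runs"));
        executorService.execute(() -> System.out.println("never runs either"));

        started.await(); // make sure the first task is running, not queued
        List<Runnable> notExecutedTasks = executorService.shutdownNow();
        executorService.awaitTermination(1, TimeUnit.SECONDS);
        return notExecutedTasks.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("tasks never started: " + queuedTasksReturned()); // prints 2
    }
}
```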

1.4) Futures

The submit() and invokeAll() methods return an object or a collection of objects of type Future, which lets you get the result of a task's execution or check the task's status (whether it is still running or has finished).

1.4.1) Get Methods

The Future interface provides a blocking method, get(), which returns the actual result of a Callable task's execution, or null in the case of a Runnable task. Calling get() while the task is still running blocks the calling thread until the task finishes and the result is available.
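Future<String> future = executorService.submit(callableTask);
String result = null;
try {
    result = future.get(); // blocks until the task has finished
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // restore the interrupt flag
} catch (ExecutionException e) {
    // The task threw an exception; the original exception is the cause
    e.getCause().printStackTrace();
}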

If get() blocks for a long time, the calling thread is stuck and the application's responsiveness can degrade. If the resulting data is not crucial, you can avoid this problem by using a timeout:
String result = future.get(200, TimeUnit.MILLISECONDS);
A TimeoutException is thrown if the result is not available within the specified time (in this case, 200 milliseconds). The task itself is not cancelled; get() simply stops waiting for it.
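A small sketch of the timeout behaviour (class and method names are illustrative; the 2-second sleep stands in for slow work): get() gives up after 200 ms, and isDone() confirms that the task is still running at that point.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class GetTimeoutDemo {
    // Hypothetical helper: returns true if get() timed out while the task was still running.
    static boolean timedOutWhileRunning() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<String> future = executorService.submit(() -> {
            Thread.sleep(2_000); // simulate slow work
            return "slow result";
        });
        try {
            future.get(200, TimeUnit.MILLISECONDS);
            return false; // finished within 200 ms (not expected here)
        } catch (TimeoutException e) {
            // get() stopped waiting, but the task itself keeps running
            return !future.isDone();
        } finally {
            executorService.shutdownNow(); // interrupt the sleeping task so the demo exits quickly
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("timed out while running: " + timedOutWhileRunning());
    }
}
```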

1.4.2) Cancel Method

The Future interface also lets you cancel a task's (Callable or Runnable) execution with the cancel() method, and check whether it was cancelled with the isCancelled() method. Note that this cancels the task, not the thread (explained below).
boolean canceled = future.cancel(true); // true = interrupt the thread if the task is already running
boolean isCancelled = future.isCancelled();
  • cancel() attempts to cancel the execution of this task. This attempt will fail if:
    • the task has already completed,
    • the task has already been cancelled, or
    • the task could not be cancelled for some other reason.
  • If cancel() succeeds and the task has not started yet, the task will never run.
  • If the task has already started, the mayInterruptIfRunning parameter determines whether the thread executing this task should be interrupted in an attempt to stop the task.
  • In a thread pool, this is useful because you stop the task (the Callable/Runnable), not the thread, so the thread can be reused for the next task in the work queue.
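The sketch below (hypothetical names, single-thread executor assumed) cancels a task that is already running with cancel(true), checks isCancelled() and isDone(), shows that get() then throws a CancellationException, and confirms that the same worker thread goes on to run the next task.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

class CancelDemo {
    // Hypothetical helper: returns a summary of what happened after cancel(true).
    static String cancelRunningTask() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicReference<String> firstThread = new AtomicReference<>();
        try {
            Future<String> future = executorService.submit(() -> {
                firstThread.set(Thread.currentThread().getName());
                started.countDown();
                Thread.sleep(10_000); // interrupted by cancel(true)
                return "never returned";
            });
            started.await(); // ensure the task is running before cancelling

            boolean canceled = future.cancel(true); // interrupt the running task
            String summary = "canceled=" + canceled
                    + " isCancelled=" + future.isCancelled()
                    + " isDone=" + future.isDone();
            try {
                future.get();
            } catch (CancellationException e) {
                summary += " get=CancellationException";
            }

            // The worker thread survives the cancellation and runs the next task
            Callable<String> nameTask = () -> Thread.currentThread().getName();
            String nextThread = executorService.submit(nameTask).get();
            return summary + " sameThread=" + nextThread.equals(firstThread.get());
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(cancelRunningTask());
    }
}
```

This prints "canceled=true isCancelled=true isDone=true get=CancellationException sameThread=true": the task was stopped, but the pool's thread was not.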

1.4.3) Other Methods

The isDone() method checks whether the task has completed. Completion may be due to normal termination, an exception, or cancellation; in all of these cases isDone() returns true.

2) Advanced

2.1) More parameters to consider

When you instantiate an ExecutorService (more precisely, a ThreadPoolExecutor), below are some parameters to consider. If you use a factory method (Executors.new....), these are set to defaults unless you specify them.
  • corePoolSize: the number of threads to keep in the pool, even when they are idle (unless allowCoreThreadTimeOut is set).
    • In more detail, if corePoolSize = 5, the executor creates a new thread for each of the first 5 tasks. Additional tasks (the 6th task and beyond) are added to the queue (the workQueue) until it is full. Once the workQueue is full, maximumPoolSize comes into the picture.
  • maximumPoolSize: the maximum number of threads allowed in the pool.
    • In more detail, if maximumPoolSize = 10 and a new task arrives while the queue is full, the executor creates additional threads, up to 10 in total (5 core + 5 new). If the queue is full and all 10 threads are busy, the new task is rejected (see rejectedExecutionHandler below).
  • workQueue
    • Used to queue up tasks for the available worker threads. 
  • keepAliveTime
    • The time after which an idle thread beyond the core pool size is eligible to be terminated.
  • threadFactory
    • Usually Executors.defaultThreadFactory() is sufficient. It creates non-daemon threads with normal priority, named in the format "pool-<pool#>-thread-<thread#>".
    • If you want to customize any of these attributes, you should provide your own ThreadFactory implementation.
    • Another benefit of your own thread factory is that it can set each thread's uncaught exception handler, which helps combat silent failures (for tasks started with execute(); exceptions from tasks passed to submit() are captured in the returned Future instead).
  • rejectedExecutionHandler
    • You can configure a handler that runs when a task is rejected, which happens when the executor has been shut down, or when both the queue and the pool are full. These handlers are also called "policies".
    • By default, the AbortPolicy is used, which throws a RejectedExecutionException.
    • You can choose other policies, such as:
      • DiscardPolicy, which simply discards the task silently.
      • CallerRunsPolicy, which executes the task on the calling thread, instead of one of the worker threads.
      • Or any policy you create. 
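To tie these parameters together, here is a sketch of a ThreadPoolExecutor built by hand with corePoolSize = 1, maximumPoolSize = 2, a bounded ArrayBlockingQueue of capacity 1, a custom ThreadFactory, and the default AbortPolicy. The sizes, the "worker-N" naming scheme, and the class and method names are illustrative choices, not recommendations. Because every task blocks until released, the pool fills in the order described above: task 1 gets the core thread, task 2 waits in the queue, task 3 triggers a second thread, and task 4 is rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolParametersDemo {
    // Hypothetical helper: returns how many threads the pool grew to,
    // or -1 if the 4th task was NOT rejected (which would be unexpected).
    static int fillPool() throws InterruptedException {
        AtomicInteger threadCount = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, "worker-" + threadCount.incrementAndGet());
            // Report failures instead of letting them disappear silently
            t.setUncaughtExceptionHandler((thread, ex) ->
                    System.err.println(thread.getName() + " failed: " + ex));
            return t;
        };
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 2,                                  // corePoolSize, maximumPoolSize
                30, TimeUnit.SECONDS,                  // keepAliveTime for the extra thread
                new ArrayBlockingQueue<>(1),           // workQueue with room for one task
                factory,
                new ThreadPoolExecutor.AbortPolicy()); // the default rejection policy

        CountDownLatch release = new CountDownLatch(1);
        Runnable blockingTask = () -> {
            try {
                release.await(); // hold the thread until we say so
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            executor.execute(blockingTask); // 1: runs on the core thread
            executor.execute(blockingTask); // 2: core thread busy, so it is queued
            executor.execute(blockingTask); // 3: queue full, so a 2nd thread is created
            try {
                executor.execute(blockingTask); // 4: queue full and maximum reached
                return -1;
            } catch (RejectedExecutionException e) {
                return executor.getLargestPoolSize();
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("pool grew to " + fillPool() + " threads before rejecting"); // prints 2
    }
}
```

With new ThreadPoolExecutor.CallerRunsPolicy() instead, the fourth task would run on the submitting thread rather than being rejected (in this particular sketch that would block the caller, since the task waits on the latch).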

2.2) ScheduledExecutorService Interface

The ScheduledExecutorService runs tasks after a given delay and/or periodically. As with ExecutorService, the easiest way to instantiate one is to use the factory methods of the Executors class, such as Executors.newScheduledThreadPool().
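A minimal sketch of the two most common calls, using Executors.newScheduledThreadPool(1): schedule() runs a Callable once after a delay, and scheduleAtFixedRate() repeats a Runnable until its future is cancelled. The delays, the period, and the class and method names are arbitrary illustrative values.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduledDemo {
    // Hypothetical helper: returns the delayed result after also letting
    // a periodic task run three times.
    static String runSchedules() throws InterruptedException, ExecutionException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            // One-shot: runs once, roughly 100 ms from now
            ScheduledFuture<String> delayed =
                    scheduler.schedule(() -> "ran after delay", 100, TimeUnit.MILLISECONDS);

            // Periodic: first run immediately, then every 50 ms
            CountDownLatch threeRuns = new CountDownLatch(3);
            ScheduledFuture<?> periodic =
                    scheduler.scheduleAtFixedRate(threeRuns::countDown, 0, 50, TimeUnit.MILLISECONDS);
            boolean ranThreeTimes = threeRuns.await(5, TimeUnit.SECONDS);
            periodic.cancel(false); // periodic tasks only stop when cancelled or on shutdown

            return delayed.get() + ", periodic ran 3 times: " + ranThreeTimes;
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runSchedules());
    }
}
```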

3) Common Mistakes

  • Keeping an unused ExecutorService alive: remember to shut down the ExecutorService when the application no longer needs it.
  • Wrong thread-pool capacity when using a fixed-size thread pool:
    • It is very important to determine how many threads the application needs to execute tasks efficiently.
    • A thread pool that is too large causes unnecessary overhead just to create threads that will mostly sit waiting.
    • Too few threads can make an application seem unresponsive because tasks wait in the queue for a long time.
  • Calling a Future's get() method after task cancellation: an attempt to get the result of an already cancelled task throws a CancellationException.
  • Unexpectedly long blocking with a Future's get() method: use timeouts to avoid unexpected waits.
