Jacob Hookom's Blog
Jacob Hookom is a developer with McKesson Medical-Surgical, designing supply management solutions on the web, desktop, and handheld for a multitude of markets. He started consulting at 16 and has since held titles from Information Architect to Product Manager. In his free time, he contributes to Sun's JavaServerFaces RI and Glassfish projects and is an active member of the JavaServerFaces Expert Groups. Recently he started a Java.net project of his own, called 'Facelets', which is a templating framework for JavaServer Faces.
Accelerating Applications with Java 5 Concurrency
Posted by jhook on December 22, 2008 at 11:34 PM
After spending way too much time on ways to speed up web applications, you'll find that your greatest mileage comes from database optimizations, networking, and reducing the bytes downloaded. Time spent optimizing Java code is seemingly insignificant next to waiting on the request, waiting on the database, waiting on the services, and waiting on the response.
There's a catch; of course there is a catch. In between the bookends of the request and response life cycle, there is sometimes an opportunity to take sequential waits on external calls and run them in parallel with great success.
A Simple Example
// First Attempt, Takes ~5 Seconds
public List<Output> processSequentially(List<Stuff> request) {
    List<Output> response = new ArrayList<Output>(request.size());
    for (Stuff s : request) {
        Output o = lengthyExternalProcess(s);
        response.add(o);
    }
    return response;
}
// Shared Thread Pool
private ExecutorService pool = Executors.newFixedThreadPool(5);

// Second Attempt, Parallel Takes ~1 Second
public List<Output> processParallel(List<Stuff> request)
        throws InterruptedException, ExecutionException {
    List<Output> response = new ArrayList<Output>(request.size());
    CompletionService<Output> cs = new ExecutorCompletionService<Output>(this.pool);
    // queue one task per item; the pool runs up to 5 at once
    for (final Stuff s : request) {
        cs.submit(new Callable<Output>() {
            public Output call() throws Exception {
                return lengthyExternalProcess(s);
            }
        });
    }
    // take() blocks until the next task finishes, so results arrive in completion order
    for (int i = 0; i < request.size(); i++) {
        response.add(cs.take().get());
    }
    return response;
}
Taking a Step Back
So we added a few more lines of code and used these types: ExecutorService, Callable, and ExecutorCompletionService, but how did we increase the performance by 5x? The sequential version waits on each external call in turn, so its total time is the sum of the waits; the parallel version overlaps them, so with five calls on five threads the total is roughly the longest single wait. By using Callable, you're delegating the expensive processing to whoever invokes call(), not really any different than writing simple event listeners in Swing or JavaScript-- everyone's done that before.
Secondly, we want to run these instances of Callable in parallel by passing them off to a pool of threads. With Java 5, this is blindingly easy using an ExecutorService and the helpful ExecutorCompletionService, which hands back each result as soon as its task completes, as shown in the example above.
As I'm not making any assurances of being an expert on concurrency, even after many trials and errors, I highly recommend clicking around in the Java 5 JavaDocs for the java.util.concurrent package or searching for anything written by Brian Goetz, including his book, Java Concurrency in Practice.
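To make the overlap concrete, here is a minimal, self-contained sketch of the same pattern. The slowEcho method is a hypothetical stand-in for lengthyExternalProcess (it just sleeps and echoes its input), the 200 ms delay is an arbitrary assumption, and the lambda syntax needs Java 8 or later, which this post predates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ParallelWaitDemo {
    // Hypothetical stand-in for lengthyExternalProcess: waits, then echoes its input.
    static String slowEcho(String input, long delayMillis) throws InterruptedException {
        Thread.sleep(delayMillis);
        return input.toUpperCase();
    }

    static List<String> processParallel(List<String> request, final long delayMillis)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            CompletionService<String> cs = new ExecutorCompletionService<String>(pool);
            for (final String s : request) {
                cs.submit(() -> slowEcho(s, delayMillis)); // a Callable, so it may throw
            }
            List<String> response = new ArrayList<String>(request.size());
            for (int i = 0; i < request.size(); i++) {
                response.add(cs.take().get()); // completion order, not request order
            }
            return response;
        } finally {
            pool.shutdown(); // demo only; a servlet would keep one shared pool
        }
    }

    public static void main(String[] args) throws Exception {
        long start = System.nanoTime();
        List<String> out = processParallel(Arrays.asList("a", "b", "c", "d", "e"), 200L);
        long elapsedMillis = (System.nanoTime() - start) / 1000000L;
        System.out.println(out.size() + " results in about " + elapsedMillis + " ms");
    }
}
```

One thing worth noticing: a CompletionService returns results in the order tasks finish, so if the response must line up with the request order, you would need to carry an index or key along with each result.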
A More Practical Example and Issue
While the simple example works, it's not yet production ready. Let's say the example method processParallel(List<Stuff> request) is running from a Servlet and we have only 5 threads allocated to our ExecutorService. Twenty users of your application all send in different-sized requests of stuff, contending for those 5 threads. What happens when the first user needs to use up all of your threads to process their stuff? Essentially, that user will starve out every other request, and that's bad.
You've made your code parallel, but you've also serialized every request coming into the servlet, since they all wait on the same 5 threads.
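The starvation is easy to reproduce in isolation. The sketch below is an illustration under assumptions, not the servlet itself: it builds the pool with the same parameters Executors.newFixedThreadPool(5) uses, but constructs the ThreadPoolExecutor directly so the work queue can be inspected, and it uses latches instead of real external calls. The class and method names are made up for the demo, and the lambdas need Java 8 or later.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class StarvationDemo {
    /** Returns true if a small request sat in the queue while a big one held all 5 threads. */
    static boolean smallRequestWasQueued() throws Exception {
        // Same shape as Executors.newFixedThreadPool(5): 5 threads, unbounded queue.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        final CountDownLatch allBusy = new CountDownLatch(5);
        final CountDownLatch release = new CountDownLatch(1);
        try {
            // "First user": five slow tasks occupy every thread in the pool.
            for (int i = 0; i < 5; i++) {
                pool.submit(() -> {
                    allBusy.countDown();
                    release.await(); // simulated slow external call
                    return "big";
                });
            }
            allBusy.await(); // all five threads are now busy

            // "Second user": a trivial task that has nowhere to run.
            Future<String> small = pool.submit(() -> "small");
            boolean queued = pool.getQueue().size() == 1 && !small.isDone();

            release.countDown();             // let the big request finish
            small.get(5, TimeUnit.SECONDS);  // only now can the small one complete
            return queued;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("small request queued behind big one: " + smallRequestWasQueued());
    }
}
```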
How Not to Solve the Issue
The first idea would be to just instantiate a new ExecutorService with 5 threads within each request. While you can do this, it's not the most efficient approach, as it can create out-of-memory problems among other resource contention issues. You may end up with an overall performance loss, with no bound on the amount of resources your server will quickly eat up.
More practically, let's say your Servlet is using a pool of about 50 threads to process requests in parallel. You still don't want a single large request to starve out all 50 threads; maybe you only want it to take 5 threads at most, assuming you expect 10 concurrent requests. Keep in mind we want that 5x improvement in performance!
So how would you take a request for 50 instances of stuff for processing, but only use 5 threads at most from the shared pool? Another approach is to simply create only 5 Callable instances, each of which loops through 10 instances:
// Shared Thread Pool
private ExecutorService pool = Executors.newFixedThreadPool(50);

// Third Attempt, Batch Processing in Parallel
public List<Output> processInBatches(List<Stuff> request)
        throws InterruptedException, ExecutionException {
    List<Output> response = new ArrayList<Output>(request.size());
    // splitIntoBatches is a hypothetical helper (not shown): split request into 5 batches
    List<List<Stuff>> batches = splitIntoBatches(request, 5);
    CompletionService<List<Output>> cs = new ExecutorCompletionService<List<Output>>(this.pool);
    for (final List<Stuff> batch : batches) {
        cs.submit(new Callable<List<Output>>() {
            public List<Output> call() throws Exception {
                // each batch is worked through sequentially on one thread
                List<Output> some = new ArrayList<Output>();
                for (Stuff s : batch) {
                    some.add(lengthyExternalProcess(s));
                }
                return some;
            }
        });
    }
    for (int i = 0; i < batches.size(); i++) {
        response.addAll(cs.take().get());
    }
    return response;
}
While we've guaranteed that a given request only uses 5 threads out of the 50 allocated to the servlet, each batch is still processed sequentially, so one slow item holds up everything behind it in the same batch.
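The batching example leaves the splitting step unspecified. One possible way to do it, as a sketch only: the class name Batches and the method splitIntoBatches are illustrative names, not part of the original code, and the choice of contiguous, near-equal batches is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

class Batches {
    /** Splits items into at most batchCount contiguous batches of near-equal size. */
    static <T> List<List<T>> splitIntoBatches(List<T> items, int batchCount) {
        if (batchCount < 1) {
            throw new IllegalArgumentException("batchCount must be >= 1");
        }
        List<List<T>> batches = new ArrayList<List<T>>();
        int size = items.size();
        int base = size / batchCount;   // minimum items per batch
        int extra = size % batchCount;  // the first 'extra' batches get one more item
        int from = 0;
        for (int i = 0; i < batchCount && from < size; i++) {
            int to = from + base + (i < extra ? 1 : 0);
            // copy the slice so each batch is independent of the source list
            batches.add(new ArrayList<T>(items.subList(from, to)));
            from = to;
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<Integer>();
        for (int i = 0; i < 7; i++) items.add(i);
        System.out.println(splitIntoBatches(items, 3)); // prints [[0, 1, 2], [3, 4], [5, 6]]
    }
}
```

When there are fewer items than batches, this version returns fewer batches and never emits an empty one, so no thread is spent on a task with nothing to do.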
Lastly, one might attempt to instantiate an ExecutorService with a BlockingQueue to only allow some number of processes to execute in parallel, but this doesn't behave as expected. Others have covered this issue further as the Java 5 concurrency APIs can be deceptively difficult to navigate at times when you combine shared use within a web container.
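The post doesn't spell out which unexpected behavior it means, so the following is only one well-known candidate, offered as an assumption. A ThreadPoolExecutor prefers queuing a task over starting a thread beyond its core size, so with an unbounded queue the maximum pool size never comes into play. The class and method names below are made up for the demo, and the lambda needs Java 8 or later.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueSurpriseDemo {
    /** Returns {threads created, tasks left waiting} after submitting 5 blocking tasks. */
    static int[] poolSizeAndQueued() {
        // Core size 1, maximum size 5, unbounded queue: looks like "up to 5 in parallel".
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 5, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        final CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 5; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // simulated slow external call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Extra threads are only started when the queue refuses a task,
            // and an unbounded queue never refuses.
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = poolSizeAndQueued();
        System.out.println("threads=" + r[0] + ", queued=" + r[1]); // prints threads=1, queued=4
    }
}
```

Swapping in a bounded queue changes the failure mode but doesn't remove it: once both the queue and the maximum pool size are exhausted, the default handler rejects new tasks with a RejectedExecutionException.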
I needed something much simpler that didn't upheave much of the foundation of the concurrency API.
A Workable Solution
Instead of batching, since we saw linear improvement by parallelizing our process, you'd like to queue each process individually for the greatest granularity and efficiency. At the same time, you don't want 50 individual tasks to starve out the shared thread pool.
In the first example, I used a CompletionService, which helps to manage a unit of work. You can queue up a series of tasks and then wait for it to release the completed results as they become available, in whatever order they finish-- perfect! What if you implemented your own CompletionService with a simple Semaphore to guarantee only 5 tasks are ever processed at a time by threads in the shared pool?
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Exactly like ExecutorCompletionService, except uses a
 * Semaphore to only permit X tasks to run concurrently
 * on the passed Executor
 */
public class BoundedCompletionService<V> implements CompletionService<V> {

    private final Semaphore semaphore;
    private final Executor executor;
    private final BlockingQueue<Future<V>> completionQueue;

    // FutureTask to release Semaphore as completed
    private class BoundedFuture extends FutureTask<V> {
        BoundedFuture(Callable<V> c) { super(c); }
        BoundedFuture(Runnable t, V r) { super(t, r); }
        protected void done() {
            semaphore.release();
            completionQueue.add(this);
        }
    }

    public BoundedCompletionService(final Executor executor, int permits) {
        this.executor = executor;
        this.semaphore = new Semaphore(permits);
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    public Future<V> poll() {
        return this.completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
        return this.completionQueue.poll(timeout, unit);
    }

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        try {
            BoundedFuture f = new BoundedFuture(task);
            this.semaphore.acquire(); // waits until a permit is free
            this.executor.execute(f);
            return f;
        } catch (InterruptedException e) {
            // nothing was queued; restore the interrupt flag for the caller
            Thread.currentThread().interrupt();
        }
        return null;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        try {
            BoundedFuture f = new BoundedFuture(task, result);
            this.semaphore.acquire(); // waits until a permit is free
            this.executor.execute(f);
            return f;
        } catch (InterruptedException e) {
            // nothing was queued; restore the interrupt flag for the caller
            Thread.currentThread().interrupt();
        }
        return null;
    }

    public Future<V> take() throws InterruptedException {
        return this.completionQueue.take();
    }
}
Now, instead of dealing with ridiculous batching, we can change one line of code in our first parallel example, which will guarantee that any one request uses at most 5 threads at a time, preventing complete starvation and keeping our overall resource usage bounded.
// Shared Thread Pool
private ExecutorService pool = Executors.newFixedThreadPool(50);

// Fourth Attempt, Parallel but Bounded per Request
public List<Output> processParallel2(List<Stuff> request)
        throws InterruptedException, ExecutionException {
    List<Output> response = new ArrayList<Output>(request.size());
    // bound to only use 5 threads concurrently
    CompletionService<Output> cs = new BoundedCompletionService<Output>(this.pool, 5);
    for (final Stuff s : request) {
        // submit() blocks here while 5 of this request's tasks are already in flight
        cs.submit(new Callable<Output>() {
            public Output call() throws Exception {
                return lengthyExternalProcess(s);
            }
        });
    }
    for (int i = 0; i < request.size(); i++) {
        response.add(cs.take().get());
    }
    return response;
}
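If you want to convince yourself that a Semaphore really caps concurrency on a larger shared pool, one way is to record the peak number of tasks running at once. The sketch below is a standalone illustration of the same gating idea, not the BoundedCompletionService class itself. The pool size of 10, the 20 ms sleep, and all names are assumptions made for the demo, and the lambda needs Java 8 or later.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedPeakDemo {
    /** Runs taskCount short tasks on a 10-thread pool, gated by a Semaphore; returns the peak concurrency seen. */
    static int peakConcurrency(int taskCount, int permits) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        final Semaphore semaphore = new Semaphore(permits);
        final AtomicInteger running = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();
        final CountDownLatch finished = new CountDownLatch(taskCount);
        try {
            for (int i = 0; i < taskCount; i++) {
                semaphore.acquire(); // the submitter waits once 'permits' tasks are in flight
                pool.execute(() -> {
                    try {
                        int now = running.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max); // remember the high-water mark
                        Thread.sleep(20); // simulated external call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                        semaphore.release(); // free the permit for the next task
                        finished.countDown();
                    }
                });
            }
            finished.await();
            return peak.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak concurrency with 3 permits: " + peakConcurrency(20, 3));
    }
}
```

Note where the waiting happens: the thread calling acquire() blocks, which in a servlet is the request thread. That is usually fine here, since that thread has nothing else to do until the results come back.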
Yet Another Workable Solution
One thing that's always bugged me about dealing with external services and dependencies is that they aren't always reliable and can time out or hang. I'm sure you're familiar with that situation. With the CompletionService, there's no obvious way to appropriately time out your request processing. I'm open to suggestions on that front, but it did drive me to write another class which essentially meets the same goals as the BoundedCompletionService above without needing the client to loop through the data multiple times as with the CompletionService API.
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelTask<V> implements Future<Collection<V>> {

    // FutureTask to release Semaphore as completed
    private class BoundedFuture extends FutureTask<V> {
        BoundedFuture(Callable<V> c) { super(c); }
        BoundedFuture(Runnable t, V r) { super(t, r); }
        protected void done() {
            completedCount.incrementAndGet();
            semaphore.release();
            completedQueue.add(this);
        }
    }

    private final List<BoundedFuture> submittedQueue;
    private final BlockingQueue<BoundedFuture> completedQueue;
    private final Semaphore semaphore;
    private final Executor executor;
    private final int size;
    // counts finished tasks separately, since get() drains completedQueue
    private final AtomicInteger completedCount = new AtomicInteger();
    // volatile: cancel() may be called from a different thread than get()
    private volatile boolean cancelled = false;

    public ParallelTask(Executor exec, Collection<Callable<V>> callable, int permits) {
        if (exec == null || callable == null) throw new NullPointerException();
        this.executor = exec;
        this.semaphore = new Semaphore(permits);
        this.size = callable.size();
        this.submittedQueue = new ArrayList<BoundedFuture>(size);
        // LinkedBlockingQueue rejects a capacity of 0, so allow for an empty collection
        this.completedQueue = new LinkedBlockingQueue<BoundedFuture>(Math.max(1, size));
        for (Callable<V> c : callable) {
            this.submittedQueue.add(new BoundedFuture(c));
        }
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (this.isDone()) return false;
        this.cancelled = true;
        for (Future<V> f : this.submittedQueue) {
            f.cancel(mayInterruptIfRunning);
        }
        return this.cancelled;
    }

    public Collection<V> get() throws InterruptedException, ExecutionException {
        Collection<V> result = new ArrayList<V>(this.submittedQueue.size());
        boolean done = false;
        try {
            // hand tasks to the executor, at most 'permits' at a time
            for (BoundedFuture f : this.submittedQueue) {
                if (this.isCancelled()) break;
                this.semaphore.acquire();
                this.executor.execute(f);
            }
            // collect results in completion order
            for (int i = 0; i < this.size; i++) {
                if (this.isCancelled()) break;
                result.add(this.completedQueue.take().get());
            }
            done = true;
        } finally {
            if (!done) this.cancel(true);
        }
        return result;
    }

    public Collection<V> get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        // timeout handling isn't perfect, but it's an attempt to
        // replicate the behavior found in AbstractExecutorService
        long nanos = unit.toNanos(timeout);
        long totalTime = System.nanoTime() + nanos;
        boolean done = false;
        Collection<V> result = new ArrayList<V>(this.submittedQueue.size());
        try {
            for (BoundedFuture f : this.submittedQueue) {
                if (System.nanoTime() >= totalTime) throw new TimeoutException();
                if (this.isCancelled()) break;
                this.semaphore.acquire();
                this.executor.execute(f);
            }
            for (int i = 0; i < this.size; i++) {
                if (this.isCancelled()) break;
                long nowTime = System.nanoTime();
                if (nowTime >= totalTime) throw new TimeoutException();
                BoundedFuture f = this.completedQueue.poll(totalTime - nowTime, TimeUnit.NANOSECONDS);
                if (f == null) throw new TimeoutException();
                result.add(f.get());
            }
            done = true;
        } finally {
            if (!done) this.cancel(true);
        }
        return result;
    }

    public boolean isCancelled() {
        return this.cancelled;
    }

    public boolean isDone() {
        return this.completedCount.get() == this.size;
    }
}
By now, you must think that's quite a bit of code, but once it's in your toolkit, you can use a revised example like so:
// Shared Thread Pool
private ExecutorService pool = Executors.newFixedThreadPool(50);

// Using new ParallelTask
public Collection<Output> processParallelTask(List<Stuff> request)
        throws InterruptedException, ExecutionException, TimeoutException {
    Collection<Callable<Output>> tasks = new ArrayList<Callable<Output>>(request.size());
    for (final Stuff s : request) {
        tasks.add(new Callable<Output>() { // no wait
            public Output call() throws Exception {
                return lengthyExternalProcess(s);
            }
        });
    }
    // timeout after 6 seconds if necessary
    return new ParallelTask<Output>(this.pool, tasks, 5).get(6, TimeUnit.SECONDS);
}
I could make things even simpler if ParallelTask were able to just append Callable instances directly instead of taking in a Collection, but that's for further investigation.
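On the earlier question of timing out with a plain CompletionService, one possible approach (a suggestion, not something from the original code) is to compute a deadline up front and call poll(timeout, unit) with the time remaining, instead of take(). The names DeadlineDemo, collectWithin, and run are hypothetical, the sleeping tasks stand in for external calls, and the lambda needs Java 8 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class DeadlineDemo {
    /** Collects every result before one overall deadline, or cancels what is left and throws. */
    static <V> List<V> collectWithin(ExecutorService pool, List<Callable<V>> tasks,
                                     long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        CompletionService<V> cs = new ExecutorCompletionService<V>(pool);
        List<Future<V>> submitted = new ArrayList<Future<V>>(tasks.size());
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        boolean done = false;
        try {
            for (Callable<V> task : tasks) {
                submitted.add(cs.submit(task));
            }
            List<V> results = new ArrayList<V>(tasks.size());
            for (int i = 0; i < tasks.size(); i++) {
                long remaining = deadline - System.nanoTime();
                // poll() returns null if nothing completes within the remaining time
                Future<V> f = cs.poll(Math.max(remaining, 0L), TimeUnit.NANOSECONDS);
                if (f == null) throw new TimeoutException();
                results.add(f.get());
            }
            done = true;
            return results;
        } finally {
            if (!done) {
                for (Future<V> f : submitted) f.cancel(true); // interrupt the stragglers
            }
        }
    }

    /** Demo driver: returns the number of results, or -1 if the deadline passed first. */
    static int run(int count, final long sleepMillis, long timeoutMillis) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(count);
        try {
            List<Callable<String>> tasks = new ArrayList<Callable<String>>();
            for (int i = 0; i < count; i++) {
                final int id = i;
                tasks.add(() -> { Thread.sleep(sleepMillis); return "task-" + id; });
            }
            return collectWithin(pool, tasks, timeoutMillis, TimeUnit.MILLISECONDS).size();
        } catch (TimeoutException e) {
            return -1;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("fast tasks: " + run(3, 10, 2000));
        System.out.println("slow tasks: " + run(3, 2000, 100));
    }
}
```

This sketch doesn't bound concurrency per request, so it isn't a replacement for ParallelTask. It only shows that the poll overload gives you a place to hang a deadline.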
Conclusion
Take a look at your services and your use of legacy systems and operations to see if there are opportunities to parallelize operations for dramatic boosts in speed. With a helper class like BoundedCompletionService or ParallelTask, the concurrency API in Java 5 becomes a bit more comfortable to use in a web-request deployment.
Again, Brian Goetz's book, Java Concurrency in Practice, has some great and practical examples to explore further.