


Jacob Hookom's Blog

Jacob Hookom is a developer with McKesson Medical-Surgical, designing supply management solutions on the web, desktop, and handheld for a multitude of markets. He started consulting at 16 and has since held titles from Information Architect to Product Manager. In his free time, he contributes to Sun's JavaServer Faces RI and GlassFish projects and is an active member of the JavaServer Faces Expert Groups. Recently he started a Java.net project of his own, called 'Facelets', which is a templating framework for JavaServer Faces.



Accelerating Applications with Java 5 Concurrency

Posted by jhook on December 22, 2008 at 11:34 PM

After spending way too much time on ways to speed up web applications, you'll find that your greatest mileage comes from database optimizations, networking, and reducing the bytes downloaded (an example). Time spent optimizing Java code seems insignificant next to the time spent waiting on the request, waiting on the database, waiting on the services, and waiting on the response.

There's a catch; of course there's a catch. Between the bookends of the request and response life cycle, there is sometimes an opportunity to take sequential waits on external calls and run them in parallel with great success.

A Simple Example

// First Attempt, Takes ~5 Seconds 
public List<Output> processSequentially(List<Stuff> request) {
  List<Output> response = new ArrayList<Output>(request.size());
  for (Stuff s : request) {
     Output o = lengthyExternalProcess(s);
     response.add(o);
  }
  return response;
}

// Shared Thread Pool
private ExecutorService pool = Executors.newFixedThreadPool(5);

// Second Attempt, Parallel Takes ~1 Second
public List<Output> processParallel(List<Stuff> request)
    throws InterruptedException, ExecutionException {
  List<Output> response = new ArrayList<Output>(request.size());
  CompletionService<Output> cs = new ExecutorCompletionService<Output>(this.pool);
  for (final Stuff s : request) {
    cs.submit(new Callable<Output>() {
      public Output call() throws Exception {
        return lengthyExternalProcess(s);
      }
    });
  }
  for (int i = 0; i < request.size(); i++) {
    response.add(cs.take().get());
  }
  return response;
}

Taking a Step Back

So we added a few more lines of code and used these types: ExecutorService, Callable, and ExecutorCompletionService, but how did we increase the performance by 5x? First, by using Callable, you're delegating the expensive processing to whoever invokes call(), which is not really any different from writing simple event listeners in Swing or JavaScript; everyone's done that before.

Secondly, we want to run these instances of Callable in parallel by passing them off to a pool of threads. With Java 5, this is blindingly easy: an ExecutorService runs the tasks, and the helpful ExecutorCompletionService hands back each result as its task completes, so the waits overlap instead of adding up, as shown in the example above.
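
To make the two steps concrete, here is a minimal self-contained sketch of the same pattern. The names CallableDemo, slowSquare, and squareAll are hypothetical, and the sleep merely stands in for a slow external call; it is an illustration under those assumptions, not the code behind the timings quoted above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CallableDemo {

  // Hypothetical stand-in for a slow external call: sleeps, then squares its input.
  static Integer slowSquare(int n) throws InterruptedException {
    Thread.sleep(100);
    return Integer.valueOf(n * n);
  }

  // Submits one Callable per input, waits for all of them, and returns the sorted results.
  static List<Integer> squareAll(List<Integer> inputs) {
    ExecutorService pool = Executors.newFixedThreadPool(5);
    try {
      CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(pool);
      for (final Integer n : inputs) {
        cs.submit(new Callable<Integer>() {
          public Integer call() throws Exception {
            return slowSquare(n.intValue()); // runs on a pool thread
          }
        });
      }
      List<Integer> results = new ArrayList<Integer>(inputs.size());
      for (int i = 0; i < inputs.size(); i++) {
        results.add(cs.take().get()); // blocks until the next task finishes
      }
      Collections.sort(results); // completion order is not submission order
      return results;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("task failed", e.getCause());
    } finally {
      pool.shutdown();
    }
  }
}
```

The results are sorted only because a CompletionService returns futures in completion order; if the caller needs them in request order, keeping the Future returned by each submit() in a list is one alternative.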

As I'm not making any assurances of being an expert on concurrency after many trials and errors, I highly recommend clicking around in the Java 5 JavaDocs for the java.util.concurrent package or searching for anything written by Brian Goetz, including his book, Java Concurrency in Practice.

A More Practical Example and Issue

While the simple example works, it's not yet production ready. Let's say the example method processParallel(List<Stuff> request) is running from a Servlet and we have only 5 threads allocated to our ExecutorService. Twenty users of your application all send in different-sized requests of stuff, contending for those 5 threads. What happens when the first user needs all of your threads to process their stuff? Essentially, that user will starve out every other request, and that's bad.

You've made your code parallel, but you've also forced every request coming into the servlet to queue up behind those 5 threads.

How Not to Solve the Issue

The first idea would be to just instantiate a new ExecutorService with 5 threads within each request. While you can do this, it's not the most efficient approach, as it can create out-of-memory problems among other resource contention issues. With nothing bounding the resources your server eats up, you may end up with an overall performance loss.
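
For reference, the per-request approach might look like the sketch below. PerRequestPool and its String-based lengthyExternalProcess are hypothetical stand-ins. The point is that every call pays for up to 5 fresh threads, so N concurrent requests mean up to 5N threads with no overall cap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PerRequestPool {

  // Hypothetical stand-in for the slow external call.
  static String lengthyExternalProcess(String s) {
    return s.toUpperCase();
  }

  // Creates and tears down a private pool on every call.
  static List<String> process(List<String> request) {
    ExecutorService pool = Executors.newFixedThreadPool(5); // up to 5 new threads per request
    try {
      List<Future<String>> futures = new ArrayList<Future<String>>(request.size());
      for (final String s : request) {
        futures.add(pool.submit(new Callable<String>() {
          public String call() {
            return lengthyExternalProcess(s);
          }
        }));
      }
      List<String> response = new ArrayList<String>(request.size());
      for (Future<String> f : futures) {
        response.add(f.get()); // submission order, since we walk our own list
      }
      return response;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("task failed", e.getCause());
    } finally {
      pool.shutdown(); // without this, the pool's idle threads linger after the request
    }
  }
}
```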

More practically, let's say your Servlet is using a pool of about 50 threads to parallelize request processing. You still don't want a single large request to starve out all 50 threads; maybe you only want it to take 5 threads at most, assuming you expect 10 concurrent requests. Keep in mind we want that 5x improvement in performance!

So how would you take a request for 50 instances of stuff for processing, but only use 5 threads at most from the shared pool? One approach is to simply create only 5 Callable instances which loop through 10 instances apiece:

// Shared Thread Pool
private ExecutorService pool = Executors.newFixedThreadPool(50);

// Third Attempt, Batch Processing in Parallel
public List<Output> processInBatches(List<Stuff> request)
    throws InterruptedException, ExecutionException {
  List<Output> response = new ArrayList<Output>(request.size());
  // partition(...) is a hypothetical helper that splits the request into 5 batches
  List<List<Stuff>> batches = partition(request, 5);
  CompletionService<List<Output>> cs = new ExecutorCompletionService<List<Output>>(this.pool);
  for (final List<Stuff> batch : batches) {
    cs.submit(new Callable<List<Output>>() {
      public List<Output> call() throws Exception {
        List<Output> some = new ArrayList<Output>();
        for (Stuff s : batch) {
          some.add(lengthyExternalProcess(s));
        }
        return some;
      }
    });
  }
  for (int i = 0; i < batches.size(); i++) {
    response.addAll(cs.take().get());
  }
  return response;
}
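
The batch-splitting step in processInBatches is left unspecified in the post. One possible way to do it is sketched here; the Batches class and its partition method are hypothetical names, and contiguous, near-equal batches are an assumption about what "split request into 5 batches" was meant to produce.

```java
import java.util.ArrayList;
import java.util.List;

class Batches {

  // Splits items into at most `count` contiguous batches whose sizes differ by at most one.
  static <T> List<List<T>> partition(List<T> items, int count) {
    if (count < 1) throw new IllegalArgumentException("count must be >= 1");
    List<List<T>> batches = new ArrayList<List<T>>(count);
    int size = items.size();
    int base = size / count;   // minimum batch size
    int extra = size % count;  // the first `extra` batches get one more item
    int from = 0;
    for (int i = 0; i < count && from < size; i++) {
      int to = from + base + (i < extra ? 1 : 0);
      // copy so each batch is independent of the caller's list
      batches.add(new ArrayList<T>(items.subList(from, to)));
      from = to;
    }
    return batches;
  }
}
```

With 50 items and a count of 5 this yields five batches of 10; with fewer items than batches it yields one single-item batch per item rather than padding with empty lists.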

While we've guaranteed that a given request uses at most 5 threads out of the 50 allocated to the servlet, the work is still processed sequentially within each batch, so one slow item delays everything queued behind it in that batch.

Lastly, one might attempt to instantiate an ExecutorService with a BlockingQueue to only allow some number of processes to execute in parallel, but this doesn't behave as expected. Others have covered this issue further as the Java 5 concurrency APIs can be deceptively difficult to navigate at times when you combine shared use within a web container.
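
One reading of that surprise, offered as an assumption rather than as the exact scenario the author hit: a ThreadPoolExecutor built with a bounded queue does not make the submitter wait when the queue fills up. Under its default rejection policy it throws RejectedExecutionException instead. The class and method names below are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedQueueDemo {

  // Returns how many of `attempts` blocked tasks the pool accepted before rejecting one.
  static int acceptedBeforeRejection(int threads, int queueCapacity, int attempts) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        threads, threads, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(queueCapacity));
    final CountDownLatch release = new CountDownLatch(1);
    int accepted = 0;
    try {
      for (int i = 0; i < attempts; i++) {
        pool.execute(new Runnable() {
          public void run() {
            try {
              release.await(); // hold the worker so the pool stays saturated
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          }
        });
        accepted++;
      }
    } catch (RejectedExecutionException e) {
      // default AbortPolicy: a full queue rejects instead of making the caller wait
    } finally {
      release.countDown(); // let the held tasks finish
      pool.shutdown();
    }
    return accepted;
  }
}
```

With 2 threads and a queue of 3, the sixth submission is rejected: two tasks occupy the workers, three sit in the queue, and nothing makes the caller block.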

I needed something much simpler that didn't upheave much of the foundation of the concurrency API.

A Workable Solution

Instead of batching, since we saw linear improvement by parallelizing our process, you'd like to queue each process individually for the greatest granularity and efficiency. At the same time, you don't want 50 individual tasks to starve out the shared thread pool.

In the first example, I used a CompletionService, which helps to manage a unit of work. You can queue up a series of tasks and then wait for it to release the completed results as they become available, in a non-sequential manner; perfect! What if you implemented your own CompletionService with a simple Semaphore to guarantee that only 5 tasks are ever processed at a time by threads in the shared pool?
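
Before wiring a Semaphore into a CompletionService, the gating idea can be seen in isolation. This is a minimal sketch with hypothetical names (SemaphoreBoundDemo, peakConcurrency): the submitting thread acquires a permit before handing each task to a much larger pool, and each task releases its permit when it finishes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreBoundDemo {

  // Runs `tasks` short jobs on the pool, gated by `permits`; returns the peak concurrency observed.
  static int peakConcurrency(int poolSize, int permits, int tasks) {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    final Semaphore semaphore = new Semaphore(permits);
    final AtomicInteger running = new AtomicInteger();
    final AtomicInteger peak = new AtomicInteger();
    final CountDownLatch finished = new CountDownLatch(tasks);
    try {
      for (int i = 0; i < tasks; i++) {
        semaphore.acquire(); // the submitter waits here once `permits` tasks are in flight
        pool.execute(new Runnable() {
          public void run() {
            try {
              int now = running.incrementAndGet();
              int p = peak.get();
              while (now > p && !peak.compareAndSet(p, now)) {
                p = peak.get(); // retry until the recorded peak is at least `now`
              }
              Thread.sleep(20); // stand-in for the slow external call
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            } finally {
              running.decrementAndGet();
              semaphore.release(); // free the slot for the next task
              finished.countDown();
            }
          }
        });
      }
      finished.await(); // wait for every task to complete
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      pool.shutdown();
    }
    return peak.get();
  }
}
```

Calling peakConcurrency(50, 5, 20) should never report more than 5, even though the pool has 50 threads available.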

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Exactly like ExecutorCompletionService, except uses a
 * Semaphore to only permit X tasks to run concurrently
 * on the passed Executor
 */
public class BoundedCompletionService<V> implements CompletionService<V> {
	
  private final Semaphore semaphore;
  private final Executor executor;
  private final BlockingQueue<Future<V>> completionQueue;

  // FutureTask that releases its permit and queues itself when it completes
  private class BoundedFuture extends FutureTask<V> {
    BoundedFuture(Callable<V> c) { super(c); }
    BoundedFuture(Runnable t, V r) { super(t, r); }
    protected void done() {
      semaphore.release();
      completionQueue.add(this);
    }
  }

  public BoundedCompletionService(final Executor executor, int permits) {
    this.executor = executor;
    this.semaphore = new Semaphore(permits);
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
  }

  public Future<V> poll() {
    return this.completionQueue.poll();
  }

  public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
    return this.completionQueue.poll(timeout, unit);
  }

  public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    try {
      BoundedFuture f = new BoundedFuture(task);
      this.semaphore.acquire(); // waits until one of the permits is free
      this.executor.execute(f);
      return f;
    } catch (InterruptedException e) {
      // restore the interrupt flag so the caller's next blocking call sees it
      Thread.currentThread().interrupt();
    }
    return null; // interrupted before the task could be submitted
  }

  public Future<V> submit(Runnable task, V result) {
    if (task == null) throw new NullPointerException();
    try {
      BoundedFuture f = new BoundedFuture(task, result);
      this.semaphore.acquire(); // waits until one of the permits is free
      this.executor.execute(f);
      return f;
    } catch (InterruptedException e) {
      // restore the interrupt flag so the caller's next blocking call sees it
      Thread.currentThread().interrupt();
    }
    return null; // interrupted before the task could be submitted
  }

  public Future<V> take() throws InterruptedException {
    return this.completionQueue.take();
  }
}

Now, instead of dealing with ridiculous batching, we can change one line of code in our earlier parallel example to guarantee that any one request uses at most 5 threads at a time, preventing complete starvation and keeping our overall resource usage bounded.

// Shared Thread Pool
private ExecutorService pool = Executors.newFixedThreadPool(50);

// Parallel Again, Bounded to 5 Threads per Request
public List<Output> processParallel2(List<Stuff> request)
    throws InterruptedException, ExecutionException {
  List<Output> response = new ArrayList<Output>(request.size());
  // bound to only use 5 threads concurrently
  CompletionService<Output> cs = new BoundedCompletionService<Output>(this.pool, 5);
  for (final Stuff s : request) {
    cs.submit(new Callable<Output>() {
      public Output call() throws Exception {
        return lengthyExternalProcess(s);
      }
    });
  }
  for (int i = 0; i < request.size(); i++) {
    response.add(cs.take().get());
  }
  return response;
}

Yet Another Workable Solution

One thing that's always bugged me about dealing with external services and dependencies is that they aren't always reliable and can time out or hang. I'm sure you're familiar with that situation. With the CompletionService, there's no obvious way to put an appropriate timeout on your request processing. I'm open to suggestions on that front, but it did drive me to write another class which essentially meets the same goals as the BoundedCompletionService above without needing the client to loop through the data multiple times as with the CompletionService API.
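
One possible answer to that open question, sketched under assumptions: CompletionService does offer poll(timeout, unit), so a caller can track a single deadline and poll with whatever time remains. DeadlineCollector, collect, and completesWithin are hypothetical names; this is an alternative to consider, not the approach the class below takes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class DeadlineCollector {

  // Runs all tasks and gathers their results, giving up once the overall timeout elapses.
  static <V> List<V> collect(ExecutorService pool, List<Callable<V>> tasks,
                             long timeout, TimeUnit unit)
      throws InterruptedException, ExecutionException, TimeoutException {
    long deadline = System.nanoTime() + unit.toNanos(timeout);
    CompletionService<V> cs = new ExecutorCompletionService<V>(pool);
    List<Future<V>> submitted = new ArrayList<Future<V>>(tasks.size());
    List<V> results = new ArrayList<V>(tasks.size());
    boolean done = false;
    try {
      for (Callable<V> task : tasks) {
        submitted.add(cs.submit(task));
      }
      for (int i = 0; i < tasks.size(); i++) {
        long remaining = deadline - System.nanoTime();
        // poll returns null if nothing completes within the remaining time
        Future<V> f = cs.poll(remaining, TimeUnit.NANOSECONDS);
        if (f == null) throw new TimeoutException();
        results.add(f.get());
      }
      done = true;
      return results;
    } finally {
      if (!done) {
        for (Future<V> f : submitted) f.cancel(true); // interrupt the stragglers
      }
    }
  }

  // Self-check helper: runs three tasks that each sleep for taskMillis.
  static boolean completesWithin(final long taskMillis, long timeoutMillis) {
    ExecutorService pool = Executors.newFixedThreadPool(3);
    List<Callable<String>> tasks = new ArrayList<Callable<String>>();
    for (int i = 0; i < 3; i++) {
      tasks.add(new Callable<String>() {
        public String call() throws Exception {
          Thread.sleep(taskMillis);
          return "ok";
        }
      });
    }
    try {
      collect(pool, tasks, timeoutMillis, TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException e) {
      return false;
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdownNow();
    }
  }
}
```

This bounds the total wait but does nothing to limit how many pool threads one request occupies, so it complements the Semaphore idea rather than replacing it.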

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ParallelTask<V> implements Future<Collection<V>> {

  // FutureTask to release Semaphore as completed
  private class BoundedFuture extends FutureTask<V> {
    BoundedFuture(Callable<V> c) { super(c); }
    BoundedFuture(Runnable t, V r) { super(t, r); }
    protected void done() {
      semaphore.release();
      completedQueue.add(this);
    }
  }

  private final List<BoundedFuture> submittedQueue;
  private final BlockingQueue<BoundedFuture> completedQueue;
  private final Semaphore semaphore;
  private final Executor executor;
  private final int size;
  // volatile: cancel() may be called from a different thread than the one blocked in get()
  private volatile boolean cancelled = false;

  public ParallelTask(Executor exec, Collection<Callable<V>> callable, int permits) {
    if (exec == null || callable == null) throw new NullPointerException();
		
    this.executor = exec;
    this.semaphore = new Semaphore(permits);
    this.size = callable.size();
    this.submittedQueue = new ArrayList<BoundedFuture>(size);
    // LinkedBlockingQueue rejects a capacity of 0, so allow for an empty task collection
    this.completedQueue = new LinkedBlockingQueue<BoundedFuture>(Math.max(1, size));
    for (Callable<V> c : callable) {
      this.submittedQueue.add(new BoundedFuture(c));
    }
  }

  public boolean cancel(boolean mayInterruptIfRunning) {
    if (this.isDone()) return false;
    this.cancelled = true;
    for (Future<V> f : this.submittedQueue) {
      f.cancel(mayInterruptIfRunning);
    }
    return this.cancelled;
  }

  public Collection<V> get() throws InterruptedException, ExecutionException {
    Collection<V> result = new ArrayList<V>(this.submittedQueue.size());
    boolean done = false;
    try {
      for (BoundedFuture f : this.submittedQueue) {
        if (this.isCancelled()) break;
        this.semaphore.acquire();
        this.executor.execute(f);
      }
      for (int i = 0; i < this.size; i++) {
        if (this.isCancelled()) break;
        result.add(this.completedQueue.take().get());
      }
      done = true;
    } finally {
      if (!done) this.cancel(true);
    }
    return result;
  }

  public Collection<V> get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    // timeout handling isn't perfect, but it's an attempt to 
    // replicate the behavior found in AbstractExecutorService
    long nanos = unit.toNanos(timeout);
    long totalTime = System.nanoTime() + nanos;
    boolean done = false;
    Collection<V> result = new ArrayList<V>(this.submittedQueue.size());
    try {
      for (BoundedFuture f : this.submittedQueue) {
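        long remaining = totalTime - System.nanoTime();
        if (remaining <= 0) throw new TimeoutException();
        if (this.isCancelled()) break;
        // wait for a permit only until the deadline rather than indefinitely
        if (!this.semaphore.tryAcquire(remaining, TimeUnit.NANOSECONDS)) throw new TimeoutException();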
        this.executor.execute(f);
      }
      for (int i = 0; i < this.size; i++) {
        if (this.isCancelled()) break;
        long nowTime = System.nanoTime();
        if (nowTime >= totalTime) throw new TimeoutException();
        BoundedFuture f = this.completedQueue.poll(totalTime - nowTime, TimeUnit.NANOSECONDS);
        if (f == null) throw new TimeoutException();
        result.add(f.get());
      }
      done = true;
    } finally {
      if (!done) this.cancel(true);
    }
    return result;
  }

  public boolean isCancelled() {
    return this.cancelled;
  }

  public boolean isDone() {
    return this.completedQueue.size() == this.size;
  }
}

By now, you must think that's quite a bit of code, but once it's in your toolkit, you can use it in a revised example like so:

// Shared Thread Pool
private ExecutorService pool = Executors.newFixedThreadPool(50);

// Using new ParallelTask
public List<Output> processParallelTask(List<Stuff> request)
    throws InterruptedException, ExecutionException, TimeoutException {
  Collection<Callable<Output>> tasks = new ArrayList<Callable<Output>>(request.size());
  for (final Stuff s : request) {
    tasks.add(new Callable<Output>() { // no wait
      public Output call() throws Exception {
        return lengthyExternalProcess(s);
      }
    });
  }
  // timeout after 6 seconds if necessary; get() returns a Collection, so copy it into a List
  return new ArrayList<Output>(
      new ParallelTask<Output>(this.pool, tasks, 5).get(6, TimeUnit.SECONDS));
}

I could make things even simpler if ParallelTask were able to just append Callable instances directly instead of taking in a Collection, but that's for further investigation.

Conclusion

Take a look at your services and your use of legacy systems and operations to see if there are opportunities to run operations in parallel for dramatic boosts in speed. With a helper class like BoundedCompletionService or ParallelTask, the concurrency API in Java 5 becomes a bit more comfortable to use in a web-request deployment.

Again, Brian Goetz's book, Java Concurrency in Practice, has some great and practical examples to explore further.


