Design for exploiting parallelism

Posted by fabriziogiudici on November 21, 2006 at 2:49 PM PST

Ok, I think I've spent enough time on preliminaries, so this time I'm gonna show you some UML diagrams and code. I also have to introduce you to Emmanuele Sordini, one of my best friends and co-author of the Mistral project. Emmanuele is an engineer like me (but he's more on the C++ side) and an amateur photographer like me (but he's more on the astronomical photography side), and some months ago he told me about some heavy CPU processing that he sometimes has to apply to his photos. You can read the full details here; I won't repeat them. Let me just sum up: Emmanuele's problem is to run a batch process over a set of more than 1,000 different images of the same subject, aligning them and computing an average.

The overall algorithm is described by this flow chart:

[Image: 1261_Image_Stacking.png - flow chart of the image stacking algorithm]

For simplicity (and also because there's no working code for them yet), I'll skip the parts in dark gray. The idea is that one photo is taken out of the set and chosen as a "reference". Then all the other photos are fed into a math algorithm that computes their misalignment with respect to the reference; they get shifted to counter the misalignment and are finally summed together. Emmanuele explained to me that the software applications most commonly used by the amateur astrophotography community usually take around one hour on a standard desktop to process a set of 1,000 images.

When Emmanuele told me about this problem, I was thinking about doing some experiments with parallel architectures, exercising the idea of the "simple approach"; I realized that Emmanuele's problem was the perfect scenario to play with. In July we wrote a quick set of specs:

  1. implement the image stacking in Java;
  2. make it able to work in a standard context (a single desktop) as well as on a small, home-built cluster of PCs, and even on facilities such as the Sun Grid;
  3. have it integrated with a standard desktop application for managing photos.

And so the Pleiades project was born - Pleiades is the implementation of the above algorithm, built upon the parallelizing engine of Mistral.

Deciding the granularity

For parallel computing there is a wide range of different approaches: parallelizing the code at a very fine grain (e.g. having the iterations of a for loop take advantage of multiple threads), partitioning data and feeding the parts to different threads (e.g. splitting an image into tiles and having each one processed at the same time), and so on. Usually this is not a matter of choice: specific problems are best approached in one way rather than another. But sometimes you can choose, and the case of a large set of images is probably one of them. So I'll anticipate that the architecture we designed aims at exploiting parallelism at a medium-sized grain, for the following reasons:

  1. We're also considering distributed processing on a network, and we have to deal with network latency. It's well known that network latency doesn't match well with fine-grained operations.
  2. Parallelizing at a fine grain is harder, since it requires mastering the algorithm and adapting it - often making it more complex than the original. For instance, consider applying a high-pass filter to an image: this means computing a convolution between two matrices, the kernel of the filter and the image itself. You can split the image into tiles and work on many tiles at the same time, but you have to do some special processing at tile boundaries, where the kernel convolution would overlap adjacent tiles.
  3. We want to create a parallelizing architecture on top of an existing imaging library, without changing it. This clearly excludes the option of optimizing the algorithm for a fine grain.

Parallel decomposition

Our medium-sized grain is precisely at the level of the blocks in the previous flow chart. So, the first thing to do is to study the flow and find out where we can introduce parallel sub-flows. The following picture shows a first approach (I kept the same block colors as in the previous diagram):

[Image: 1262_Logical_View.png - logical view of the parallel decomposition]

Now, while the "Register" operation has a single input and a single output (this means that there's no more parallelism exploitable at this level), the "Add" works on multiple operands. It can be further decomposed as follows:

[Image: 1263_Logical_View_2.png - logical view with the "Add" decomposed into a binary tree]

where the single multiple-operand "Add" has been replaced by a tree of binary operations. At this point, our parallel decomposition is finished. It's time to design the code.
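
To make the tree decomposition concrete, here is a minimal sketch of pairwise reduction (plain Java, not Mistral code; integers stand in for images and + stands in for the "Add" operation):

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

public class PairwiseReduction
  {
    // Repeatedly combines pairs of operands until a single result is left.
    // Each "level" of pairs is independent, so it could run in parallel.
    static int reduce (Queue<Integer> queue)
      {
        while (queue.size() > 1)
          {
            int a = queue.remove();
            int b = queue.remove();
            queue.add(a + b); // one binary "Add" node of the tree
          }

        return queue.remove();
      }

    public static void main (String[] args)
      {
        Queue<Integer> operands = new ArrayDeque<Integer>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
        System.out.println(reduce(operands)); // prints 36
      }
  }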

The Master-Worker pattern

Mistral is based on the Master-Worker pattern. It's a very simple and common pattern for parallel computing, and consists of encapsulating a parallelizable, independent unit of work in a single class, named "Task". This means that we will deal with multiple instances of this class that will be scheduled in parallel, each one taken by a different "Worker". Now, if we have a lot of Tasks - say 1,000 - it's likely we won't be able to run all of them at the same time, because usually we don't have that many Workers - say 10. So the idea is to create a queue of Tasks that are dispatched to Workers (the dispatcher is named "Master"). Tasks will be extracted out of the queue and consumed by Workers. The more Workers you have, the shorter the computing time; but this approach still works, even though slowly, on a system with a single Worker.
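
Just to fix ideas before diving into Mistral's design, here is a bare-bones, self-contained sketch of the pattern (this is not Mistral code, just an illustration built on java.util.concurrent; the Task interface and the worker count are made up for the example):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class MasterWorkerSketch
  {
    interface Task
      {
        public void run();
      }

    public static void main (String[] args)
      throws InterruptedException
      {
        final BlockingQueue<Task> queue = new LinkedBlockingQueue<Task>();

        // The Workers: each thread loops forever, extracting the next Task
        // from the shared queue and consuming it.
        for (int i = 0; i < 10; i++)
          {
            Thread worker = new Thread(() ->
              {
                try
                  {
                    for (;;)
                      {
                        queue.take().run();
                      }
                  }
                catch (InterruptedException e)
                  {
                    Thread.currentThread().interrupt();
                  }
              });
            worker.setDaemon(true);
            worker.start();
          }

        // The Master: enqueues 1,000 independent Tasks for the 10 Workers.
        for (int i = 0; i < 1000; i++)
          {
            final int n = i;
            queue.add(() -> System.out.println("Processing task #" + n));
          }

        // In real code the Master would now wait for all results; this sketch
        // just gives the daemon Workers a moment to drain the queue.
        Thread.sleep(1000);
      }
  }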

There are some more details to consider now. It's not always true that all the Tasks can be consumed as quickly as possible: for instance, there could be dependencies between them. A dependency between Tasks means that the result of one Task is later fed into another Task, and the latter obviously can't be scheduled in parallel with the former. Looking back at our previous diagram, we see that the "Add" tasks can't be scheduled until the "Register" tasks have been completed. This is called "multi-phase" scheduling, and our complete algorithm - see again the previous diagram - is made of 3 + log2(n) phases, where n is the number of images in the batch (for n = 1,000 images, that's roughly 13 phases).

At this point of our discussion, we could imagine a Master which first schedules all the 1,000 "Register" tasks, then the "Add" tasks (from now on, for the sake of simplicity, I'll only consider these two kinds of task). But why wait for all the "Register" tasks to be completed? Indeed, an "Add" can be scheduled as soon as a pair of results from "Register" is available. So here is another diagram that shows this approach:

[Image: 1264_Implementation_View.png - implementation view with the "Register" and "Add" task queues]

There's a queue of "Register" Tasks, and each one is processed by a Worker (imagine that processed tasks move left-to-right in the queue). As soon as two results are available, a new "Add" task is created and placed in its own queue. Also in this second queue (still imagine processed tasks at the right side), as soon as two results are available, a new "Add" is created and put back in the queue. Our job is completed when we have just a single "Add" task in the queue: this is the final result.

This sounds a bit more complex than first scheduling all the "Register" tasks and then all the "Add" tasks. Is it worthwhile? Depending on the context, it is. Consider that each task works on an image, which consumes a lot of memory. Having 1,000 completed tasks means having 1,000 images in memory, most of them just sitting idle, waiting for a Worker to process them (of course, we could imagine swapping them to disk, but that operation is expensive too). On the contrary, with the queue approach of the latest diagram there are no idle images in memory: as soon as a pair is available, they get fed into another Task and consumed. This is much more efficient. Lesson learned: with a parallel design, don't be conditioned by sequential reasoning. Think asynchronously instead.

Now we are ready to sketch some pseudo-code:

reference = createReference();

for each image
  {
    schedule new RegisterTask(image, reference);
  }

when (at least 2 RegisterTasks completed)
  {
    image1 = registerTask1.getResult();
    image2 = registerTask2.getResult();
    schedule new AddTask(image1, image2);
  }

when (at least 2 AddTasks completed)
  {
    image1 = addTask1.getResult();
    image2 = addTask2.getResult();
    schedule new AddTask(image1, image2);
  }

// Detects last phase
when (no more pending tasks && only 1 completed AddTask has not been processed)
  {
    sumImage = addTask.getResult();
    normalize(sumImage);
    return;
  }

The ImageProcessor of Mistral

Now, let's have a look at the design of the Mistral image processor:

[Image: Mistral Simplified Design.png - simplified UML design of the Mistral image processor]

Classes in red are provided by the framework; classes in blue are implemented by the programmer and contain the specific algorithm (Pleiades in our case); classes in green are polymorphic implementations of the processor (a polymorphic Master-Worker lets us provide alternate scheduling strategies - one of our aims, since our targets are multi-core computers, a local cluster or the Sun Grid).
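
In code, that polymorphism simply means the client depends on the abstract type only; the grid-oriented class name below is hypothetical, just to show the intent:

    // The client sees only the abstract ImagingTaskProcessor; the concrete
    // class chosen at startup decides where the ImagingTasks actually run.
    ImagingTaskProcessor processor = new LocalImagingTaskProcessor();
    // ... or, hypothetically, something like:
    // ImagingTaskProcessor processor = new GridImagingTaskProcessor(gridEndpoint);
    // The rest of the application code stays exactly the same.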

In particular, the "Register" operation will be encapsulated into a RegisterTask class, and the "Add" operation into an AddTask. Both are subclasses of ImagingTask, whose relevant code is the following:

package it.tidalwave.image.processor;

import java.io.Serializable;
import java.util.logging.Logger;

import it.tidalwave.image.EditableImage;

abstract public class ImagingTask implements Serializable
  {
    private static final Logger logger = Logger.getLogger(ImagingTask.class.getName());

    protected String name;         // human-readable task name, used in logs
    protected Throwable throwable; // holds the failure cause, if run() threw

    /***************************************************************************
     *
     * Concrete implementations should provide the task core in this method.
     * This method is performed in a distributed context, if available.
     *
     **************************************************************************/
    abstract protected void run()
      throws Exception;

    /***************************************************************************
     *
     * Returns the result of this task. Must be implemented by subclasses.
     *
     * @return  the result
     *
     **************************************************************************/
    abstract public EditableImage getResult();

    /***************************************************************************
     *
     * Returns the name of this task.
     *
     **************************************************************************/
    public String getName()
      {
        return name;
      }

    /***************************************************************************
     *
     * Executes the task. This method is only called by the framework.
     *
     **************************************************************************/
    public final void execute()
      {
        try
          {
            logger.info("Starting " + name);
            long time = System.currentTimeMillis();

            try
              {
                run();
              }
            finally
              {
                // ... collect statistics (e.g. the elapsed time since 'time') ...
              }
          }
        catch (Throwable e)
          {
            throwable = e;
          }
      }
  }
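
To give an idea of what a concrete subclass looks like, here is a sketch of AddTask; the real Pleiades code may differ, and the pixel-summing helper ImageUtils.add() is hypothetical:

public class AddTask extends ImagingTask
  {
    private final EditableImage image1;
    private final EditableImage image2;
    private EditableImage result;

    public AddTask (EditableImage image1, EditableImage image2)
      {
        this.image1 = image1;
        this.image2 = image2;
      }

    @Override
    protected void run()
      throws Exception
      {
        result = ImageUtils.add(image1, image2); // hypothetical pixel-by-pixel sum
      }

    @Override
    public EditableImage getResult()
      {
        return result;
      }
  }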

The ImagingTaskProcessorListener, as you would expect, is a listener for events in the ImagingTaskProcessor, most notably events coming from completed Tasks. While it can be useful for mere notification (imagine a GUI showing the progress of the computation), it's fundamentally used for controlling the multi-phase scheduling. This is a sketch from the real code of Pleiades:

    private ImagingTaskProcessorListener listener = new ImagingTaskProcessorAdapter() 
      {
        @Override
        public void notifyTaskCompleted (ImagingTaskProcessorEvent event)
          {
            ImagingTask task = event.getTask();
            int registerTodoCount = imagingTaskProcessor.getPendingTaskCount(RegisterTask.class) +
                                    imagingTaskProcessor.getRunningTaskCount(RegisterTask.class);
            int addTodoCount = imagingTaskProcessor.getPendingTaskCount(AddTask.class) +
                               imagingTaskProcessor.getRunningTaskCount(AddTask.class);
            int registerCompletedCount = imagingTaskProcessor.getCompletedTaskCount(RegisterTask.class);
            int addCompletedCount = imagingTaskProcessor.getCompletedTaskCount(AddTask.class);
           
            for (int i = 0; i < registerCompletedCount / 2; i++)
              {
                scheduleAdd(imagingTaskProcessor.popCompletedTask(RegisterTask.class),
                            imagingTaskProcessor.popCompletedTask(RegisterTask.class));
              }

            for (int i = 0; i < addCompletedCount / 2; i++)
              {
                scheduleAdd(imagingTaskProcessor.popCompletedTask(AddTask.class),
                            imagingTaskProcessor.popCompletedTask(AddTask.class));
              }
            //
            // Detects last phase
            //
            if ((registerTodoCount == 0) && (addTodoCount == 0) && (addCompletedCount == 1))
              {
                //
                // Special case: if an odd number of images was in the batch, a single
                // RegisterTask stays in the queue as it cannot be added with another.
                // Perform a final step adding it with the final AddTask.
                //
                if (registerCompletedCount == 1)
                  {
                    scheduleAdd(imagingTaskProcessor.popCompletedTask(RegisterTask.class), 
                                imagingTaskProcessor.popCompletedTask(AddTask.class));
                  }

                else if (registerCompletedCount == 0)
                  {
                    task = imagingTaskProcessor.popCompletedTask(AddTask.class);

                    synchronized (RegisterCrossCorrelation.this)
                      {
                        sum = task.getResult();
                        RegisterCrossCorrelation.this.notify();
                      }
                   
                    imagingTaskProcessor.getStatistics().dump();
                  }
              }
          }

        private void scheduleAdd (ImagingTask task1, ImagingTask task2)
          {
            logger.info("Now adding " + task1.getName() + " and " + task2.getName());
            AddTask addTask = new AddTask(task1.getResult(), task2.getResult());
            imagingTaskProcessor.postWithPriority(addTask);
          }
      };

I think the code is easy to read; in any case, here's a quick glossary:

  • "pending tasks" are those enqueued but not scheduled yet;
  • "running tasks" are those currently being executed;
  • "completed tasks" are those whose execution is over;
  • imagingTaskProcessor.post(task) adds another ImagingTask to the queue;
  • imagingTaskProcessor.popCompletedTask(taskClass) pops a task out of the queue of completed tasks.
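
Putting the pieces together, the driver that kicks off the whole computation looks roughly like the following (a sketch based on the pseudo-code above: the stack() method, the createReference() helper and the addListener() registration are assumptions, not necessarily the real Pleiades API):

    // Sketch: inside RegisterCrossCorrelation, the class owning the listener above.
    public EditableImage stack (List<EditableImage> images)
      throws InterruptedException
      {
        EditableImage reference = createReference();  // hypothetical helper
        imagingTaskProcessor.addListener(listener);   // assumed registration method

        for (EditableImage image : images)
          {
            imagingTaskProcessor.post(new RegisterTask(image, reference));
          }

        synchronized (this)
          {
            while (sum == null)  // 'sum' is set by the listener on the final AddTask
              {
                wait();          // released by the listener's notify()
              }
          }

        return sum;
      }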

Default implementation, local processing

The default implementation of the ImagingTaskProcessor is LocalImagingTaskProcessor, whose code is:

package it.tidalwave.image.processor;

import java.util.logging.Logger;

public class LocalImagingTaskProcessor extends ImagingTaskProcessor
  {
    private static final String CLASS = LocalImagingTaskProcessor.class.getName();

    private static final Logger logger = Logger.getLogger(CLASS);
   
    class PoolThread extends Thread
      {
        public PoolThread (String name)
          {
            super(name); 
            setDaemon(true);
          }
       
        @Override
        public void run()
          {
            for (;;) // TODO: add a smart way to terminate
              {
                changeFreeWorkerCount(+1);
                ImagingTask task = getNextTask();
                changeFreeWorkerCount(-1);
                logger.fine(Thread.currentThread().getName() + " assigned to " + task.getName());
               
                try
                  {
                    task.prepare(LocalImagingTaskProcessor.this);
                    task.execute();
                  }
                catch (Throwable t) // prevents task errors from disrupting the worker
                  {
                    logger.throwing(CLASS, "run()", t); 
                  }
               
                notifyTaskCompleted(task);
              }
          }
      }
   
    public int getWorkerCount()
      {
        return Math.min(Runtime.getRuntime().availableProcessors(), maxWorkers);
      }
   
    public LocalImagingTaskProcessor()
      {
        for (int i = 0; i < getWorkerCount(); i++)
          {
            new PoolThread("PoolThread #" + i).start();  
          }
      }
  }
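
The base class ImagingTaskProcessor is not shown in this post; just to clarify the contract used by PoolThread, here is a hypothetical sketch of its queue side, built on java.util.concurrent (the real Mistral code may be different):

    // Hypothetical sketch of the pending-task queue inside ImagingTaskProcessor.
    private final BlockingQueue<ImagingTask> pendingTasks =
            new LinkedBlockingQueue<ImagingTask>();

    public void post (ImagingTask task)
      {
        pendingTasks.add(task); // enqueues the task for the next free Worker
      }

    protected ImagingTask getNextTask()
      {
        try
          {
            return pendingTasks.take(); // blocks the PoolThread until work is available
          }
        catch (InterruptedException e)
          {
            throw new RuntimeException(e);
          }
      }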

The implementation is based on a pool of threads whose size equals the number of processors in the system (via Runtime.getRuntime().availableProcessors(), capped by the maxWorkers setting). This means that on a classic, single-core computer all the tasks are executed sequentially, on my MacBook Pro two tasks are executed at the same time, on a quad-core computer such as a Mac Pro four tasks run together, and so on. This code is able to exploit any number of processors that computer manufacturers will deliver in the coming years.

I think that's enough for today. I would like to stress that the abstraction introduced by the ImagingTaskProcessor decouples the code from the underlying implementation. In other words, we don't know where ImagingTasks are scheduled, and this is a powerful aspect of this design. In the next installment I'll introduce alternate implementations which dispatch ImagingTasks to multiple nodes in a network. While the core design will stay the same, some more complexity will be introduced to address two common fallacies of distributed computing: "latency is zero" and "bandwidth is infinite". In other words, moving ImagingTasks around the network has a cost that must be controlled and minimized, and different cluster or grid architectures will require slightly different approaches. We will also see that Mistral has some simple but effective support for benchmarking, a fundamental good practice for optimizing performance.

See you next time.
