Fabrizio Giudici's Blog

Design for exploiting parallelism

Posted by fabriziogiudici on November 21, 2006 at 02:49 PM

Ok, I think I've spent enough time on preliminaries, so this time I'm gonna show you some UML diagrams and code. I also have to introduce you to Emmanuele Sordini, one of my best friends and co-author of the Mistral project. Emmanuele is an engineer like me (but he's more on the C++ side) and an amateur photographer like me (but he's more into astronomical photography), and some months ago he told me about some heavy CPU processing that he sometimes has to apply to his photos. You can learn the whole details here; I won't repeat them. Let me only sum up that Emmanuele's problem is to run a batch process over a set of more than 1,000 different images of the same subject, aligning them and computing an average. The overall algorithm is described by this flow chart:
For simplicity (and also because there's no working code for them yet), I'll skip the parts in dark gray. The idea is that one photo is taken out of the set and chosen as a "reference". All the other photos are then fed into a math algorithm that computes their misalignment with respect to the reference; they are shifted to counter the misalignment and finally summed together. Emmanuele explained to me that the software applications most used by the amateur astrophotography community usually take around one hour on a standard desktop to process a set of 1,000 images.

When Emmanuele told me about this problem, I was thinking about doing some experiments with parallel architectures, exercising the idea of the "simple approach"; I realized that Emmanuele's problem was the perfect scenario to play with. In July we wrote a quick set of specs:
And so the Pleiades project was born - Pleiades is the implementation of the above algorithm, built upon the parallelizing engine of Mistral.

Deciding the granularity

For parallel computing there is a huge range of different approaches: parallelizing the code at a very fine grain (e.g. having a for loop take advantage of threads), partitioning data and feeding the parts to different threads (e.g. splitting an image into tiles and processing them all at the same time), and so on. Usually this is not a matter of choice: specific problems are best approached in one way rather than another. But sometimes you can choose, and a large set of images is probably one of these cases. So I anticipate that the architecture we designed is aimed at exploiting parallelism at a medium-sized grain, for the following reasons:
Parallel decomposition

Our medium-sized grain is precisely at the level of the blocks in the previous flow chart. So, the first thing to do is to study the flow and find out where we can introduce parallel sub-flows. The following picture shows a first approach (I kept the same block colors as in the previous diagram):
where the single multiple-operand "Add" has been replaced by a tree of binary operations. At this point, our parallel decomposition is finished. It's time to design the code.

The Master-Worker pattern

Mistral is based on the Master-Worker pattern. It's a very simple and common pattern for parallel computing, and consists of encapsulating a parallelized, independent unit of work in a single class, named "Task". This means that we will deal with multiple instances of this class that will be scheduled in parallel, each one taken by a different "Worker". Now, if we have a lot of Tasks - say 1,000 - it's likely we won't be able to run all of them at the same time, because usually we don't have so many Workers - say 10. So the idea is to create a queue of Tasks that are dispatched to Workers (the dispatcher is named "Master"). Tasks will be extracted from the queue and consumed by Workers. The more Workers you have, the shorter the computing time; but this approach also works, albeit slowly, on a system with a single Worker.

There are some more details to consider now. Saying that all the Tasks can be consumed as quickly as possible is not always true: for instance, there could be dependencies between them. A dependency between Tasks means that the result of one Task is later fed into another Task, and the latter obviously can't be scheduled in parallel with the former. Looking back at our previous diagram, we see that the "Add" tasks can't be scheduled until the "Register" tasks have been completed. This is called "multi-phase" scheduling, and our complete algorithm - see again the previous diagram - is made of 3 + log2(n) phases, where n is the number of images in the batch (for n = 1,000, log2(n) is about 10, so roughly 13 phases).

At this point of our discussion, we could imagine a Master which first schedules all the 1,000 "Register" tasks, then the "Add" tasks (from now on, for the sake of simplicity, I'll only consider these two tasks). But why wait for all the "Register" tasks to be completed? Indeed, an "Add" can be scheduled as soon as a pair of results from "Register" is available. So here is another diagram that shows this approach:
This sounds a bit more complex than first scheduling all the "Register" tasks and later the "Add" tasks. Is it worthwhile? Depending on the context, it is. Consider that each task works on an image, which consumes a lot of memory. Having 1,000 completed tasks means having 1,000 images in memory, and most of them are just idle, waiting for a Worker to process them (of course, we can imagine swapping them to disk, but this operation is expensive too). With the queue approach in the latest diagram, on the contrary, there are no more idle images in memory: as soon as a pair is available, it gets fed into another Task and consumed. This is much more efficient.

Lesson learned: with a parallel design, don't be conditioned by sequential reasoning. Think asynchronously instead.

Now we are ready for sketching some pseudo-code:

    reference = createReference();

    for each image
    {
        schedule new RegisterTask(image, reference);
    }

    when (at least 2 RegisterTasks completed)
    {
        image1 = registerTask1.getResult();
        image2 = registerTask2.getResult();
        schedule new AddTask(image1, image2);
    }

    when (at least 2 AddTasks completed)
    {
        image1 = addTask1.getResult();
        image2 = addTask2.getResult();
        schedule new AddTask(image1, image2);
    }

    // Detects last phase
    when (no more pending tasks && only 1 completed AddTask has not been processed)
    {
        sumImage = addTask.getResult();
        normalize(sumImage);
        return;
    }

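To make the pseudo-code a bit more tangible, here is a minimal, self-contained Java sketch of the same "pair results as soon as they are available" idea, built on the standard java.util.concurrent classes rather than on Mistral. Everything in it is an assumption made for illustration: PairwiseSumSketch, register(), add() and average() are hypothetical names, images are plain double[] arrays, and the "registration" step is a stand-in that just copies the image. Unlike the pseudo-code, it does not distinguish between results coming from a "Register" and from an "Add": any two completed results are paired, which works here because addition is commutative and associative.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PairwiseSumSketch
{
    // Stand-in for "Register": a real implementation would align the image to the reference.
    static double[] register (double[] image, double[] reference)
    {
        return image.clone();
    }

    // Stand-in for "Add": pixel-wise sum of two images of the same size.
    static double[] add (double[] a, double[] b)
    {
        double[] sum = new double[a.length];
        for (int i = 0; i < a.length; i++)
        {
            sum[i] = a[i] + b[i];
        }
        return sum;
    }

    // Averages the images (at least one is required) using the given number of workers.
    static double[] average (double[][] images, int workers)
    {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        CompletionService<double[]> completed = new ExecutorCompletionService<>(pool);
        try
        {
            double[] reference = images[0];
            int inFlight = 0; // tasks submitted whose result has not been taken yet
            for (double[] image : images)
            {
                completed.submit(() -> register(image, reference));
                inFlight++;
            }
            double[] waiting = null; // a completed result that has no partner yet
            while (inFlight > 0)
            {
                double[] result = completed.take().get();
                inFlight--;
                if (waiting == null)
                {
                    waiting = result;
                }
                else
                {
                    // A pair is available: schedule an "Add" right away.
                    double[] left = waiting;
                    waiting = null;
                    completed.submit(() -> add(left, result));
                    inFlight++;
                }
            }
            // Last phase: nothing is in flight and a single result is left; normalize it.
            for (int i = 0; i < waiting.length; i++)
            {
                waiting[i] /= images.length;
            }
            return waiting;
        }
        catch (InterruptedException | ExecutionException e)
        {
            throw new IllegalStateException(e);
        }
        finally
        {
            pool.shutdown();
        }
    }
}
```

At most one completed image sits idle in this sketch (the one held in "waiting"), which is the memory argument made above, in miniature.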
The ImageProcessor of Mistral

Now, let's have a look at the design of the Mistral image processor:
Classes in red are provided by the framework; classes in blue are implemented by the programmer and contain the specific algorithm (Pleiades in our case); classes in green are polymorphic implementations of the processor (a polymorphic Master-Worker lets us provide alternate scheduling strategies, which is one of our aims, the targets being multi-core computers, a local cluster or the Sun Grid). In particular, the "Register" operation will be encapsulated into a RegisterTask, a subclass of the abstract ImagingTask class:

    package it.tidalwave.image.processor;

    import java.io.Serializable;
    import it.tidalwave.image.EditableImage;

    abstract public class ImagingTask implements Serializable
    {
        // The logger, name and throwable fields used below are not shown in this excerpt.

        /***************************************************************************
         *
         * Concrete implementations should provide the task core in this method.
         * This method is performed in a distributed context, if available.
         *
         **************************************************************************/
        abstract protected void run()
          throws Exception;

        /***************************************************************************
         *
         * Returns the result of this task. Must be implemented by subclasses.
         *
         * @return  the result
         *
         **************************************************************************/
        abstract public EditableImage getResult();

        /***************************************************************************
         *
         * Executes the task. This method is only called by the framework.
         *
         **************************************************************************/
        public final void execute()
        {
            try
            {
                logger.info("Starting " + name);
                long time = System.currentTimeMillis();

                try
                {
                    run();
                }
                finally
                {
                    // ... collect statistics...
                }
            }
            catch (Throwable e)
            {
                // Remember the failure instead of propagating it to the worker.
                throwable = e;
            }
        }
    }

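To see what this contract asks of a subclass, here is a minimal, self-contained sketch. It is not Mistral code: SketchTask is a hypothetical, stripped-down stand-in for ImagingTask (no logging, no statistics, double[] instead of EditableImage), SketchAddTask is a hypothetical subclass, and getThrowable() is an accessor I'm assuming only so that the captured failure can be inspected.

```java
import java.io.Serializable;

// Hypothetical, simplified stand-in for ImagingTask.
abstract class SketchTask implements Serializable
{
    private final String name;
    private Throwable throwable;

    protected SketchTask (String name)
    {
        this.name = name;
    }

    // The task core, provided by subclasses.
    protected abstract void run() throws Exception;

    // The result of the task, or null if it has not completed successfully.
    public abstract double[] getResult();

    // Template method: runs the core and records any failure instead of throwing it.
    public final void execute()
    {
        try
        {
            run();
        }
        catch (Throwable e)
        {
            throwable = e;
        }
    }

    public String getName()
    {
        return name;
    }

    public Throwable getThrowable()
    {
        return throwable;
    }
}

// Hypothetical subclass: pixel-wise sum of two images of the same size.
class SketchAddTask extends SketchTask
{
    private final double[] a;
    private final double[] b;
    private double[] result;

    SketchAddTask (double[] a, double[] b)
    {
        super("add");
        this.a = a;
        this.b = b;
    }

    @Override
    protected void run()
    {
        if (a.length != b.length)
        {
            throw new IllegalArgumentException("size mismatch");
        }
        double[] sum = new double[a.length];
        for (int i = 0; i < a.length; i++)
        {
            sum[i] = a[i] + b[i];
        }
        result = sum;
    }

    @Override
    public double[] getResult()
    {
        return result;
    }
}
```

The point of the final execute() method is that a subclass only writes the algorithm; error capture (and, in the real class, logging and statistics) stays in one place.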
The listener below is notified each time a task completes and schedules the next "Add" tasks:

    private ImagingTaskProcessorListener listener = new ImagingTaskProcessorAdapter()
    {
        @Override
        public void notifyTaskCompleted (ImagingTaskProcessorEvent event)
        {
            ImagingTask task = event.getTask();

            // Tasks still to do (queued or running) and tasks completed but not yet consumed.
            int registerTodoCount = imagingTaskProcessor.getPendingTaskCount(RegisterTask.class) +
                                    imagingTaskProcessor.getRunningTaskCount(RegisterTask.class);
            int addTodoCount = imagingTaskProcessor.getPendingTaskCount(AddTask.class) +
                               imagingTaskProcessor.getRunningTaskCount(AddTask.class);
            int registerCompletedCount = imagingTaskProcessor.getCompletedTaskCount(RegisterTask.class);
            int addCompletedCount = imagingTaskProcessor.getCompletedTaskCount(AddTask.class);

            // Pair up completed RegisterTasks.
            for (int i = 0; i < registerCompletedCount / 2; i++)
            {
                scheduleAdd(imagingTaskProcessor.popCompletedTask(RegisterTask.class),
                            imagingTaskProcessor.popCompletedTask(RegisterTask.class));
            }

            // Pair up completed AddTasks.
            for (int i = 0; i < addCompletedCount / 2; i++)
            {
                scheduleAdd(imagingTaskProcessor.popCompletedTask(AddTask.class),
                            imagingTaskProcessor.popCompletedTask(AddTask.class));
            }
            //
            // Detects last phase
            //
            if ((registerTodoCount == 0) && (addTodoCount == 0) && (addCompletedCount == 1))
            {
                //
                // Special case: if an odd number of images was in the batch, a single
                // RegisterTask stays in the queue as it cannot be added with another.
                // Perform a final step adding it with the final AddTask.
                //
                if (registerCompletedCount == 1)
                {
                    scheduleAdd(imagingTaskProcessor.popCompletedTask(RegisterTask.class),
                                imagingTaskProcessor.popCompletedTask(AddTask.class));
                }
                else if (registerCompletedCount == 0)
                {
                    task = imagingTaskProcessor.popCompletedTask(AddTask.class);

                    synchronized (RegisterCrossCorrelation.this)
                    {
                        sum = task.getResult();
                        RegisterCrossCorrelation.this.notify();
                    }

                    imagingTaskProcessor.getStatistics().dump();
                }
            }
        }
    };

    private void scheduleAdd (ImagingTask task1, ImagingTask task2)
    {
        logger.info("Now adding " + task1.getName() + " and " + task2.getName());
        AddTask addTask = new AddTask(task1.getResult(), task2.getResult());
        imagingTaskProcessor.postWithPriority(addTask);
    }

I think that the code is easy to read, in any case:
Default implementation, local processing

The default implementation of the ImagingTaskProcessor:

    package it.tidalwave.image.processor;

    import java.util.logging.Logger;

    public class LocalImagingTaskProcessor extends ImagingTaskProcessor
    {
        private static final String CLASS = LocalImagingTaskProcessor.class.getName();

        private static final Logger logger = Logger.getLogger(CLASS);

        class PoolThread extends Thread
        {
            public PoolThread (String name)
            {
                super(name);
                setDaemon(true);
            }

            @Override
            public void run()
            {
                for (;;) // TODO: add a smart way to terminate
                {
                    changeFreeWorkerCount(+1);
                    ImagingTask task = getNextTask();
                    changeFreeWorkerCount(-1);
                    logger.fine(Thread.currentThread().getName() + " assigned to " + task.getName());

                    try
                    {
                        task.prepare(LocalImagingTaskProcessor.this);
                        task.execute();
                    }
                    catch (Throwable t) // prevents task errors from disrupting the worker
                    {
                        logger.throwing(CLASS, "run()", t);
                    }

                    notifyTaskCompleted(task);
                }
            }
        }

        public int getWorkerCount()
        {
            // maxWorkers is not declared in this excerpt.
            return Math.min(Runtime.getRuntime().availableProcessors(), maxWorkers);
        }

        public LocalImagingTaskProcessor()
        {
            for (int i = 0; i < getWorkerCount(); i++)
            {
                new PoolThread("PoolThread #" + i).start();
            }
        }
    }

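The worker loop above carries a TODO about termination. One common way to handle it, shown here only as a hedged sketch and not as what Mistral does, is the "poison pill" technique: a special marker object is queued once per worker, and a worker exits when it dequeues one. StoppablePoolSketch, post() and shutdownAndWait() are hypothetical names, and tasks are plain Runnables instead of ImagingTasks.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class StoppablePoolSketch
{
    // The poison pill: a marker task that is compared by identity and never run.
    private static final Runnable STOP = () -> { };

    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    private final Thread[] workers;

    StoppablePoolSketch (int workerCount)
    {
        workers = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            workers[i] = new Thread(this::workerLoop, "PoolThread #" + i);
            workers[i].setDaemon(true);
            workers[i].start();
        }
    }

    private void workerLoop()
    {
        try
        {
            for (;;)
            {
                Runnable task = queue.take(); // blocks until a task is available
                if (task == STOP)
                {
                    return; // the pill has been dequeued: this worker terminates
                }
                try
                {
                    task.run();
                }
                catch (Throwable t)
                {
                    // A failing task must not kill the worker; a real pool would log this.
                }
            }
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
        }
    }

    void post (Runnable task)
    {
        queue.add(task);
    }

    // Queues one pill per worker behind the already posted tasks, then waits for the workers.
    void shutdownAndWait()
    {
        for (int i = 0; i < workers.length; i++)
        {
            queue.add(STOP);
        }
        try
        {
            for (Thread worker : workers)
            {
                worker.join();
            }
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the queue is FIFO, every task posted before shutdownAndWait() is dequeued before any pill, so the already posted work completes before the workers exit. Tasks that schedule further tasks, as the "Add" tree does, would need the shutdown to be requested only after the last phase.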
The LocalImagingTaskProcessor implementation is based on a pool of threads whose size equals the number of processors in the system, capped by maxWorkers (see getWorkerCount() above).

I think it's enough for today. I would like to stress that the abstraction introduced by the [...]

See you next time.