
Clustering with Rio

Posted by fabriziogiudici on December 21, 2006 at 6:05 AM PST

After the BoF at JavaPolis (where the demo failed because of a stupid connection problem, in spite of the fact that half an hour earlier my setup was perfectly ok) and another code reorganization, we can talk about the Rio plugin for Mistral, which enables users to distribute an imaging task over a local cluster of machines, where nodes can be dynamically added or removed even during the computation.

First, let's say that we are referring to Pleiades v0.8.1 and Mistral v0.9.3 – don't refer to the latest committed code, since we've already gone a step further with some significant enhancements. It's also important that you have already read my previous blog entries (or the documents at the Mistral website), since I won't repeat basic concepts I've already covered. So, you should be comfortable with the Master/Worker pattern and with Mistral's basic design:

[Image: Mistral Simplified Design.png]

I think we should start with two basic questions: what is Rio, and how does it work? I will try to explain it quickly, taking a well-known model such as J2EE as a term of comparison. Please be aware that for the sake of clarity I will sacrifice some technical precision – if you want to learn more about Rio, please look at its website. Also look at Jérôme Bernard's blog, since he's posting some training resources about Rio.

Think of an application server like JBoss and, in particular, of its EJB container. The container controls the life-cycle of EJBs and offers them a number of services. You, the programmer, write the code for the EJBs respecting a certain contract (e.g. the SessionBean interface) and some coding rules. EJBs have a Remote Interface which exposes some methods to the world, and clients can remotely connect to them and invoke those methods by means of a lookup. These very same concepts apply to Rio. For instance, this is the definition of a Remote Interface:

package it.tidalwave.image.processor.rio;

import java.rmi.Remote;
import java.rmi.RemoteException;
import it.tidalwave.image.processor.ImagingTask;

public interface RioImagingWorker extends Remote
  {
    public ImagingTask process (ImagingTask task)
      throws RemoteException;
  }

For the implementation class, instead of implementing SessionBean, you need to extend ServiceBean as in the following example:

package it.tidalwave.image.processor.rio; 

import org.jini.rio.core.jsb.ServiceBeanContext;
import org.jini.rio.jsb.ServiceBeanAdapter;
import it.tidalwave.image.processor.ImagingTask;
import it.tidalwave.image.processor.ImagingTaskProcessor;

public class RioImagingWorkerImpl extends ServiceBeanAdapter implements RioImagingWorker
  {
    public void initialize (ServiceBeanContext context)
      throws Exception
      {
        super.initialize(context);
      }

    public Object start (ServiceBeanContext context)
      throws Exception
      {
        return super.start(context);
      }

    public void stop (boolean arg0)
      {
      }
   
    public Object createProxy()
      {
        return (RioImagingWorkerProxy.getInstance((RioImagingWorker)getExportedProxy(), getUuid()));
      }
   
    public ImagingTask process (ImagingTask task)
      {
        long time = System.currentTimeMillis();
        log("Received task: " + task.getName());
       
        try
          {
            task.execute();
            task.latestExecutionTime = System.currentTimeMillis() - time;
          }
        catch (Throwable t)
          {
            task.setThrowable(t);
          } 

        log("Processed task: " + task.getName());
        return task; 
      }
   
    private void log (String message)
      {
        System.err.println(getUuid() + " " + message); 
      }
  }

This component is used by Mistral to receive an ImagingTask dispatched from somewhere, execute it locally and then return it with the result. As with EJBs, some methods are devoted to controlling the life-cycle:

  • initialize()
  • start()
  • stop()
  • createProxy()

while others (in the example, process()) contain the business logic (which in our case is pretty simple: the task is just executed, and the execution time is computed and stored into it).

createProxy() is the first thing we notice, and it has no equivalent in EJBs. In order to allow remote invocation of methods, EJBs use the Proxy (or Stub / Skeleton) pattern: a special class (the Proxy) is generated which implements the Remote Interface and contains all the machinery for establishing a remote connection and serializing the parameters and the result. With EJBs the Proxy is always automatically generated and you have no control over it; with Rio, instead, you explicitly control its behaviour (but don't panic: if you don't need to, you won't write the serialization code yourself). This is an example of a Rio proxy:

package it.tidalwave.image.processor.rio;

import java.io.IOException;
import java.io.InvalidObjectException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.rmi.RemoteException;
import net.jini.id.Uuid;
import org.jini.rio.resources.servicecore.AbstractProxy;
import it.tidalwave.image.processor.ImagingTask;

public class RioImagingWorkerProxy extends AbstractProxy implements RioImagingWorker, Serializable
  {
    static final long serialVersionUID = 1L;
   
    private RioImagingWorkerProxy (RioImagingWorker rioImagingWorker, Uuid id)
      {
        super(rioImagingWorker, id);
      }

    static RioImagingWorkerProxy getInstance (RioImagingWorker rioImagingWorker, Uuid id)
      {
        return new RioImagingWorkerProxy(rioImagingWorker, id);
      }
   
    public ImagingTask process (ImagingTask task)
      throws RemoteException
      {
        return ((RioImagingWorker)server).process(task);
      }
  }

If you look at the real code in the source repository, you'll find it's a bit more complex - but I'm not going to talk about that now; the basic concept is clearly shown in the above listing. The advantage of having the proxy code at hand is that you can customize it (mostly for security reasons). This way of managing proxies comes from Jini, the core technology underlying Rio.
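If the Proxy pattern itself feels abstract, plain JDK dynamic proxies show the same idea in miniature. This is just an illustration of the pattern, not Rio or Mistral code; the Worker interface and ProxyDemo class are invented for the example. The invocation handler plays the role that the generated Stub plays in RMI/EJB: it intercepts every call, and in a real remote proxy this is where arguments and results would be serialized over the wire.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxyDemo
  {
    // A toy remote interface, standing in for RioImagingWorker.
    public static interface Worker
      {
        public String process (String task);
      }

    // Wraps a real worker in a dynamic proxy. The handler intercepts
    // every method call and forwards it to the real implementation.
    public static Worker makeProxy (final Worker realWorker)
      {
        InvocationHandler handler = new InvocationHandler()
          {
            public Object invoke (Object proxy, Method method, Object[] args)
              throws Throwable
              {
                System.err.println("forwarding call to " + method.getName());
                return method.invoke(realWorker, args);
              }
          };

        return (Worker)Proxy.newProxyInstance(Worker.class.getClassLoader(),
                                              new Class<?>[]{ Worker.class },
                                              handler);
      }

    public static void main (String[] args)
      {
        Worker proxy = makeProxy(new Worker()
          {
            public String process (String task)
              {
                return "processed " + task;
              }
          });

        System.out.println(proxy.process("task-1"));
      }
  }
```

The Rio proxy above does essentially this (via AbstractProxy and the `server` field), but with the remote connection machinery in between.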


I bet you've heard of Jini at some point and now you're thinking “isn't that the technology that was supposed to run in toasters and microwave ovens, and that is now dead?”. Well, let me tell you: not only is Jini alive and kicking, but it doesn't care about toasters or ovens any more; people use it to do pretty cool stuff such as grid computing (mostly in banks and finance) or running Formula One telemetry (trust me, it really works ;-). There are also companies such as GigaSpaces whose core business is delivering Jini-based grid products.

What are the specific characteristics of Jini? In a few words, with Jini you can easily build a “federation” of network services, each one advertising itself to the others and exporting its own proxy. Jini provides a Look Up Service (LUS) which registers and unregisters services as they appear and disappear (they are able to discover the LUS automatically, without a priori configuration) and fires remote events that can be listened to by clients. In this way (and thanks to a number of other services that I'm not covering here) you can create adaptive network systems that are scalable and flexible.
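In essence, the LUS is a registry that fires events as services come and go. Stripped of all the networking, leasing and multicast machinery, the core idea looks like this toy model (the names LookupSketch and Listener are invented for illustration; this is not Jini code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LookupSketch
  {
    // Clients implement this to be told when services appear or vanish,
    // much as Jini clients receive remote events from the LUS.
    public static interface Listener
      {
        public void serviceAdded (String name, Object proxy);
        public void serviceRemoved (String name, Object proxy);
      }

    private final Map<String, Object> services = new HashMap<String, Object>();
    private final List<Listener> listeners = new ArrayList<Listener>();

    public void addListener (Listener listener)
      {
        listeners.add(listener);
      }

    // A service registers itself, exporting its proxy; listeners are notified.
    public void register (String name, Object proxy)
      {
        services.put(name, proxy);

        for (Listener listener : listeners)
            listener.serviceAdded(name, proxy);
      }

    // When a service disappears, listeners are notified again.
    public void unregister (String name)
      {
        Object proxy = services.remove(name);

        for (Listener listener : listeners)
            listener.serviceRemoved(name, proxy);
      }
  }
```

The real LUS adds automatic discovery, leases (so dead services expire) and true remote delivery of the events, but the register / notify / unregister cycle is the heart of it.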

To put these concepts in context, let's focus again on EJBs. Some concepts are similar; for instance, they also have a Look Up Service (in their jargon, the Naming Service). EJB clients that live outside the application server must explicitly configure the connection to the Naming Service (setting the INITIAL_CONTEXT_FACTORY and PROVIDER_URL properties). With Jini, you don't have to: thanks to a “multicast discovery” protocol, every component finds the others automatically.

As far as deployment is concerned, for EJBs it is a pretty static concept: you first define how many nodes are in your cluster, specify deployment rules, configure your application server and then deploy the application. If you want to change something, the application must be stopped, reconfigured and re-deployed. With Jini (or rather the bundle Jini+Rio, since the latter adds considerable value to the core technology) you just connect a new node and it will become part of the cluster automatically (it will even receive the code to deploy over the wire). Of course, things such as load balancing and high availability are part of the Jini/Rio world as well.

A closer look at Rio

The basic components of Rio are:

  • the Cybernode, which is the container of ServiceBeans;
  • the Provision Monitor, which is the deployment controller, actually the component that decides which ServiceBeans should be instantiated and where;
  • the Look Up Service (LUS), which we have already introduced;
  • a small web server named Webster which serves the code, in the form of JAR files, to every Cybernode that needs it.

In addition, Rio contains tools to administer and monitor the system remotely.

[Image: rio-arch-block.png]

In one of the simplest configurations (which is the one used by Mistral), you have a “master” node which runs the Provision Monitor, the LUS and Webster, and some “satellite” nodes which just run an empty Cybernode. You can run any number of them; they will join your federation, and the Provision Monitor will dispatch to each of them an instance of the RioImagingWorker bean, ready to accept and process ImagingTasks.

How does the deployment mechanism work? It is based on the “operational string”, which is the equivalent of the EJB “deployment descriptor” - just more dynamic. In fact, since the number of nodes in the cluster varies over time, and generally speaking is not known a priori, deployment is based on Service Level Agreements (SLAs), that is, sets of rules that decide where a certain piece of code should run. Since Rio has a crush on Quality of Service, SLAs can be a function of things such as the CPU load, the percentage of free disk space or memory, or anything else you need (SLAs can be extended with custom code). This is a really interesting concept for Mistral - and more generally for clustering - as it allows you, for instance, to dynamically schedule workers only on hosts with an idle CPU.
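Just to illustrate the idea in code - this is an invented sketch, not the real Rio SLA API (in Rio, SLAs are declared in the opstring and backed by watch/threshold machinery) - an SLA can be seen as a predicate over the current measurements of a node:

```java
public class SlaSketch
  {
    // Hypothetical measurements for a node; the name is invented.
    public static class NodeStats
      {
        public final double cpuLoad;   // 0.0 .. 1.0

        public NodeStats (double cpuLoad)
          {
            this.cpuLoad = cpuLoad;
          }
      }

    // An SLA seen as a predicate: may this node host a service instance?
    public static interface Sla
      {
        public boolean accepts (NodeStats stats);
      }

    // Only provision workers on hosts whose CPU is mostly idle.
    public static final Sla CPU_MOSTLY_IDLE = new Sla()
      {
        public boolean accepts (NodeStats stats)
          {
            return stats.cpuLoad < 0.2;
          }
      };

    public static void main (String[] args)
      {
        System.out.println(CPU_MOSTLY_IDLE.accepts(new NodeStats(0.05)));   // true
        System.out.println(CPU_MOSTLY_IDLE.accepts(new NodeStats(0.90)));   // false
      }
  }
```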

Let's have a look at a simple operational string from Mistral:

<?xml version="1.0" encoding="ISO-8859-1" standalone="no"?>
<!DOCTYPE opstring SYSTEM "java://org/jini/rio/dtd/rio_opstring.dtd" [
<!ENTITY Local.IP SYSTEM "java://java.net.InetAddress.getLocalHost().getHostAddress()" >    
<!ENTITY Local.Port "9000" >
<!ENTITY CodeServerURL "http://&Local.IP;:&Local.Port;/" >   
]>

<opstring>
    <OperationalString Name="Name">
        <Configuration>
            <Component Name="com.sun.rio">
                <Parameter Name="defaultExporter" Value="$com_sun_rio_defaultExporter"/>
            </Component>
        </Configuration>
        <ServiceBean Name="RioImagingWorker" MatchOnName="yes" ProvisionType="fixed">
            <Codebase>&CodeServerURL;</Codebase>
            <Interfaces>
                <Interface>it.tidalwave.image.processor.rio.RioImagingWorker</Interface>
                <Resources>
                    <JAR>RioImagingWorker-dl.jar</JAR>
                    <JAR>rio-dl.jar</JAR>                   
                    <JAR>EditableImage.jar</JAR>
                </Resources>
            </Interfaces>
           
            <ImplementationClass>it.tidalwave.image.processor.rio.RioImagingWorkerImpl
                <Resources>
                    <JAR>RioImagingWorker.jar</JAR>
                    <JAR>EditableImage.jar</JAR>
                    <JAR>JAI-Adapter.jar</JAR>
                    <JAR>jai_codec.jar</JAR>
                    <JAR>jai_core.jar</JAR>
                    <JAR>mlibwrapper_jai.jar</JAR>
                    <JAR>clibwrapper_jiio.jar</JAR>
                    <JAR>jai_imageio.jar</JAR>
                    <JAR>rio.jar</JAR>                   
                    <JAR>jini-ext.jar</JAR>                   
                    <JAR>pleiades.jar</JAR>
                    <JAR>Jama-1.0.2.jar</JAR>
                </Resources>
            </ImplementationClass>
           
            <MaxPerMachine>2</MaxPerMachine>
            <Maintain>2</Maintain>
            <Groups>
                <Group>rio</Group>
            </Groups>
        </ServiceBean>       
    </OperationalString>
</opstring>

As you can see, the opstring is written in XML, but by using entities you can dynamically invoke pieces of Java code (look, for instance, at how the CodeServerURL entity is set to the local IP address URL). Also, the dollar character can be used to access a set of preset properties, such as $com_sun_rio_defaultExporter.

Basically, the opstring defines a ServiceBean named "RioImagingWorker", sets its remote interface to it.tidalwave.image.processor.rio.RioImagingWorker and declares a set of JARs which will be distributed together with the proxy (RioImagingWorker-dl.jar is where I packed the proxy itself, EditableImage.jar is Mistral itself, and rio-dl.jar is always required). In a similar fashion, it also sets the concrete implementation to it.tidalwave.image.processor.rio.RioImagingWorkerImpl and declares all the JARs that it requires (you see Mistral stuff again, but this time also the JAI classes, the Mistral JAI adapter and obviously pleiades.jar, which contains the actual application code).

Finally, we are saying that we want to deploy two instances of the worker on every node (this is pretty much hardwired, since in my lab I have some dual-core Macs).

Run the damn'd thing!

To run it, enter the RioPerformanceTest subproject of Pleiades and launch ant run. The main class that will be run is the following:

package com.bloomingstars.pleiades.rioperformancetest;

import com.bloomingstars.imaging.registration.test.PerformanceTest100;
import com.sun.jini.start.ServiceStarter;
import it.tidalwave.image.processor.ImagingTaskProcessor;
import it.tidalwave.image.processor.rio.RioImagingTaskProcessor;

public class Main  
  {
    public Main (final String[] args)
      throws Exception
      {
        System.setProperty("java.util.logging.config.file", "config/rio-log.properties");
        System.setProperty("java.util.logging.manager", "com.sun.jini.logging.LogManager");
        System.setProperty("java.security.policy", "runtime/rio3.2/policy/policy.all");
        System.setProperty("java.protocol.handler.pkgs", "net.jini.url");
        System.setProperty("JINI_HOME", "runtime/jini2.1");
        System.setProperty("RIO_HOME", "runtime/rio3.2");
        System.setProperty("LOG_DIR", "log");
        System.setProperty("JAR_DIR", "deploy");
       
        startRio(args);
        ImagingTaskProcessor.setDefault(RioImagingTaskProcessor.class);
        new PerformanceTest100().run();
      }
   
    private void startRio (final String[] args)
      {
        Thread thread = new Thread("Rio")
          {
            public void run()
              {  
                try
                  {
                    System.err.println("Starting Rio...");
                    ServiceStarter.main(args);
                  }
                catch (Throwable t)
                  {
                    t.printStackTrace(); 
                  }
              }
          };
         
        thread.start();
      }
   
    public static void main (String[] args)
      throws Exception
      {
        new Main(args);
      }
  }

It's a way to start Rio in “embedded mode”, that is, from inside another application where you can also run other tasks. Please notice that we installed a custom processor with ImagingTaskProcessor.setDefault(RioImagingTaskProcessor.class): this is an extension of LocalImagingTaskProcessor which, besides the inner pooled threads, keeps a list of existing remote workers and schedules tasks to them too.
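To make the scheduling idea concrete, here is a deliberately simplified sketch (invented names, not the actual RioImagingTaskProcessor code): a blocking queue holds the free workers, and both local pooled threads and remote Rio proxies can sit behind the same interface, so the dispatcher doesn't care where a task actually runs.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class DispatchSketch
  {
    // Local threads and remote proxies both fit behind this interface.
    public static interface Worker
      {
        public String process (String task) throws Exception;
      }

    private final BlockingQueue<Worker> freeWorkers = new ArrayBlockingQueue<Worker>(16);

    // Called when a worker becomes available (e.g. from serviceAdded()).
    public void addWorker (Worker worker)
      {
        freeWorkers.add(worker);
      }

    // Blocks until a worker is free, dispatches the task to it and
    // puts the worker back in the pool when done.
    public String dispatch (String task) throws Exception
      {
        Worker worker = freeWorkers.take();

        try
          {
            return worker.process(task);
          }
        finally
          {
            freeWorkers.add(worker);
          }
      }

    public static void main (String[] args) throws Exception
      {
        DispatchSketch scheduler = new DispatchSketch();
        scheduler.addWorker(new Worker()
          {
            public String process (String task)
              {
                return "processed " + task;
              }
          });

        System.out.println(scheduler.dispatch("Register #0"));
      }
  }
```

The real processor also has to handle failures and results coming back asynchronously, but this is the gist of the free-worker pool you'll see in the discovery listener below, where freeWorkers is fed by serviceAdded() events.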

This is the basic Jini code that we use to dynamically discover remote workers:

    private ServiceDiscoveryListener serviceDiscoveryListener = new ServiceDiscoveryListener()
      {
        public void serviceAdded (ServiceDiscoveryEvent sdEvent)
          {
            ServiceItem serviceItem = sdEvent.getPostEventServiceItem();
            ServiceID serviceID = serviceItem.serviceID;
            discoveredWorkers.add((RioImagingWorker)serviceItem.service);
           
            synchronized (freeWorkers)
              {
                freeWorkers.add((RioImagingWorker)serviceItem.service);
                freeWorkers.notifyAll();
              }
          }

        public void serviceRemoved (ServiceDiscoveryEvent sdEvent)
          {
            ServiceItem serviceItem = sdEvent.getPreEventServiceItem();
            discoveredWorkers.remove((RioImagingWorker)serviceItem.service);
            freeWorkers.remove((RioImagingWorker)serviceItem.service);
          }
      };

and the listener is activated with:

LookupDiscoveryManager lookupDiscoveryManager =  
    new LookupDiscoveryManager(new String[] { "rio" }, null, null, configuration);

ServiceDiscoveryManager serviceDiscoveryManager =
    new ServiceDiscoveryManager(lookupDiscoveryManager,
                                new LeaseRenewalManager(), configuration);

Class[] classes = new Class[]{ RioImagingWorker.class };
ServiceTemplate template = new ServiceTemplate(null, classes, null);
LookupCache serviceLookupCache =
    serviceDiscoveryManager.createLookupCache(template, null, serviceDiscoveryListener);

In this way the discoveredWorkers list is always kept up to date, even when nodes are added to or removed from the cluster, and RioImagingTaskProcessor always knows how many remote workers are available.

Running the program, you should see something such as:

run:
[java] Starting Rio...
[java] Discovery started
[java] Waiting for available workers
[java] 12:49:21.628 [Rio               ] FINEST  ServiceProvisioner        
       - MinThreads=3, MaxThreads=10
[java] 12:49:21.650 [Rio               ] FINEST  ServiceProvisioner        
       - ServiceResourceSelector : org.jini.rio.monitor.RoundRobinSelector
[java] 12:49:21.799 [Rio               ] FINEST  ProvisionMonitorImpl      
      - initialOpStringLoadDelay=5000
[java] 12:49:21.956 [Thread-13         ] FINE    ProvisionMonitorImpl      
       - ProvisionMonitorPeer: No backup
[java] 12:49:26.868 [Timer-1           ] FINE    ProvisionMonitorImpl      
       - Loading intialOpStrings [opstrings/RioImagingWorkerConfiguration.xml]
[java] 12:49:26.955 [Timer-1           ] FINE    ProvisionMonitorImpl      
       - Deploying Operational String [Name]
[java] 12:49:26.956 [Timer-1           ] FINEST  ProvisionMonitorImpl      
       - Adding OpString [Name] active [true]
[java] 12:49:27.022 [Timer-1           ] FINEST  ProvisionMonitorImpl      
       - OperationalString [Name] Start Date=[Thu Dec 21 12:49:27 CET 2006], Delay [0]
[java] **** Task Register #0 started on worker PoolThread #1
[java] 12:49:28.098 [Timer-1           ] FINEST  ServiceElementManager     
       - Discovered [0] instances of the [Name.RioImagingWorker] service
[java] 12:49:28.111 [Timer-1           ] FINER   ServiceElementManager     
       - Add [RioImagingWorker] to FixedServiceManager
[java] 12:49:28.111 [Timer-1           ] FINER   ServiceProvisioner        
       - Deploy [RioImagingWorker]
[java] **** Task Register #1 started on worker PoolThread #0
[java] **** Task Register #2 started on worker PoolThread #0
[java] **** Task Add #0 started on worker PoolThread #1
[java] **** Task Register #3 started on worker PoolThread #1
[java] **** Task Register #4 started on worker PoolThread #0
[java] **** Task Add #1 started on worker PoolThread #1
[java] **** Task Add #2 started on worker PoolThread #1
[java] **** Task Register #5 started on worker PoolThread #1
...

Basically, you've started a LUS, a Provision Monitor and Webster. As there's no Cybernode yet, not much has actually happened on the network side. The last lines in the log above just notify that the tasks are being assigned to the local pool of threads.

Now, let's go with the interesting stuff. Go to the RioImagingWorker subproject of Mistral-Rio and launch ant run-cybernode - it's best if you do it from another computer in your network, but you could also just open another console on the same computer (in this case you'll get a java.net.BindException: Could not start listener. Port [9000] already taken that you can safely ignore).

This will run an empty cybernode (please note that this component does not contain any Pleiades-specific code).


Please disable any firewall between your two computers. Also be aware that some routers, especially WiFi ones such as my Apple Airport Express, aren't friendly to the multicast protocols used by Jini. Sure, it's just a matter of configuring them, but I don't know how :-) so it's better to use a plain old switch with network cables. ;-)

Now you should see something more interesting such as:

[java] Dec 21, 2006 12:57:51 PM org.jini.rio.cybernode.JSBDelegate advertise
[java] FINE: RioImagingWorker: advertised
[java] Dec 21, 2006 12:57:51 PM org.jini.rio.cybernode.JSBDelegate advertise
[java] FINE: RioImagingWorker: advertised
[java] Dec 21, 2006 12:57:52 PM org.jini.rio.cybernode.ServiceConsumer connect
[java] FINE: Established ProvisionManager registration
[java] Dec 21, 2006 12:57:52 PM org.jini.rio.cybernode.ServiceConsumer register
[java] INFO: Registered to a ProvisionManager

This means that the Cybernode has discovered the Provision Monitor. Look back at the first console:

[java] 12:57:42.349 [x request dispatch] FINE    ServiceProvisioner         
                - Registered new ServiceBeanInstantiator @ 10.0.1.2
[java] ServiceBeanInstantiator count [1]
[java] 12:57:42.351 [x request dispatch] FINER   PendingServiceElementManag -
[java] Fixed-Service Manager collection size : 1
[java] --
[java] 1 it.tidalwave.image.processor.rio.RioImagingWorker: RioImagingWorker <0>
[java] --
[java] 12:57:42.351 [x request dispatch] FINER   InstantiatorResource      
       - Cybernode at [10.0.1.2] has [0] instance(s), planned [2] of Fixed service [RioImagingWorker]
[java] 12:57:42.352 [x request dispatch] FINER   InstantiatorResource      
       - Cybernode at [10.0.1.2] meets qualitative requirements for [RioImagingWorker]
[java] 12:57:42.352 [x request dispatch] FINEST  ServiceProvisioner        
       - [RioImagingWorker] instanceID : 1
[java] 12:57:43.802 [x request dispatch] FINEST  ServiceProvisioner        
       - [RioImagingWorker] instanceID : 2
[java] 12:57:43.802 [x request dispatch] FINEST  ServiceProvisioner        
       - Wait until all ProvisionTask threads are complete ...
[java] 12:57:44.039 [ol:1166702169538-9] FINER   ServiceProvisioner        
       - Allocating [RioImagingWorker] ...
[java] 12:57:44.111 [ol:1166702169538-8] FINER   ServiceProvisioner        
       - Allocating [RioImagingWorker] ...
[java] 12:57:51.849 [ol:1166702169538-8] FINER   ServiceProvisioner        
       - Allocated [RioImagingWorker]
[java] 12:57:51.874 [ol:1166702169538-8] FINEST  ServiceProvisioner        
       - RioImagingWorker ServiceBeanInstance Instance=[2]
       Proxy=[it.tidalwave.image.processor.rio.RioImagingWorkerProxy$ConstrainableRioImagingWorker
       Proxy] ID=[7459c827-33a3-4ca3-a6dc-30b6291c449b] HostAddress=[10.0.1.2], Annotation null
[java] 12:57:51.926 [ol:1166702169538-8] FINE    ServiceElementManager     
       - [RioImagingWorker] service provisioned, instance=Instance=[2]
       Proxy=[it.tidalwave.image.processor.rio.RioImagingWorkerProxy$ConstrainableRioImagingWorker
       Proxy] ID=[7459c827-33a3-4ca3-a6dc-30b6291c449b] HostAddress=[10.0.1.2]
[java] 12:57:51.926 [ol:1166702169538-8] FINE    ServiceElementManager     
       - Added SBI = 2, 7459c827-33a3-4ca3-a6dc-30b6291c449b
[java] Instance ID list [1, 2]
[java] ServiceBeanInstance List:
[java] Instance=[2] Proxy=[it.tidalwave.image.processor.rio.RioImagingWorkerProxy$
       ConstrainableRioImagingWorkerProxy] ID=[7459c827-33a3-4ca3-a6dc-30b6291c449b] 
       HostAddress=[10.0.1.2]
[java] 12:57:51.927 [ol:1166702169538-8] FINEST  ServiceElementManager     
       - Get FaultDetectionHandler for [RioImagingWorker]
[java] 12:57:51.968 [ol:1166702169538-9] FINER   ServiceProvisioner        
       - Allocated [RioImagingWorker]
[java] 12:57:51.970 [ol:1166702169538-9] FINEST  ServiceProvisioner        
       - RioImagingWorker ServiceBeanInstance Instance=[2]
       Proxy=[it.tidalwave.image.processor.rio.RioImagingWorkerProxy$ConstrainableRioImagingWorker
       Proxy] ID=[943170f2-76b4-4c2c-9818-25fb9723ac93] HostAddress=[10.0.1.2], Annotation null
[java] 12:57:51.972 [ol:1166702169538-9] FINE    ServiceElementManager     
       - [RioImagingWorker] service provisioned, instance=Instance=[2] Proxy=[it.tidalwave.image.processor.rio.RioImagingWorkerProxy
       $ConstrainableRioImagingWorkerProxy] ID=[943170f2-76b4-4c2c-9818-25fb9723ac93] HostAddress=[10.0.1.2]
[java] 12:57:51.972 [ol:1166702169538-9] FINE    ServiceElementManager     
       - Added SBI = 2, 943170f2-76b4-4c2c-9818-25fb9723ac93
[java] Instance ID list [1, 2]
[java] ServiceBeanInstance List:
[java] Instance=[2] Proxy=[it.tidalwave.image.processor.rio.RioImagingWorkerProxy$
       ConstrainableRioImagingWorkerProxy] ID=[7459c827-33a3-4ca3-a6dc-30b6291c449b] HostAddress=[10.0.1.2]
[java] Instance=[2] Proxy=[it.tidalwave.image.processor.rio.RioImagingWorkerProxy$
       ConstrainableRioImagingWorkerProxy] ID=[943170f2-76b4-4c2c-9818-25fb9723ac93] HostAddress=[10.0.1.2]
[java] 12:57:51.973 [ol:1166702169538-9] FINEST  ServiceElementManager     
       - Get FaultDetectionHandler for [RioImagingWorker]
[java] 12:57:51.984 [x request dispatch] FINEST  ServiceProvisioner        
       - ProvisionTask threads join complete

This means that the Provision Monitor actually deployed two instances of the service, as expected! Wow, you're running your Rio cluster! Those strange strings such as 943170f2-76b4-4c2c-9818-25fb9723ac93 are called UUIDs and are unique identifiers used by Jini for designating services around the network.

A few lines below you should see:

[java] serviceAdded(7459c827-33a3-4ca3-a6dc-30b6291c449b)
[java] serviceAdded(943170f2-76b4-4c2c-9818-25fb9723ac93)

and this is just the RioImagingTaskProcessor, which in turn discovered the two new workers and registered them internally in a list. A few lines below you should see something such as:

[java] **** Task Add #40 started on worker PoolThread #0
[java] **** Task Register #46 started on worker PoolThread #0
[java] **** Task Register #47 started on worker 7459c827-33a3-4ca3-a6dc-30b6291c449b
[java] **** Task Register #48 started on worker 943170f2-76b4-4c2c-9818-25fb9723ac93
[java] **** Task Add #42 started on worker PoolThread #1
[java] **** Task Add #41 started on worker PoolThread #0

which means that tasks are now being scheduled to the remote workers as well. Everything is working! (Well, if you run the cybernode too long after the start of the main program, you probably won't see any remotely scheduled tasks since the computation is already over - just kill the processes and restart them.)

If you have more computers in your network, you can just run more ant run-cybernode on them and enjoy a larger cluster.

We've seen a lot of things today, so I'm stopping here. There are some open issues:

  • The current implementation of RioImagingTaskProcessor does not detect failures. In other words, if you kill a cybernode while it's working, the tasks that were running on it are lost forever.
  • If you use two computers and measure the elapsed time of the computation... you'll find that it is faster when the second computer is turned off! This is because the Pleiades parallel decomposition is made of tasks which carry a lot of data back and forth (images) and whose computation time is relatively short. For instance, the RegisterTask carries along around 12 MB of data and completes in less than 2 seconds. Those 12 MB take much more than 2 seconds to be serialized, so the parallel advantage is completely jeopardized. While the amount of data can be reduced with some trivial optimizations, it will be necessary to introduce a new concept, the ImagingTaskGroup, to allow a more coarse-grained parallelization approach and reduce the network chattiness. In the meantime, please be aware that if you have a different computing scenario (less data or longer tasks), you could already take advantage of the current architecture.
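To get a feeling for the numbers: at 100 Mbit/s (roughly 12.5 MB/s), 12 MB need about a second on the wire in each direction, before even counting the CPU cost of Java serialization - easily dominating a sub-2-second task. Here is a quick stdlib sketch that times the serialization step alone; note that the flat byte[] is just a stand-in for the real image objects, which are far more expensive to serialize:

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

public class SerializationCost
  {
    public static void main (String[] args) throws Exception
      {
        byte[] image = new byte[12 * 1024 * 1024];   // ~12 MB payload

        long time = System.currentTimeMillis();
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(buffer);
        out.writeObject(image);
        out.close();
        long elapsed = System.currentTimeMillis() - time;

        // Serializing a flat byte[] is cheap; real image objects cost
        // more, and the network transfer adds about a second each way.
        System.out.println("serialized " + buffer.size() + " bytes in " + elapsed + " ms");
      }
  }
```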

These are things to address in future blogs (and, of course, we also have to talk about the Sun Grid implementation of Mistral, which is pretty different). Since it's already December 21 (tempus fugit!), let me say Merry Christmas and Happy New Year to everybody at java.net! :-)

PS Let me remind you that Mistral and Pleiades are a joint work with Emmanuele Sordini. BTW, he has just opened his own blog.
