Fabrizio Giudici's Blog
Clustering with Rio
Posted by fabriziogiudici on December 21, 2006 at 06:05 AM
After the BoF at JavaPolis (where the demo failed because of some stupid connection problem, even though half an hour earlier my setup was working perfectly) and another code reorganization, we can talk about the Rio plugin for Mistral. It enables users to distribute an imaging task across a local cluster of machines, where nodes can be dynamically added or removed even during the computation. First, note that we are referring to Pleiades v0.8.1 and Mistral v0.9.3. Don't refer to the latest committed code, since we've already gone a step further with some significant enhancements. It's also important that you have already read my previous blog entries (or the documents on the Mistral website), since I'm not repeating some basic concepts I've already talked about. So, you should be comfortable with the Master/Worker pattern and with Mistral's basic design.
I think we should start with two basic questions. What is Rio? How does it work? I will try to explain it very quickly, using a well-known model such as J2EE as a term of comparison. Please be aware that, for the sake of clarity, I will sacrifice some technical precision. If you want to learn more about Rio, please look at its website. Also look at Jérôme Bernard's blog, since he's posting some training resources about Rio. Think of an application server like JBoss and, in particular, of its EJB container. The container controls the life-cycle of EJBs and offers them some services. You, the programmer, write the code for the EJBs respecting a certain contract (e.g. the
package it.tidalwave.image.processor.rio;
import java.rmi.Remote;
import java.rmi.RemoteException;
import it.tidalwave.image.processor.ImagingTask;
public interface RioImagingWorker extends Remote
{
public ImagingTask process (ImagingTask task)
throws RemoteException;
}
For the implementation class, instead of implementing
package it.tidalwave.image.processor.rio;
import org.jini.rio.core.jsb.ServiceBeanContext;
import org.jini.rio.jsb.ServiceBeanAdapter;
import it.tidalwave.image.processor.ImagingTask;
import it.tidalwave.image.processor.ImagingTaskProcessor;
public class RioImagingWorkerImpl extends ServiceBeanAdapter implements RioImagingWorker
{
public void initialize (ServiceBeanContext context)
throws Exception
{
super.initialize(context);
}
public Object start (ServiceBeanContext context)
throws Exception
{
return super.start(context);
}
public void stop (boolean arg0)
{
}
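public Object createProxy()
{
// Wrap the exported remote stub in our own serializable proxy class,
// which is what clients will obtain from the lookup service
return (RioImagingWorkerProxy.getInstance((RioImagingWorker)getExportedProxy(), getUuid()));
}
public ImagingTask process (ImagingTask task)
{
long time = System.currentTimeMillis();
log("Received task: " + task.getName());
try
{
// Run the task on this node and record how long it took
task.execute();
task.latestExecutionTime = System.currentTimeMillis() - time;
}
catch (Throwable t)
{
// Don't propagate: the failure travels back to the master inside the task
task.setThrowable(t);
}
log("Processed task: " + task.getName());
// The task is serialized back to the caller together with its results
return task;
}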
private void log (String message)
{
System.err.println(getUuid() + " " + message);
}
}
This component is used by Mistral to receive an
process()) contains the business logic (which in our case is pretty simple, the task is just executed and the execution time is computed and stored into it).
package it.tidalwave.image.processor.rio;
import java.io.IOException;
import java.io.InvalidObjectException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.rmi.RemoteException;
import net.jini.id.Uuid;
import org.jini.rio.resources.servicecore.AbstractProxy;
import it.tidalwave.image.processor.ImagingTask;
public class RioImagingWorkerProxy extends AbstractProxy implements RioImagingWorker, Serializable
{
static final long serialVersionUID = 1L;
private RioImagingWorkerProxy (RioImagingWorker rioImagingWorker, Uuid id)
{
super(rioImagingWorker, id);
}
static RioImagingWorkerProxy getInstance (RioImagingWorker rioImagingWorker, Uuid id)
{
return new RioImagingWorkerProxy(rioImagingWorker, id);
}
public ImagingTask process (ImagingTask task)
throws RemoteException
{
return ((RioImagingWorker)server).process(task);
}
}
If you look at the real code in the source repository, you'll find it's a bit more complex, but I'm not going to talk about that now; the basic concept is clearly shown in the above listing. The advantage of having the proxy code at hand is that you can customize it (mostly for security reasons). This way of managing proxies comes from Jini, which is the core technology of Rio. I bet you've heard of Jini at some point and now you're thinking “isn't that the technology that was supposed to run in toasters and microwave ovens, and that is now dead?”. Well, let me tell you: not only is Jini alive and kicking, but it no longer cares about toasters or ovens. People use it to do pretty cool stuff such as grid computing (mostly in banking and finance) or running Formula One telemetry (trust me, it really works ;-). There are also companies such as GigaSpaces whose core business is delivering Grid products based on Jini.
What are the specific characteristics of Jini? In a few words, with Jini you can easily build a “federation” of network services, each one advertising itself to the others and exporting its own proxy. Jini provides a Look Up Service (LUS) which registers and unregisters services as they appear and disappear (services are able to discover the LUS automatically, without any a priori configuration) and fires remote events that clients can listen to. In this way (and thanks to a number of other services that I'm not talking about) you can create adaptive network systems that are scalable and flexible. To explain these concepts a bit, let's focus on EJBs again. Some concepts are similar: for instance, EJBs also have a Look Up Service (in their jargon it's the Naming Service). EJB clients that live outside of the application server must explicitly configure the connection to the Naming Service (setting the properties
As for deployment, with EJBs this is a pretty static concept: you first define how many nodes are in your cluster, specify deployment rules, configure your application server and then deploy the application. If you want to change something, the application must be stopped, reconfigured and re-deployed. With Jini (but now I must talk about the Jini+Rio bundle, since the latter adds considerable value to the core technology) you just connect a new node and it automatically becomes part of the cluster (it can even receive the code to deploy over the wire). Of course, things such as load balancing and high availability are part of the Jini/Rio world as well.
A closer look at Rio
The basic components of Rio are:
In addition, Rio contains tools to administer and monitor the system remotely.
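To make the Lookup Service idea described above a bit more concrete, here is a toy, single-JVM model of what a LUS does for its clients: services register and unregister, and interested listeners are notified of each change. This is only a sketch under simplifying assumptions. `ToyLookupService` and its `Listener` are hypothetical names, it is not the Jini API, and real Jini adds multicast discovery, leases and remote events on top of this basic idea.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
 * Toy, in-process model of a lookup service (NOT the Jini API).
 * Services register under a generated id; listeners are notified
 * synchronously whenever a service appears or disappears.
 */
public class ToyLookupService {

    /** Hypothetical listener, loosely modelled on Jini's discovery listeners. */
    public interface Listener {
        void serviceAdded(UUID id, Object service);
        void serviceRemoved(UUID id, Object service);
    }

    private final Map<UUID, Object> services = new LinkedHashMap<>();
    private final List<Listener> listeners = new ArrayList<>();

    public synchronized void addListener(Listener listener) {
        listeners.add(listener);
        // A late subscriber first learns about everything already registered
        for (Map.Entry<UUID, Object> e : services.entrySet()) {
            listener.serviceAdded(e.getKey(), e.getValue());
        }
    }

    public synchronized UUID register(Object service) {
        UUID id = UUID.randomUUID();
        services.put(id, service);
        for (Listener l : listeners) {
            l.serviceAdded(id, service);
        }
        return id;
    }

    public synchronized void unregister(UUID id) {
        Object service = services.remove(id);
        if (service != null) {
            for (Listener l : listeners) {
                l.serviceRemoved(id, service);
            }
        }
    }

    public synchronized int size() {
        return services.size();
    }

    public static void main(String[] args) {
        ToyLookupService lus = new ToyLookupService();
        lus.addListener(new Listener() {
            public void serviceAdded(UUID id, Object s) { System.out.println("added " + s); }
            public void serviceRemoved(UUID id, Object s) { System.out.println("removed " + s); }
        });
        UUID a = lus.register("worker-A");
        lus.register("worker-B");
        lus.unregister(a); // listeners hear about the departure immediately
        System.out.println(lus.size());
    }
}
```

A client that subscribes late still gets a `serviceAdded` call for every service already registered, which mirrors how a Jini client sees existing services when it first finds a LUS.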
In one of the simplest configurations (which is the one used by Mistral), you have a “master” node which runs the Provision Monitor, the LUS and Webster, and some “satellite” nodes which just run an empty Cybernode. You can run any number of them: they will join your federation and the Provision Monitor will dispatch to them an instance of the
How does the deployment mechanism work? It is based on the “operational string”, which is the equivalent of the EJB “deployment descriptor”, only more dynamic. In fact, since the number of nodes in the cluster varies over time and, generally speaking, is not known a priori, deployment is based on a Service Level Agreement (SLA), that is, a set of rules that decide where a certain piece of code should run. Since Rio has a crush on Quality of Service, SLAs can be a function of things such as CPU load, the percentage of free disk space or memory, or anything else you need (SLAs can be extended with custom code). This is a really interesting concept for Mistral (and for clustering in general), as it allows you, for instance, to dynamically schedule workers only on hosts with an idle CPU. Let's have a look at a simple operational string from Mistral:
<?xml version="1.0" encoding="ISO-8859-1" standalone="no"?>
<!DOCTYPE opstring SYSTEM "java://org/jini/rio/dtd/rio_opstring.dtd" [
<!ENTITY Local.IP SYSTEM "java://java.net.InetAddress.getLocalHost().getHostAddress()" >
<!ENTITY Local.Port "9000" >
<!ENTITY CodeServerURL "http://&Local.IP;:&Local.Port;/" >
]>
<opstring>
<OperationalString Name="Name">
<Configuration>
<Component Name="com.sun.rio">
<Parameter Name="defaultExporter" Value="$com_sun_rio_defaultExporter"/>
</Component>
</Configuration>
<ServiceBean Name="RioImagingWorker" MatchOnName="yes" ProvisionType="fixed">
<Codebase>&CodeServerURL;</Codebase>
<Interfaces>
<Interface>it.tidalwave.image.processor.rio.RioImagingWorker</Interface>
<Resources>
<JAR>RioImagingWorker-dl.jar</JAR>
<JAR>rio-dl.jar</JAR>
<JAR>EditableImage.jar</JAR>
</Resources>
</Interfaces>
<ImplementationClass>it.tidalwave.image.processor.rio.RioImagingWorkerImpl
<Resources>
<JAR>RioImagingWorker.jar</JAR>
<JAR>EditableImage.jar</JAR>
<JAR>JAI-Adapter.jar</JAR>
<JAR>jai_codec.jar</JAR>
<JAR>jai_core.jar</JAR>
<JAR>mlibwrapper_jai.jar</JAR>
<JAR>clibwrapper_jiio.jar</JAR>
<JAR>jai_imageio.jar</JAR>
<JAR>rio.jar</JAR>
<JAR>jini-ext.jar</JAR>
<JAR>pleiades.jar</JAR>
<JAR>Jama-1.0.2.jar</JAR>
</Resources>
</ImplementationClass>
<MaxPerMachine>2</MaxPerMachine>
<Maintain>2</Maintain>
<Groups>
<Group>rio</Group>
</Groups>
</ServiceBean>
</OperationalString>
</opstring>
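The opstring above only asks for a fixed number of instances, but the SLA idea described before it (deploy workers only where the CPU is idle) can be illustrated with a toy placement filter. This is a hypothetical sketch, not Rio's SLA API: `HostStatus`, `sla()` and `eligibleHosts()` are made-up names, and the host addresses, metrics and thresholds are arbitrary illustrative values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/**
 * Toy model of SLA-driven placement (NOT Rio's SLA API): each host reports
 * a few metrics, an SLA is a predicate over them, and only hosts that
 * satisfy it are eligible to receive a worker.
 */
public class SlaPlacement {

    /** Hypothetical snapshot of a host's resource metrics. */
    public static final class HostStatus {
        final String address;
        final double cpuLoad;       // 0.0 = idle, 1.0 = fully busy
        final double freeDiskRatio; // fraction of disk space still free

        public HostStatus(String address, double cpuLoad, double freeDiskRatio) {
            this.address = address;
            this.cpuLoad = cpuLoad;
            this.freeDiskRatio = freeDiskRatio;
        }
    }

    /** SLA: CPU load at most maxCpu and at least minFreeDisk of the disk free. */
    public static Predicate<HostStatus> sla(double maxCpu, double minFreeDisk) {
        return h -> h.cpuLoad <= maxCpu && h.freeDiskRatio >= minFreeDisk;
    }

    /** Addresses of the hosts on which a worker may be deployed. */
    public static List<String> eligibleHosts(List<HostStatus> hosts, Predicate<HostStatus> sla) {
        List<String> result = new ArrayList<>();
        for (HostStatus h : hosts) {
            if (sla.test(h)) {
                result.add(h.address);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<HostStatus> hosts = Arrays.asList(
                new HostStatus("10.0.1.2", 0.10, 0.60),  // idle, plenty of disk
                new HostStatus("10.0.1.3", 0.90, 0.80),  // busy CPU
                new HostStatus("10.0.1.4", 0.30, 0.05)); // almost out of disk
        // prints [10.0.1.2]
        System.out.println(eligibleHosts(hosts, sla(0.5, 0.1)));
    }
}
```

Keeping the SLA as a plain predicate makes it easy to combine or extend rules, which is roughly the spirit of Rio's extensible SLAs, although the real mechanism is richer (it also reacts when metrics change over time).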
As you can see, the opstring is written in XML, but by using entities you can dynamically invoke pieces of Java code (look, for instance, at how the
Basically, the opstring defines a ServiceBean named "RioImagingWorker", defines its remote interface to
In the end, we are saying that we want to deploy 2 instances of the worker on every node (this is pretty hardwired, since in my lab I have some dual-core Macs).
Run the damn'd thing!
To run it, enter the
package com.bloomingstars.pleiades.rioperformancetest;
import com.bloomingstars.imaging.registration.test.PerformanceTest100;
import com.sun.jini.start.ServiceStarter;
import it.tidalwave.image.processor.ImagingTaskProcessor;
import it.tidalwave.image.processor.rio.RioImagingTaskProcessor;
public class Main
{
public Main (final String[] args)
throws Exception
{
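// Logging, security policy and Jini URL handlers must be configured
// before the embedded Jini/Rio services are started
System.setProperty("java.util.logging.config.file", "config/rio-log.properties");
System.setProperty("java.util.logging.manager", "com.sun.jini.logging.LogManager");
System.setProperty("java.security.policy", "runtime/rio3.2/policy/policy.all");
System.setProperty("java.protocol.handler.pkgs", "net.jini.url");
// Installation directories and working folders used by Jini/Rio
System.setProperty("JINI_HOME", "runtime/jini2.1");
System.setProperty("RIO_HOME", "runtime/rio3.2");
System.setProperty("LOG_DIR", "log");
System.setProperty("JAR_DIR", "deploy");
startRio(args);
// From now on Mistral dispatches tasks through the Rio-based processor
ImagingTaskProcessor.setDefault(RioImagingTaskProcessor.class);
// Run the benchmark; remote workers are used as soon as they are discovered
new PerformanceTest100().run();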
}
private void startRio (final String[] args)
{
Thread thread = new Thread("Rio")
{
public void run()
{
try
{
System.err.println("Starting Rio...");
ServiceStarter.main(args);
}
catch (Throwable t)
{
t.printStackTrace();
}
}
};
thread.start();
}
public static void main (String[] args)
throws Exception
{
new Main(args);
}
}
It's a way to start Rio in “embedded mode”, that is, from inside another application that can also run other tasks. Please notice that we installed a custom processor with
This is the basic Jini code that we use to dynamically discover remote workers:
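private ServiceDiscoveryListener serviceDiscoveryListener = new ServiceDiscoveryListener()
{
// A new worker has been registered in the LUS: remember it and mark it as free
public void serviceAdded (ServiceDiscoveryEvent sdEvent)
{
ServiceItem serviceItem = sdEvent.getPostEventServiceItem();
RioImagingWorker worker = (RioImagingWorker)serviceItem.service;
discoveredWorkers.add(worker);
synchronized (freeWorkers)
{
freeWorkers.add(worker);
freeWorkers.notifyAll(); // wake up the master if it is waiting for a worker
}
}
// A worker has gone away (e.g. its node was switched off): stop using it
public void serviceRemoved (ServiceDiscoveryEvent sdEvent)
{
ServiceItem serviceItem = sdEvent.getPreEventServiceItem();
RioImagingWorker worker = (RioImagingWorker)serviceItem.service;
discoveredWorkers.remove(worker);
synchronized (freeWorkers) // same lock used when adding
{
freeWorkers.remove(worker);
}
}
// Required by the ServiceDiscoveryListener interface; attribute changes are ignored here
public void serviceChanged (ServiceDiscoveryEvent sdEvent)
{
}
};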
and the listener is activated with:
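// Discover lookup services in the "rio" group (the same group declared in the opstring)
LookupDiscoveryManager lookupDiscoveryManager =
new LookupDiscoveryManager(new String[] { "rio" }, null, null, configuration);
ServiceDiscoveryManager serviceDiscoveryManager =
new ServiceDiscoveryManager(lookupDiscoveryManager,
new LeaseRenewalManager(), configuration);
// Match any service implementing RioImagingWorker, regardless of its ID or attributes
Class[] classes = new Class[]{ RioImagingWorker.class };
ServiceTemplate template = new ServiceTemplate(null, classes, null);
// The cache keeps a local view of matching services and notifies our listener of changes
LookupCache serviceLookupCache =
serviceDiscoveryManager.createLookupCache(template, null, serviceDiscoveryListener);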
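The listener keeps two collections, `discoveredWorkers` and `freeWorkers`, and calls `notifyAll()` on the latter, which suggests a master thread that waits for an idle worker. Here is a minimal, self-contained sketch of that bookkeeping, assuming a FIFO of idle workers guarded by a single monitor. `FreeWorkerPool` and its methods are hypothetical names, not Mistral's actual classes.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical master-side worker bookkeeping: "discovered" holds every
 * worker currently known, "free" holds the idle ones in FIFO order.
 * All state is guarded by this object's monitor.
 */
public class FreeWorkerPool<W> {

    private final Set<W> discovered = new HashSet<>();
    private final Deque<W> free = new ArrayDeque<>();

    /** To be called when a worker appears (e.g. from serviceAdded). */
    public synchronized void workerAdded(W worker) {
        if (discovered.add(worker)) {
            free.addLast(worker);
            notifyAll(); // wake up any thread blocked in take()
        }
    }

    /** To be called when a worker disappears (e.g. from serviceRemoved). */
    public synchronized void workerRemoved(W worker) {
        discovered.remove(worker);
        free.remove(worker);
    }

    /** Blocks until an idle worker is available, then hands it out. */
    public synchronized W take() throws InterruptedException {
        while (free.isEmpty()) { // loop guards against spurious wake-ups
            wait();
        }
        return free.removeFirst();
    }

    /** Gives a worker back after its task completes, unless it has left meanwhile. */
    public synchronized void release(W worker) {
        if (discovered.contains(worker)) {
            free.addLast(worker);
            notifyAll();
        }
    }

    public synchronized int freeCount() {
        return free.size();
    }

    public static void main(String[] args) throws InterruptedException {
        FreeWorkerPool<String> pool = new FreeWorkerPool<>();
        Thread master = new Thread(() -> {
            try {
                // Blocks until the worker below is "discovered"
                System.out.println("got " + pool.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        master.start();
        pool.workerAdded("remote-worker-1");
        master.join();
    }
}
```

Because `release()` checks the `discovered` set, a worker that disappears while it is busy (its `serviceRemoved` arrives mid-task) is not put back into circulation once its task returns.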
In this way the list
Running the program, you should see something like:
run:
[java] Starting Rio...
[java] Discovery started
[java] Waiting for available workers
[java] 12:49:21.628 [Rio ] FINEST ServiceProvisioner
- MinThreads=3, MaxThreads=10
[java] 12:49:21.650 [Rio ] FINEST ServiceProvisioner
- ServiceResourceSelector : org.jini.rio.monitor.RoundRobinSelector
[java] 12:49:21.799 [Rio ] FINEST ProvisionMonitorImpl
- initialOpStringLoadDelay=5000
[java] 12:49:21.956 [Thread-13 ] FINE ProvisionMonitorImpl
- ProvisionMonitorPeer: No backup
[java] 12:49:26.868 [Timer-1 ] FINE ProvisionMonitorImpl
- Loading intialOpStrings [opstrings/RioImagingWorkerConfiguration.xml]
[java] 12:49:26.955 [Timer-1 ] FINE ProvisionMonitorImpl
- Deploying Operational String [Name]
[java] 12:49:26.956 [Timer-1 ] FINEST ProvisionMonitorImpl
- Adding OpString [Name] active [true]
[java] 12:49:27.022 [Timer-1 ] FINEST ProvisionMonitorImpl
- OperationalString [Name] Start Date=[Thu Dec 21 12:49:27 CET 2006], Delay [0]
[java] **** Task Register #0 started on worker PoolThread #1
[java] 12:49:28.098 [Timer-1 ] FINEST ServiceElementManager
- Discovered [0] instances of the [Name.RioImagingWorker] service
[java] 12:49:28.111 [Timer-1 ] FINER ServiceElementManager
- Add [RioImagingWorker] to FixedServiceManager
[java] 12:49:28.111 [Timer-1 ] FINER ServiceProvisioner
- Deploy [RioImagingWorker]
[java] **** Task Register #1 started on worker PoolThread #0
[java] **** Task Register #2 started on worker PoolThread #0
[java] **** Task Add #0 started on worker PoolThread #1
[java] **** Task Register #3 started on worker PoolThread #1
[java] **** Task Register #4 started on worker PoolThread #0
[java] **** Task Add #1 started on worker PoolThread #1
[java] **** Task Add #2 started on worker PoolThread #1
[java] **** Task Register #5 started on worker PoolThread #1
...
Basically, you've started a LUS, a Provision Monitor and Webster. As there's no Cybernode, only a few things actually happened on the network side. The last lines in the log above just report that tasks are being assigned to the local pool of threads.
Now, let's move on to the interesting stuff. Go to the RioImagingWorker subproject of Mistral-Rio and launch
This will run an empty Cybernode (please note that this component does not contain any Pleiades-specific code). Please disable any firewall between your two computers. Also be aware that some routers, especially WiFi ones such as my Apple Airport Express, aren't friendly to the multicast protocols used by Jini. Sure, it's just a matter of configuring them, but I don't know how :-) so it's better to use a plain old switch with network cables. ;-)
Now you should see something more interesting, such as:
[java] Dec 21, 2006 12:57:51 PM org.jini.rio.cybernode.JSBDelegate advertise
[java] FINE: RioImagingWorker: advertised
[java] Dec 21, 2006 12:57:51 PM org.jini.rio.cybernode.JSBDelegate advertise
[java] FINE: RioImagingWorker: advertised
[java] Dec 21, 2006 12:57:52 PM org.jini.rio.cybernode.ServiceConsumer connect
[java] FINE: Established ProvisionManager registration
[java] Dec 21, 2006 12:57:52 PM org.jini.rio.cybernode.ServiceConsumer register
[java] INFO: Registered to a ProvisionManager
This means that the Cybernode has discovered the Provision Monitor. Look back at the first console:
[java] 12:57:42.349 [x request dispatch] FINE ServiceProvisioner
- Registered new ServiceBeanInstantiator @ 10.0.1.2
[java] ServiceBeanInstantiator count [1]
[java] 12:57:42.351 [x request dispatch] FINER PendingServiceElementManag -
[java] Fixed-Service Manager collection size : 1
[java] --
[java] 1 it.tidalwave.image.processor.rio.RioImagingWorker: RioImagingWorker <0>
[java] --
[java] 12:57:42.351 [x request dispatch] FINER InstantiatorResource
- Cybernode at [10.0.1.2] has [0] instance(s), planned [2] of Fixed service [RioImagingWorker]
[java] 12:57:42.352 [x request dispatch] FINER InstantiatorResource
- Cybernode at [10.0.1.2] meets qualitative requirements for [RioImagingWorker]
[java] 12:57:42.352 [x request dispatch] FINEST ServiceProvisioner
- [RioImagingWorker] instanceID : 1
[java] 12:57:43.802 [x request dispatch] FINEST ServiceProvisioner
- [RioImagingWorker] instanceID : 2
[java] 12:57:43.802 [x request dispatch] FINEST ServiceProvisioner
- Wait until all ProvisionTask threads are complete ...
[java] 12:57:44.039 [ol:1166702169538-9] FINER ServiceProvisioner
- Allocating [RioImagingWorker] ...
[java] 12:57:44.111 [ol:1166702169538-8] FINER ServiceProvisioner
- Allocating [RioImagingWorker] ...
[java] 12:57:51.849 [ol:1166702169538-8] FINER ServiceProvisioner
- Allocated [RioImagingWorker]
[java] 12:57:51.874 [ol:1166702169538-8] FINEST ServiceProvisioner
- RioImagingWorker ServiceBeanInstance Instance=[2]
Proxy=[it.tidalwave.image.processor.rio.RioImagingWorkerProxy$ConstrainableRioImagingWorker
Proxy] ID=[7459c827-33a3-4ca3-a6dc-30b6291c449b] HostAddress=[10.0.1.2], Annotation null
[java] 12:57:51.926 [ol:1166702169538-8] FINE ServiceElementManager
- [RioImagingWorker] service provisioned, instance=Instance=[2]
Proxy=[it.tidalwave.image.processor.rio.RioImagingWorkerProxy$ConstrainableRioImagingWorker
Proxy] ID=[7459c827-33a3-4ca3-a6dc-30b6291c449b] HostAddress=[10.0.1.2]
[java] 12:57:51.926 [ol:1166702169538-8] FINE ServiceElementManager
- Added SBI = 2, 7459c827-33a3-4ca3-a6dc-30b6291c449b
[java] Instance ID list [1, 2]
[java] ServiceBeanInstance List:
[java] Instance=[2] Proxy=[it.tidalwave.image.processor.rio.RioImagingWorkerProxy$
ConstrainableRioImagingWorkerProxy] ID=[7459c827-33a3-4ca3-a6dc-30b6291c449b]
HostAddress=[10.0.1.2]
[java] 12:57:51.927 [ol:1166702169538-8] FINEST ServiceElementManager
- Get FaultDetectionHandler for [RioImagingWorker]
[java] 12:57:51.968 [ol:1166702169538-9] FINER ServiceProvisioner
- Allocated [RioImagingWorker]
[java] 12:57:51.970 [ol:1166702169538-9] FINEST ServiceProvisioner
- RioImagingWorker ServiceBeanInstance Instance=[2]
Proxy=[it.tidalwave.image.processor.rio.RioImagingWorkerProxy$ConstrainableRioImagingWorker
Proxy] ID=[943170f2-76b4-4c2c-9818-25fb9723ac93] HostAddress=[10.0.1.2], Annotation null
[java] 12:57:51.972 [ol:1166702169538-9] FINE ServiceElementManager
- [RioImagingWorker] service provisioned, instance=Instance=[2] Proxy=[it.tidalwave.image.processor.rio.RioImagingWorkerProxy
$ConstrainableRioImagingWorkerProxy] ID=[943170f2-76b4-4c2c-9818-25fb9723ac93] HostAddress=[10.0.1.2]
[java] 12:57:51.972 [ol:1166702169538-9] FINE ServiceElementManager
- Added SBI = 2, 943170f2-76b4-4c2c-9818-25fb9723ac93
[java] Instance ID list [1, 2]
[java] ServiceBeanInstance List:
[java] Instance=[2] Proxy=[it.tidalwave.image.processor.rio.RioImagingWorkerProxy$
ConstrainableRioImagingWorkerProxy] ID=[7459c827-33a3-4ca3-a6dc-30b6291c449b] HostAddress=[10.0.1.2]
[java] Instance=[2] Proxy=[it.tidalwave.image.processor.rio.RioImagingWorkerProxy$
ConstrainableRioImagingWorkerProxy] ID=[943170f2-76b4-4c2c-9818-25fb9723ac93] HostAddress=[10.0.1.2]
[java] 12:57:51.973 [ol:1166702169538-9] FINEST ServiceElementManager
- Get FaultDetectionHandler for [RioImagingWorker]
[java] 12:57:51.984 [x request dispatch] FINEST ServiceProvisioner
- ProvisionTask threads join complete
This means that the Provision Monitor actually deployed two instances of the service, as expected! Wow, you're running your Rio cluster! Those strange strings such as 943170f2-76b4-4c2c-9818-25fb9723ac93 are called UUIDs and are unique identifiers used by Jini for designating services around the network.
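As a quick check of what those identifiers are, the two service IDs from the log can be decoded with the standard `java.util.UUID` class. This only inspects the string format (Jini itself uses its own `net.jini.id.Uuid` type); both IDs turn out to be RFC 4122 version 4, i.e. randomly generated, UUIDs.

```java
import java.util.UUID;

/**
 * Decodes the service IDs printed in the Rio log with java.util.UUID.
 * This only looks at the textual format, not at how Jini generated them.
 */
public class UuidCheck {
    public static void main(String[] args) {
        String[] ids = {
            "7459c827-33a3-4ca3-a6dc-30b6291c449b",
            "943170f2-76b4-4c2c-9818-25fb9723ac93"
        };
        for (String s : ids) {
            UUID id = UUID.fromString(s);
            // version 4 = random; variant 2 = the RFC 4122 (Leach-Salz) layout
            System.out.println(s + " version=" + id.version() + " variant=" + id.variant());
        }
    }
}
```

With 122 random bits per ID, two Cybernodes generating IDs independently will, in practice, never collide, which is why no central authority is needed to name services.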
A few lines below you should see:
[java] serviceAdded(7459c827-33a3-4ca3-a6dc-30b6291c449b)
[java] serviceAdded(943170f2-76b4-4c2c-9818-25fb9723ac93)
and this is just the
[java] **** Task Add #40 started on worker PoolThread #0
[java] **** Task Register #46 started on worker PoolThread #0
[java] **** Task Register #47 started on worker 7459c827-33a3-4ca3-a6dc-30b6291c449b
[java] **** Task Register #48 started on worker 943170f2-76b4-4c2c-9818-25fb9723ac93
[java] **** Task Add #42 started on worker PoolThread #1
[java] **** Task Add #41 started on worker PoolThread #0
which means that tasks are now also being scheduled on remote workers. Everything is working! (Well, if you start the Cybernode too late after the main program, you probably won't see any remotely scheduled task, since the computation is already over: just kill the processes and restart them.)
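Since nodes can leave the cluster while a computation is running, the master must cope with a remote call that fails halfway. One simple strategy, sketched below under the assumption that re-running a task is harmless, is to drop the failed worker and put the task back in the queue. `FailoverDispatcher` and its `Worker` interface are hypothetical stand-ins (for `RioImagingWorker` and Mistral's scheduler), not the actual Mistral code, and the loop is sequential for brevity.

```java
import java.rmi.RemoteException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical failover sketch: tasks are handed out round-robin; when a
 * remote call fails, the worker is dropped and the task is re-queued.
 */
public class FailoverDispatcher {

    /** Stand-in for a remote worker such as RioImagingWorker. */
    public interface Worker {
        String process(String task) throws RemoteException;
    }

    public static List<String> runAll(List<String> tasks, List<Worker> workers) {
        Deque<String> pending = new ArrayDeque<>(tasks);
        List<Worker> alive = new ArrayList<>(workers);
        List<String> results = new ArrayList<>();
        int next = 0;
        while (!pending.isEmpty()) {
            if (alive.isEmpty()) {
                throw new IllegalStateException(pending.size() + " task(s) left but no workers");
            }
            String task = pending.removeFirst();
            Worker worker = alive.get(next % alive.size()); // simple round robin
            try {
                results.add(worker.process(task));
                next++;
            } catch (RemoteException e) {
                alive.remove(worker);   // the node is gone: stop using it
                pending.addFirst(task); // and retry the same task elsewhere
            }
        }
        return results;
    }

    public static void main(String[] args) {
        Worker healthy = task -> "done:" + task;
        Worker vanished = task -> { throw new RemoteException("node unreachable"); };
        // prints [done:t1, done:t2, done:t3]
        System.out.println(runAll(Arrays.asList("t1", "t2", "t3"), Arrays.asList(vanished, healthy)));
    }
}
```

Note the assumption behind this policy: a remote call can fail after the worker has already done part (or all) of the job, so blind retrying is only safe for tasks that can be executed twice without side effects.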
If you have more computers in your network, you can just run more
We've covered a lot today, so I'm stopping here. There are some open issues: