
Actors in Java

Posted by fabriziogiudici on January 4, 2012 at 11:53 AM PST

Recently there has been renewed interest in the Actor programming model. The Actor Model actually dates from the '70s, but as far as I'm aware it has been used only in a very limited subset of industrial projects outside the area of telecoms. Erlang (developed, not by chance, at Ericsson, a telecom company), a language whose concurrency model is actor-oriented, is getting some attention, but it's definitely a niche language. More interesting is the fact that Scala has its own actor-based platform, Akka, which is also available to Java (and, generally speaking, to JVM-based languages). While there might (a-hum) be a lot of hype about Scala, and I tend to apply a big low-pass filter to trendy and cool stuff, actors definitely have their place in the world when you're dealing with complex concurrency. Once upon a time complex concurrency mattered only to specific industrial segments (e.g. the previously cited telecom industry), but since things such as multi-cores and "the cloud" are invading our battlefields, it's better to be prepared.
 

BTW, cloud and multi-core apart, I've already seen a lot of production code with scattered synchronized sections that "perhaps shouldn't be there, but the thing works and it's better to let them stay". Note that this argument is the typical smell of a lack of testing, but concurrency is so complex to test that even when the customer is a good guy and achieves very high coverage, often he can't be sure. The risk is that some unprotected section will sooner or later lead to an inconsistency or raise an exception, up to the scary possibility of a deadlock. The better way to deal with concurrency is to pick a programming model that avoids problems by construction, and actors can be a good choice.
 

I've started doing some coding in the area and while I'm still far from a conclusion it's high time I posted some considerations.
 

Actors and Akka
 

So, what are actors about? These are the basic principles:
 

  1. No shared (mutable) data. Each actor manages its own slice of data and there's nothing shared with others. Thus no need for synchronized.
     
  2. Lightweight processes. Threads are still good, since they scale well; but, as per the previous point, they must be isolated.
     
  3. Communicate through Asynchronous Messages. What is traditionally done by invoking a method on another object is now done by sending an asynchronous message. The sender is never blocked, and the receiver gets all inbound messages enqueued into a 'mailbox', from which each actor instance consumes them one at a time.
     
  4. Messages are immutable. This can be inferred from the previous points (otherwise bye bye "no shared mutable data"), but it's good advice to stress the point.
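The four principles above can be sketched in plain Java, without any actor library. This is only an illustration under stated assumptions: the names Greeting and GreeterActor are hypothetical, and a single-threaded executor plays the part of the mailbox.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical immutable message: the only field is final and there are no setters.
final class Greeting {
    private final String name;

    Greeting(String name) { this.name = name; }

    String getName() { return name; }
}

// Hypothetical actor: its state is private and touched only by the mailbox thread.
final class GreeterActor {
    // A single-threaded executor acts as the mailbox: messages are queued
    // and consumed one at a time, in arrival order.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private final List<String> greeted = new ArrayList<>(); // no synchronized needed

    // Asynchronous send: the caller enqueues the message and returns immediately.
    void tell(Greeting message) {
        mailbox.execute(() -> greeted.add("Hello " + message.getName()));
    }

    // Stops accepting messages, waits for the queue to drain, returns a copy of the state.
    List<String> stopAndGetGreeted() {
        mailbox.shutdown();
        try {
            if (!mailbox.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("mailbox did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(greeted);
    }
}
```

A caller would write something like new GreeterActor().tell(new Greeting("world")). One thread per actor is the simplest possible choice here; a real platform multiplexes many actors over a small thread pool.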
     

With actors the contract of a computational entity is no longer represented by an interface (that is, an enumeration of methods) plus behaviour, but by the set of messages that the actor can receive and send, plus behaviour. Copying the first example in the Java API for Akka, a simple actor can be:

import akka.actor.UntypedActor;
import akka.event.EventHandler;

public class SampleUntypedActor extends UntypedActor
  {
    public void onReceive (Object message) throws Exception
      {
        if (message instanceof String)
          {
            getContext().replyUnsafe("Hello " + message);
          }
        else
          {
            throw new IllegalArgumentException("Unknown message: " + message);
          }
      }
  }

Actors, being managed objects (and possibly distributed on a network), can't be directly accessed, but must be handled by references. For instance, to send a message to the previous actor:

import static akka.actor.Actors.*;

final ActorRef myActor = actorOf(SampleUntypedActor.class);
myActor.start();
...
myActor.tell("world");

Now Akka has considerable value in being tested and optimized: performance is one of the reasons why you might want to use actors, it's not easy to achieve, and Akka is known to scale to impressive numbers. But while messages can be delivered in different fashions, unfortunately Akka doesn't seem to support publish & subscribe, which is my preferred way to go (I've seen references to extensions, and possibly this feature could be introduced in the future). This is a first showstopper for me. Furthermore, the basic concepts are simple but, as unfortunately often happens, things get more complicated as you start digging into details. Also, note the total lack of syntactic sugar: the incoming message is a vanilla Object and you need a cascade of if (message instanceof ...) to deal with multiple messages. At least, that's the case for what Akka calls untyped actors: there are typed actors too, which are defined by an interface to mimic method calling, and which Akka manipulates through AOP to transform method invocation into message passing. I don't like this approach, because I think that in message-passing software the approach must be evident in the source code, not masked under something that pretends to be method invocation; furthermore, method invocation semantics are incompatible with publish & subscribe. So, I'd say that with Akka either you have no sugar or you're going to sink into honey.
 

Anyway, just before publishing this blog post, I've learned that Akka 2.0 is going to have publish & subscribe. Excellent news. In the meantime?
 



Understanding what I want
 

In the meantime, I've started sketching some code on my own, to understand how I'd like to use actors. The idea is to learn things and then try to converge on Akka, possibly with a thin layer of syntactic sugar. There's enough work so far to be worth a blog post, both for my personal need to write things down and to discuss them with others.
 

Since things must be given a real context I've started thinking of desktop applications, in particular two simple tools that I need: SolidBlue computes the fingerprint of the files contained in a directory (I use it for periodically checking the sanity of my photos and related backups) and blueArgyle blueShades provides a GUI for Argyll, a tool for color management, suited to my needs. Both are being implemented with the NetBeans Platform.
 

So let's focus on SolidBlue and its main feature. The scenario is:
 

  1. You select a directory.
  2. The application scans the directory recursively.
  3. For each discovered file, contents are loaded and the MD5 fingerprint is computed.
  4. Each MD5 computed fingerprint is stored into a flat file.
  5. A UI monitors the workflow displaying the current status on the screen.

The flow can be parallelized. For instance, the recursive directory inspection can be performed by multiple threads, as can the MD5 computation, which only involves the CPU. On the other hand, loading from the disk is better serialized (that is, reading only one file at a time) in order to maximize the disk throughput (which is the true bottleneck of the application). Indeed, things are a bit more complex than I've just said, but I'll come back to the issue later.
 


Primary message flows in SolidBlue
 

Note: in the following code samples, I'm making use of Lombok annotations for some plumbing. This is not only great for everyday work, but it also makes code samples much easier to read. Also, I'm only listing significant import statements and skipping some irrelevant code chunks.
 

Everything starts by sending to the system a message asking for a scan:
 

FileScanRequestMessage.forFolder("/Volume/Media/Photos")
                      .withFilter(withExtensions("NEF", "JPG", "TIF"))
                      .send();

As you can see, there's no target actor: in keeping with the "publish and subscribe" approach, the message just sends itself. The listing for FileScanRequestMessage is below:
 

package it.tidalwave.integritychecker;

import it.tidalwave.actor.MessageSupport;
import it.tidalwave.actor.annotation.Message;

@Message @Immutable @RequiredArgsConstructor(access=PRIVATE) @EqualsAndHashCode @ToString(callSuper=false)
public class FileScanRequestMessage extends MessageSupport
  {
    public static interface Filter
      {
        public boolean accepts (@Nonnull FileObject fileObject);
      }

    @Nonnull @Getter
    private final FileObject folder;

    @Nonnull @Getter
    private final Filter filter;

    @Nonnull
    public static FileScanRequestMessage forFolder (final @Nonnull FileObject fileObject)
      {
        return new FileScanRequestMessage(fileObject, Filter.ANY);
      }

    @Nonnull
    public FileScanRequestMessage withFilter (final @Nonnull Filter filter)
      {
        return new FileScanRequestMessage(folder, filter);
      }
  }

Somewhere there's a FileObjectDiscoveryActor that listens to that message:

package it.tidalwave.integritychecker.impl;

import it.tidalwave.actor.annotation.Actor;
import it.tidalwave.actor.annotation.ListensTo;

@Actor @ThreadSafe
public class FileObjectDiscoveryActor
  {
    public void onScanRequested (final @ListensTo FileScanRequestMessage message)
      {
        for (final FileObject child : message.getFolder().getChildren())
          {
            if (message.getFilter().accepts(child))
              {
                // a folder triggers a nested scan request, a regular file a discovery notification
                (child.isFolder() ? FileScanRequestMessage.forFolder(child).withFilter(message.getFilter())
                                  : FileDiscoveredMessage.forFile(child)).send();
              }
          }
      }
  }

You can see here my syntactic sugar: the actor is a POJO and the methods designated to receive messages are annotated with @ListensTo. Multiple methods can listen to multiple messages. The annotation is similar to @Observes from CDI (JSR-299). Actually I'd love to reuse it, but it seems to have different semantics (e.g. by default it can activate managed objects that are the target of an inbound message; furthermore there are multiple delivery options related to transactionality that at the moment I'm not interested in).
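How a runtime might find and invoke such listener methods can be sketched with plain reflection. This is not the SolidBlue implementation: the @ListensTo below is a stand-in declared locally so that the example compiles, ReflectiveDispatcher and EchoActor are hypothetical names, and delivery happens synchronously on the caller's thread, with no mailbox.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Stand-in for the annotation used in this post, declared here so the sketch compiles.
@Retention(RetentionPolicy.RUNTIME) // must be visible to reflection at runtime
@Target(ElementType.PARAMETER)
@interface ListensTo { }

// Hypothetical dispatcher: delivers a message to the matching methods of a POJO actor.
final class ReflectiveDispatcher {
    private final Object actor;

    ReflectiveDispatcher(Object actor) { this.actor = actor; }

    // Invokes every method whose single parameter is annotated with @ListensTo
    // and whose type accepts the message; returns how many methods were invoked.
    int dispatch(Object message) {
        int invoked = 0;
        for (Method method : actor.getClass().getDeclaredMethods()) {
            if (method.getParameterCount() != 1) {
                continue;
            }
            boolean listens = method.getParameters()[0].isAnnotationPresent(ListensTo.class);
            if (listens && method.getParameterTypes()[0].isInstance(message)) {
                try {
                    method.setAccessible(true); // listener methods may be private
                    method.invoke(actor, message);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Cannot deliver to " + method.getName(), e);
                }
                invoked++;
            }
        }
        return invoked;
    }
}

// Hypothetical actor with two listener methods and one ordinary method.
final class EchoActor {
    final List<String> received = new ArrayList<>();

    public void onText(final @ListensTo String message) { received.add("text:" + message); }

    public void onNumber(final @ListensTo Integer message) { received.add("number:" + message); }

    public void notAListener(String message) { received.add("never called"); }
}
```

The message type selects the method, which replaces the if (message instanceof ...) cascade. A real runtime would presumably scan the methods once at activation time and cache the result instead of reflecting on every delivery.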
 

Recursive directory navigation is implemented by sending another FileScanRequestMessage. When a regular file is detected, a FileDiscoveredMessage is fired (there's no need to see its listing, as it's straightforward).
 

Inbound messages are first enqueued and then dispatched in a FIFO fashion (with exceptions explained later). If the actor is not thread safe, there's the guarantee that it can be engaged by only a single thread at any time (thus, messages are processed one at a time). In this way there's no need for synchronized sections. If the actor is thread safe (the default), as is FileObjectDiscoveryActor, which is even stateless, it can be engaged by multiple threads at the same time. The thread safety property is specified as an attribute of @Actor (unfortunately @ThreadSafe can't help, since its retention is compile time).
 

The second actor is the one that reads data from files:
 

package it.tidalwave.integritychecker.impl;

import it.tidalwave.actor.annotation.Actor;
import it.tidalwave.actor.annotation.ListensTo;

@Actor(initialPriority=Thread.MAX_PRIORITY) @ThreadSafe
public class FileLoaderActor
  {
    public void onFileDiscovered (final @ListensTo FileDiscoveredMessage message)
      {
        final FileObject fileObject = message.getFileObject();

        try
          {
            final File file = FileUtil.toFile(fileObject);
            final @Cleanup RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");
            final MappedByteBuffer byteBuffer = randomAccessFile.getChannel().map(READ_ONLY, 0, file.length());
            byteBuffer.load();
            randomAccessFile.close();
            new FileContentsAvailableMessage(fileObject, byteBuffer).send();
          }
        catch (Exception e)
          {
            FileDamageDetectedMessage.forFile(fileObject).withCause(e).send();
          }
      }
  }

It uses memory-mapped I/O for speed and fires a FileContentsAvailableMessage carrying the bytes, or a FileDamageDetectedMessage in case of error.
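For readers unfamiliar with memory-mapped I/O, here is a minimal standalone sketch of the same loading step using only the JDK. MappedFileReader is a hypothetical name, and it uses FileChannel.open with try-with-resources (Java 7 and later) instead of RandomAccessFile and Lombok's @Cleanup.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper: maps a whole file read-only and returns the buffer.
final class MappedFileReader {
    static MappedByteBuffer load(Path path) throws IOException {
        // try-with-resources closes the channel; the mapping stays valid afterwards.
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
            buffer.load(); // best-effort request to bring the contents into physical memory
            return buffer;
        }
    }
}
```

Note that a single mapping cannot exceed Integer.MAX_VALUE bytes, so files larger than about 2 GB would have to be mapped in chunks.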
 

The next stage is the actor that computes the fingerprint:
 

package it.tidalwave.integritychecker.impl;

import it.tidalwave.actor.annotation.Actor;
import it.tidalwave.actor.annotation.ListensTo;

@Actor @ThreadSafe
public class FingerprintComputerActor
  {
    public void onFileContentsAvailable (final @ListensTo FileContentsAvailableMessage message)
      throws NoSuchAlgorithmException
      {
        final String algorithm = "MD5";
        final MessageDigest digestComputer = MessageDigest.getInstance(algorithm);
        digestComputer.update(message.getContents());
        final Fingerprint fingerprint = new Fingerprint(algorithm, toString(digestComputer.digest()));
        FingerprintComputedMessage.forFile(message.getFileObject()).withFingerprint(fingerprint).send();
      }
  }
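The toString() helper called in the listing above is not shown in this post. A minimal sketch of what the digest step amounts to, assuming the helper simply renders the digest bytes as lowercase hexadecimal (DigestSketch, hexDigest and toHex are hypothetical names):

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: computes a digest of a buffer and renders it as hex.
final class DigestSketch {
    static String hexDigest(String algorithm, ByteBuffer contents) throws NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance(algorithm);
        // update(ByteBuffer) consumes the buffer it is given; working on a
        // duplicate leaves the caller's position untouched.
        digest.update(contents.duplicate());
        return toHex(digest.digest());
    }

    static String toHex(byte[] bytes) {
        StringBuilder builder = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            builder.append(String.format("%02x", b)); // two hex digits per byte
        }
        return builder.toString();
    }
}
```

For the three ASCII bytes "abc", hexDigest("MD5", ...) returns 900150983cd24fb0d6963f7d28e17f72, the well-known MD5 test vector. The duplicate() call matters when the same buffer travels inside a message that more than one listener may read.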

Again, a FingerprintComputedMessage message carrying the result is fired. The last stage is the persistence actor:
 

package it.tidalwave.integritychecker.impl;

import it.tidalwave.actor.CollaborationStartedMessage;
import it.tidalwave.actor.MessageSupport;
import it.tidalwave.actor.annotation.Actor;
import it.tidalwave.actor.annotation.ListensTo;
import it.tidalwave.actor.annotation.Message;
import it.tidalwave.actor.annotation.OriginatedBy;

@Actor(threadSafe=false) @NotThreadSafe
public class PersistenceManagerActor
  {
    @Message(outOfBand=true) @ToString
    static class FlushRequestMessage extends MessageSupport
      {
      }

    private final Map<String, Object> map = new TreeMap<String, Object>();

    private File persistenceFile;

    private boolean flushPending;

    public void onScanStarted (final @ListensTo CollaborationStartedMessage cMessage,
                               final @OriginatedBy FileScanRequestMessage message)
      {
        final File folder = FileUtil.toFile(message.getFolder());
        final String fileName = new SimpleDateFormat("'fingerprints-'yyyyMMdd_HHmm'.txt'").format(new Date());
        persistenceFile = new File(folder, fileName);
        map.clear();
      }

    public void onFileDiscovered (final @ListensTo FileDiscoveredMessage message)
      {
        final String name = message.getFileObject().getNameExt();

        if (!map.containsKey(name)) // message sequentiality is not guaranteed
          {
            map.put(name, "unavailable");
            requestFlush();
          }
      }

    public void onFingerprintComputed (final @ListensTo FingerprintComputedMessage message)
      {
        final String name = message.getFileObject().getNameExt();
        final Fingerprint fingerprint = message.getFingerprint();
        map.put(name, fingerprint);
        requestFlush();
      }

    private void onFlushRequested (final @ListensTo FlushRequestMessage message)
      throws InterruptedException, IOException
      {
        flushPending = false;
        // writes data to file
      }

    private void requestFlush()
      {
        if (!flushPending)
          {
            new FlushRequestMessage().sendLater(5, TimeUnit.SECONDS);
            flushPending = true;
          }
      }
  }


The basic idea is not to write every piece of data to disk as it's generated, since that might be expensive (well, perhaps writing to a relational database wouldn't be, but for a simple tool such as SolidBlue it makes more sense to write to a flat file). So, data are first put into an in-memory map (a record is generated as soon as a file has been discovered and later overwritten with the final result) that is periodically flushed to disk. The actor is therefore stateful, hence it's not thread safe (there must be only one instance). Whenever the map is changed, it's marked 'dirty' (the flushPending flag) and a request to flush it to disk is generated. The flush will happen within 5 seconds, but it's not a good idea to use a Java Timer, as I'd run into the threading problems that I want to avoid. Instead, a private FlushRequestMessage is sent back to the actor itself with some delay, triggering the write to disk. This message is annotated as 'out-of-band', meaning that it must be delivered with high priority (in practice, it's placed at the head of the incoming message queue rather than the tail). Since the runtime guarantees that only a single message can be processed by any actor instance at any time, there's no need to protect the map, or the flushPending flag, with synchronized sections.
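The "delayed message to self" trick can be reproduced with the JDK alone. In this sketch, which is an assumption-laden illustration rather than the SolidBlue runtime, a single-threaded ScheduledExecutorService stands in for the actor's mailbox, CoalescingStore is a hypothetical name, and copying the map stands in for writing the flat file.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stateful "actor": all state is confined to the single mailbox thread.
final class CoalescingStore {
    private final ScheduledExecutorService mailbox = Executors.newSingleThreadScheduledExecutor();
    private final Map<String, String> map = new TreeMap<>(); // mailbox thread only
    private boolean flushPending;                             // mailbox thread only
    private final long delayMillis;

    final AtomicInteger flushCount = new AtomicInteger();
    volatile Map<String, String> lastFlushed = new TreeMap<>();

    CoalescingStore(long delayMillis) { this.delayMillis = delayMillis; }

    // Plays the role of an inbound message: enqueued, then handled on the mailbox thread.
    void put(String key, String value) {
        mailbox.execute(() -> {
            map.put(key, value);
            requestFlush();
        });
    }

    // Schedules a flush unless one is already pending, so bursts of updates share one write.
    private void requestFlush() {
        if (!flushPending) {
            flushPending = true;
            mailbox.schedule(this::flush, delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    private void flush() {
        flushPending = false;
        lastFlushed = new TreeMap<>(map); // stand-in for writing the flat file
        flushCount.incrementAndGet();
    }

    // Waits for earlier puts, then lets the already scheduled flush run before returning.
    void drainAndClose() {
        try {
            mailbox.submit(() -> { }).get(); // barrier: queued puts have been handled
            mailbox.shutdown();              // by default, scheduled delayed tasks still run
            if (!mailbox.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("flush did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Because the flush runs on the same single thread that handles the updates, neither the map nor the flag needs a lock, which is exactly the point made above. Unlike the out-of-band message in the post, the scheduled task here gets no priority over queued updates.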

The onScanStarted() method is interesting, because it introduces the concept of collaborations.

 

Collaborations
 

Asynchronicity is excellent for massively concurrent systems (as well as, needless to say, for modelling things that are asynchronous in nature, and there are many). But it's nice to know, in some way, that a sequence of things that happened in response to a message of yours has been completed. In my code I've called this concept "Collaboration":
 

  1. Any message is always part of a Collaboration.
  2. If the thread creating a certain message is not bound to a Collaboration, a new Collaboration is created and the message will be considered its originator.
  3. The originator message will be delivered in threads bound to its Collaboration. This means that any further message, created as a consequence of the reception of the originator message, will share its Collaboration.
     
  4. A Collaboration keeps track of all the related messages and threads by means of reference counting.
     
  5. When there are no more pending messages or working threads for a Collaboration, it is considered completed.
  6. Two special messages, CollaborationStartedMessage and CollaborationCompletedMessage, are fired by the runtime to notify of the creation and completion of collaborations.
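The reference counting in points 4 and 5 can be sketched as follows. This is a deliberately reduced model, not the actual implementation: it counts only pending messages (not working threads), and CollaborationSketch with its method names is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified Collaboration: completed when no message is pending.
final class CollaborationSketch {
    private final AtomicInteger pending = new AtomicInteger();
    private final CountDownLatch completed = new CountDownLatch(1);

    // Called when a message belonging to this collaboration is created.
    void messageCreated() {
        pending.incrementAndGet();
    }

    // Called after a message has been fully processed by its listeners.
    void messageProcessed() {
        if (pending.decrementAndGet() == 0) {
            completed.countDown();
        }
    }

    boolean isCompleted() {
        return completed.getCount() == 0;
    }

    // Blocks until completion or timeout; returns true if the collaboration completed.
    boolean waitForCompletion(long timeout, TimeUnit unit) throws InterruptedException {
        return completed.await(timeout, unit);
    }
}
```

The ordering is what makes this work: messages created while handling another message must be counted before the handled message is marked as processed, otherwise the counter could touch zero while work is still outstanding.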

When a message is sent, you can get its Collaboration object:
 

final Collaboration collaboration = new MyMessage().send();

and any listening method can get the Collaboration as well:
 

public void onMyMessage (final @ListensTo MyMessage message)

  {

    final Collaboration collaboration = message.getCollaboration();

    ...

  }

By construction, thus, a Collaboration is attached to all the threads that are directly or indirectly triggered by the originating message. This could be used for managing regular transactions (for instance, by attaching a javax.transaction.UserTransaction to the Collaboration). But actors seem to be a better fit for Software Transactional Memory, so I'll drop this topic for now.
 

It's even possible to wait synchronously for a Collaboration to complete:
 

collaboration.waitForCompletion();
 

even though I think it's only useful for tests. More interesting is the case in which a Collaboration includes bidirectional interactions with a user: for instance, making a dialog box pop up and waiting for a selection.
In these circumstances, a Collaboration can be suspended (this means it is not considered to be terminated even though there are no pending messages or working threads) and later resumed. More details in a further post.
 

Collaborations are something that I don't expect to find in platforms such as Akka, since I'm not sure it's feasible to have them working efficiently in a distributed context (I mean: maybe yes, maybe not, I haven't studied the problem yet). But they are a nice feature to have if possible, and perhaps they can be implemented on top of an existing actor platform.
 

Back to the method onScanStarted(), it is clear now that it listens to the CollaborationStartedMessage messages originated by a FileScanRequestMessage to initialize the storage of the fingerprints, which must be located in the top scanned directory.
 

In a similar way, the IntegrityCheckerPresentationControllerActor listens to the various messages being delivered, including those notifying the beginning and the end of the Collaboration, to update the user interface:
 

package it.tidalwave.integritychecker.ui.impl;

import it.tidalwave.actor.MessageSupport;
import it.tidalwave.actor.Collaboration;
import it.tidalwave.actor.CollaborationCompletedMessage;
import it.tidalwave.actor.CollaborationStartedMessage;
import it.tidalwave.actor.annotation.Actor;
import it.tidalwave.actor.annotation.ListensTo;
import it.tidalwave.actor.annotation.Message;
import it.tidalwave.actor.annotation.OriginatedBy;

@Actor(threadSafe=false) @NotThreadSafe
public class IntegrityCheckerPresentationControllerActor
  {
    private static final double K10 = 1000;
    private static final double M10 = 1000000;

    @Message(outOfBand=true, daemon=true) @ToString
    private static class RefreshPresentationMessage extends MessageSupport
      {
      }

    private final IntegrityCheckerPresentationBuilder presentationBuilder = Locator.find(IntegrityCheckerPresentationBuilder.class);

    private final IntegrityCheckerPresentation presentation = presentationBuilder.createPresentation();

    private final Statistics statistics = new Statistics();

    private boolean refreshing = false;

    private int totalFileCount;

    private long totalDataSize;

    private int processedFileCount;

    private long processedDataSize;

    public void onScanStarted (final @ListensTo CollaborationStartedMessage cMessage,
                               final @OriginatedBy FileScanRequestMessage message)
      {
        totalFileCount = 0;
        totalDataSize = 0;
        processedFileCount = 0;
        processedDataSize = 0;
        refreshing = true;
        new RefreshPresentationMessage().send();
      }

    public void onScanCompleted (final @ListensTo CollaborationCompletedMessage cMessage,
                                 final @OriginatedBy FileScanRequestMessage message)
      {
        refreshing = false;
        presentation.setProgressLabel("done");
        new RefreshPresentationMessage().send();
      }

    public void onFileDiscovered (final @ListensTo FileDiscoveredMessage message)
      {
        totalFileCount++;
        totalDataSize += message.getFileObject().getSize();
      }

    public void onFingerprintComputed (final @ListensTo FingerprintComputedMessage message)
      {
        processedFileCount++;
        processedDataSize += message.getFileObject().getSize();
        final Collaboration collaboration = message.getCollaboration();
        final long elapsedTime = collaboration.getDuration().getMillis();
        final double speed = (processedDataSize / M10) / (elapsedTime / K10);
        final int eta = (int)(((totalDataSize - processedDataSize) / M10) / speed);
        ... // and update the statistics object
      }

    private void updatePresentation (final @ListensTo RefreshPresentationMessage message)
      {
        presentation.updateStatistics(statistics);

        if (refreshing)
          {
            new RefreshPresentationMessage().sendLater(1, TimeUnit.SECONDS);
          }
      }
  }
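The speed and ETA arithmetic in onFingerprintComputed() can be checked in isolation. In the sketch below the constants mirror K10 and M10 from the listing, while ThroughputSketch, its method names and the guards for "no data yet" are my own additions for illustration.

```java
// Hypothetical helper isolating the throughput arithmetic from the listing above.
final class ThroughputSketch {
    private static final double K10 = 1000;    // milliseconds per second
    private static final double M10 = 1000000; // bytes per (decimal) megabyte

    // Average speed in MB/s; 0 when no time has elapsed yet (guard added here).
    static double speedMBps(long processedBytes, long elapsedMillis) {
        if (elapsedMillis <= 0) {
            return 0.0;
        }
        return (processedBytes / M10) / (elapsedMillis / K10);
    }

    // Estimated seconds to completion; -1 when the speed is still unknown (guard added here).
    static int etaSeconds(long totalBytes, long processedBytes, long elapsedMillis) {
        double speed = speedMBps(processedBytes, elapsedMillis);
        if (speed <= 0.0) {
            return -1;
        }
        return (int) (((totalBytes - processedBytes) / M10) / speed);
    }
}
```

As a worked example, 500 MB processed in 10 seconds is 50 MB/s; with 2000 MB in total, the remaining 1500 MB give an ETA of 30 seconds. Since files are discovered while others are already being processed, the total grows during the scan and the ETA is only indicative until discovery has finished.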


Another advantage of message-based designs is that a user interface can be easily updated without having to deal with a large number of listeners.
 

Activation
 

Actors are managed objects and must never be accessed directly. To accomplish this, they are declared by means of activators, which are typically grouped together as in:
 

package it.tidalwave.integritychecker.impl;

import org.openide.util.lookup.ServiceProvider;
import it.tidalwave.actor.spi.ActorGroupActivator;
import static it.tidalwave.actor.spi.ActorActivator.*;

@ServiceProvider(service=IntegrityCheckerActivator.class)
public class IntegrityCheckerActivator extends ActorGroupActivator
  {
    public IntegrityCheckerActivator()
      {
        add(activatorFor(FileObjectDiscoveryActor.class).withPoolSize(8));
        add(activatorFor(FileLoaderActor.class).withPoolSize(1));
        add(activatorFor(FingerprintComputerActor.class).withPoolSize(4));
        add(activatorFor(PersistenceManagerActor.class).withPoolSize(1));
      }
  }

Activators specify the pool size of each actor. Given that, it's possible to activate and deactivate the group:

Locator.find(IntegrityCheckerActivator.class).activate();
Locator.find(IntegrityCheckerActivator.class).deactivate();

 

For people not used to the NetBeans Platform, @ServiceProvider is a compile-time annotation that generates a service declaration in META-INF/services, while Locator is a facility that retrieves the reference at runtime.
 

Actors can have methods annotated with @PostConstruct and @PreDestroy, which will be called just before activation and just after deactivation, of course in a thread-safe way. There's the guarantee that messages can't be delivered before activation is completed and after deactivation.
 

Simpler than Akka... for how long?
 

This works. I enjoy the fact that I can avoid synchronized blocks, and so far I don't mind that it is clearly much less performant than Akka. Everything fits into less than 40k of jar file and it's much simpler than Akka (whose complexity is justified by the fact that it offers many more features). So what? Distribution over a network is something that we can't ignore in a cloud-oriented world. So it's likely that, going on, I'll see the need for more features: often you discover that you need new features as you go. Smell of reinventing the wheel ahead!
 

For instance, consider the FileLoaderActor. I've previously said that experimental evidence demonstrates that it makes sense (at least with large files) to run it as a single instance, thus serializing data access to the disk. This makes sense given the physical nature of a magnetic disk. But this holds for a single disk: if you have more than one, it probably makes sense to have one instance per disk. Having a Solid State Drive (SSD) changes the rules again. This means that you need some more complex way to dispatch messages, a form of routing. Akka offers it, and it would be unwise to rewrite this feature from scratch.
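To make the routing idea concrete, here is a minimal sketch of "one loader per disk" using only the JDK. It says nothing about how Akka does routing: PerDiskRouter is a hypothetical name, and the diskId string is assumed to be supplied by the caller (it might be derived from something like java.nio.file.Files.getFileStore(), which is not shown here).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical router: load tasks for the same disk are serialized,
// while tasks for different disks can run in parallel.
final class PerDiskRouter {
    private final Map<String, ExecutorService> loaders = new ConcurrentHashMap<>();

    // Returns the single-threaded loader for a disk, creating it on first use.
    ExecutorService loaderFor(String diskId) {
        return loaders.computeIfAbsent(diskId, id -> Executors.newSingleThreadExecutor());
    }

    // Routes a load task to the loader that owns the given disk.
    void route(String diskId, Runnable loadTask) {
        loaderFor(diskId).execute(loadTask);
    }

    int loaderCount() {
        return loaders.size();
    }

    // Stops all loaders and waits for queued tasks; returns true if all finished in time.
    boolean shutdownAndAwait(long timeout, TimeUnit unit) {
        boolean allDone = true;
        try {
            for (ExecutorService loader : loaders.values()) {
                loader.shutdown();
            }
            for (ExecutorService loader : loaders.values()) {
                allDone &= loader.awaitTermination(timeout, unit);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return allDone;
    }
}
```

For an SSD the single-threaded executor could be replaced by a small pool, which is the kind of per-target policy a routing layer is meant to encapsulate.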
 

So now I'm going to start phase two, that is to reimplement this stuff upon Akka (using the 2.0 milestone that's already available). In a few weeks I'll let you know how the story goes on.

The code that I described in this blog post is available at the Mercurial repository http://bitbucket.org/tidalwave/solidblue-src, tag 1.0-ALPHA-4.
 

In a further post I'll talk about how Collaborations are used by blueArgyll, which features more complex interactions with the user. See you later.

Attachment: SolidBlueActors.png (189.83 KB)

Comments


Hmmm... Is it just me or does this whole deal look a lot like the Autonomous Agents paradigm? As the name suggests, they're autonomous (i.e. no state sharing etc.), communicate through async messages and so on. Compared to Actors, Agents have more mandatory attributes, as their definition is semantically richer, but they do resemble Actors in many regards. If interested, check out JADE (Java Agent DEvelopment Framework). I'd really love it if someone could confirm that these two look very similar or, alternatively, tell me if I'm on the wrong track here.


There are similarities - I remember having worked for some time with mobile agents (Aglets) back at university. As you said, Agents are semantically richer, but above all I see a difference in usage and purpose. As my simple example in this blog demonstrates, you use Actors as a concurrency paradigm, to decompose an algorithm into parts and make your life easier with threading. Thus you can decompose into Actors at a fine grain - not by chance Akka has been optimized to efficiently run even millions of Actor instances. I wouldn't see a decomposition into Agents for my problem of computing fingerprints (while the whole function could be modeled as a single Agent). Furthermore, I think that the "autonomous" attribute makes another difference: an Agent can act on its own, without necessarily reacting to an external message, while as far as I understand Actors only do something after getting a stimulus.

BTW, in any case if I correctly remember JADE is made by Telecom Italia, again a communication company.