Skip to main content

Decoupling event producers and event consumers in Java EE 6 using CDI and JMS

Posted by jjviana on April 14, 2010 at 5:26 AM PDT

In this post I will share my recent findings about Container Dependency Injection in Java EE 6, in particular how to decouple the processing threads of  event producers and event consumers.

Java EE 6 introduces a very nice dependency injection framework (CDI) that has superb support for the Observer pattern in the form of event broadcasting.

An Event in CDI is just a regular POJO:

public class MyEvent {
    String data;
    Date eventTime;
    ....

}

Events can be fired by any class through the use of the Event implementation injected automatically by the container via the @Inject annotation:

 

public class EventProducer {

  @Inject @Any Event<MyEvent> event;


  public void doSomething() {
       MyEvent e=new MyEvent();
      e.data="This is a test event";
      e.eventTime=new Date();
      event.fire(e);
        
  }
  
}

Observing events is even easier, one just needs to declare a method which takes a parameter with the @Observes annotation:

 

public class EventConsumer {


    public void afterMyEvent(@Observes MyEvent event) {

        // .. Insert event logic here
    }

}

CDI will automagically wire EventProducer and EventConsumer, so that when EventProducer fires the event, EventConsumer.afterMyEvent gets called.

This is pretty cool, as now EventProducer and EventConsumer can work together without direct knowledge of each other.

There is however another, more subtle, form of coupling taking place:  event dispatching usually happens in the same thread that fired the event. Event observers can perform some long-running tasks upon receiving an event, which will block this thread. This may or may not be a problem, depending on where the event proucer thread is located. If this thread is for instance a GUI event thread or a HTTP request processing thread then the end user may be forced to wait for a response from the application while a slow operation is taking place.

To solve this problem it would be nice to be able to process the events in a backhround thread. Unfortunately, the current CDI specification lacks such a feature.

There are many ways to work around this problem. The one I will present here involves creating a generic background event processing framework that can be used to process any application event in background using JMS as a dispatch mechanism. The Idea is to change the event processing wokflow from:

Event Producer -> Event Consumer(s) [foreground thread]

To:

Event Producer -> Event Sender -> JMS Queue [foreground thread]
JMS Queue -> Event Dispatcher -> Event Consumer [background thread]

Here is how you can do it in Java EE 6:

Create a base event class (and make your application events extend this class)

This class is needed to make sure all events implement the Serializable interface (needed later for JMS serialization) and also to allow the background processing framework to work for any of the application event subclasses:

public class BaseEvent implements java.io.Serializable {

}

Create two event qualifiers: @InForeground and @InBackground

Event qualifiers in CDI are attributes that can be used to filter events delivered to observer methods. We will create two qualifiers: @InForeground to qualify events that are being fired by a "foreground"  thread (i.e a thread that cannot tolerate slow operations) and @InBackground to enable methods to observe only events that are in a "background"  thread:

@Qualifier
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER, ElementType.TYPE})
public @interface InForeground {

}

@Qualifier
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER, ElementType.TYPE})
public @interface InBackground {

}

Create a background processing JMS Queue

This is a normal JMS queue that will be used for dispatching events in background. See this excellent post to learn how to do it in Glassfish V3. In my case I have created a JMS queue with the name "jms/BackgroundEventQueue".

Create the background event sender and background event dispatcher

Here is where the real action happens: the event sender will observe foreground events and convert them into JMS messages placed in the background event queue. The event dispatcher will then listen for events posted in this queue, and re-fire them using the @InBackground qualifier so that the event can then be processed by interested observers:

 

@Stateless
public class BackgroundEventSender {
    @Resource(mappedName="jms/ConnectionFactory")
    private ConnectionFactory connectionFactory;
    @Resource(mappedName="jms/BackgroundEventQueue")
    private Queue backgroundEventQueue;
    private Connection connection;
    private Session session;
    private MessageProducer producer;

    @PostConstruct
    public void init() {
        try {
            connection = connectionFactory.createConnection();
            session=connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            producer=session.createProducer(backgroundEventQueue);
        } catch (JMSException ex) {
            Logger.getLogger(BackgroundEventSender.class.getName()).log(Level.SEVERE, null, ex);
            throw new RuntimeException(ex);
        }
    }

    @PreDestroy
    public void destroy() {
        try {

        if(connection!=null)
            connection.close();
          }
        catch(Exception e) {
             Logger.getLogger(BackgroundEventSender.class.getName()).log(Level.SEVERE, null, e);
        }
 
    }
  

    public void event(@Observes @InForeground BaseEvent event) {
        try {
            ObjectMessage msg = session.createObjectMessage();
            msg.setObject(event);
            producer.send(msg);
        } catch (JMSException ex) {
            Logger.getLogger(BackgroundEventSender.class.getName()).log(Level.SEVERE, null, ex);
            throw new RuntimeException(ex);
        }

    }

}

 

 @MessageDriven(mappedName = "jms/BackgroundEventQueue", activationConfig =  {
        @ActivationConfigProperty(propertyName = "acknowledgeMode",
          propertyValue = "Auto-acknowledge"),
        @ActivationConfigProperty(propertyName = "destinationType",
            propertyValue = "javax.jms.Queue")
    })
public class BackgroundEventDispatcher implements MessageListener {
   
    public BackgroundEventDispatcher() {
    }

     @Inject @InBackground Event<BaseEvent> baseEvent;

    public void onMessage(Message message) {

        if(!(message instanceof ObjectMessage))
            throw new RuntimeException("Invalid message type received");

        ObjectMessage msg=(ObjectMessage)message;
        try {
            Serializable eventObject = msg.getObject();
            if(!(eventObject instanceof BaseEvent))
                throw new RuntimeException("Unknown event type received");
            BaseEvent event=(BaseEvent)eventObject;
            baseEvent.fire(event);

        } catch (JMSException ex) {
         Logger.getLogger(BackgroundEventDispatcher.class.getName()).log(Level.SEVERE, null, ex);
         throw new RuntimeException(ex);
        }



    }
   
}

 Use @InForeground and @InBackground in your event producers and consumers

Now we can use the @InForeground event qualifier in the event producer to indicate that this event is being fired by a foreground thread:

public class EventProducer {

  @Inject @Inforeground Event<MyEvent> event;


  public void doSomething() {
       MyEvent e=new MyEvent();
      e.data="This is a test event";
      e.eventTime=new Date();
      event.fire(e);
        
  }
  
}

 

And in the event observer we can listen for events that are being fired in the background dispatcher thread:

public class EventConsumer {


    public void afterMyEvent(@Observes @InBackground MyEvent event) {

        // .. Insert event logic here
    }

}

 

Now EventProducer and EventConsumer are properly decoupled.

A warning about Glassfish v3.0

Working with CDI in Glassfish V3.0 revealed a number of performance problems related to application redeployment. I suggest you use the promoted build of Glassfish 3.0.1 if you want to work with CDI on Glassfish.

 

Comments

Very nice. For an alternative solution, how about create a ...

Very nice.
For an alternative solution, how about create a portable extension like the following:

public class AsyncEventExtension implements Extension {
    private static final Logger     logger         = Logger.getLogger(AsyncEventExtension.class);
    private List<ObserverMethod<?>> asyncObservers = new ArrayList<ObserverMethod<?>>();
    private ExecutorService         pool           = Executors.newCachedThreadPool();

    public <X> void onProcessAnnotatedType(@Observes ProcessAnnotatedType<X> event, final BeanManager beanManager) {
        final AnnotatedType<X> type = event.getAnnotatedType();
        for (AnnotatedMethod<?> method : type.getMethods()) {
            for (final AnnotatedParameter<?> param : method.getParameters()) {
                if (param.isAnnotationPresent(Observes.class) && param.isAnnotationPresent(Async.class)) {
                    asyncObservers.add(SimpleObserverMethod.create(this.pool, beanManager, type, method, param));
                }
            }
        }
    }

    public void onAfterBeanDiscovery(@Observes AfterBeanDiscovery event) {
        for (ObserverMethod<?> om : this.asyncObservers) {
            event.addObserverMethod(om);
        }
    }

    public void onBeforeShutdown(@Observes BeforeShutdown event) {
        this.pool.shutdown();
    }

    private static class SimpleObserverMethod<T> implements ObserverMethod<T> {
        private ExecutorService       pool;
        private BeanManager           bm;
        private AnnotatedType<?>      type;
        private AnnotatedMethod<?>    method;
        private AnnotatedParameter<?> param;
        private Set<Annotation>       qualifiers;
        private Reception             reception;
        private TransactionPhase      txnPhase;

        private SimpleObserverMethod() {
        }

        public static <T> ObserverMethod<T> create(ExecutorService pool, BeanManager bm, AnnotatedType<?> type, AnnotatedMethod<?> method, AnnotatedParameter<?> param) {
            SimpleObserverMethod<T> som = new SimpleObserverMethod<T>();
            som.pool = pool;
            som.bm = bm;
            som.type = type;
            som.method = method;
            som.param = param;
            som.qualifiers = new HashSet<Annotation>();
            for (Annotation annotation : param.getAnnotations()) {
                if (bm.isQualifier(annotation.getClass()) && !annotation.annotationType().equals(Async.class)) {
                    som.qualifiers.add(annotation);
                    if (annotation.annotationType().equals(Observes.class)) {
                        Observes observes = (Observes) annotation;
                        som.reception = observes.notifyObserver();
                        som.txnPhase = observes.during();
                    }
                }
            }
            return som;
        }

        public Class<?> getBeanClass() {
            return this.type.getJavaClass();
        }

        public Type getObservedType() {
            return this.param.getBaseType();
        }

        public Set<Annotation> getObservedQualifiers() {
            return this.qualifiers;
        }

        public Reception getReception() {
            return this.reception;
        }

        public TransactionPhase getTransactionPhase() {
            return this.txnPhase;
        }

        public void notify(T event) {
            if (this.method.isStatic()) {
                this.notify(null, method.getJavaMember(), event);
            } else {
                for (Bean<?> bean : this.bm.getBeans(this.type.getBaseType())) {
                    CreationalContext<?> ctx = this.bm.createCreationalContext(bean);
                    Object target = this.bm.getReference(bean, this.type.getBaseType(), ctx);
                    this.notify(target, method.getJavaMember(), event);
                }
            }
        }

        private void notify(final Object target, final Method method, final T event) {
            this.pool.execute(new Runnable() {
                public void run() {
                    try {
                        method.invoke(target, event);
                    } catch (Exception e) {
                        logger.error("Exception while dispatching event:", e);
                    }
                }
            });
        }
    }
}

and a qualifier as the following:
@Qualifier
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.PARAMETER })
public @interface Async {
}

Then, you can simple write an async event observer method as the following:

public void onAuditEvent(@Observes @Async AuditEvent event) {
        logger.info("received audit event. msg:" + event.getMsg() + ", time:" + event.getTime());
    }

And the event firing could be written like the following:
...
@Inject
    private Event<AuditEvent> auditEvent;
...
public void fireEvent() {
  this.auditEvent.fire(new AuditEvent("some audit message"));
{

Please note that the above event injection does not have @Async.

Decoupling event producers

Hi
Nice article.
I find the tiltle a little bit confusing, "Decoupling event producers and event consumers in Java EE 6...", in the JEE5 the producer was also decupled from producer so there is no hightligt here. The real improvement is the CDI (with qualifiers) and the (new) Event API.
Regards,
Mihai

Hi, This reply is way too late, but better late than never ...

Hi,

This reply is way too late, but better late than never :)

The decoupling I am refering to is thread decoupling - using the technique described, producers and consumers run in different threads.

Regards,

  - Juliano

Hi, Unfortunately CDI(

Hi,
Unfortunately CDI( atleast 1.0) do not support Typed Parameter in Event.
Don't you think it should?

Thanks & Regards,
Puspendu Banerjee

I can't help but wonder why

I can't help but wonder why you chose to violate encapsulation in your Event class: String data; Date eventTime; Is this for performance reasons? or preferred/required for the Event mechanism?

What about @Asynchronous EJB?

I think you get the same results (Decoupling threads) by putting the marking the Observer method as Asynchronous like this:
public class EventConsumer {

@Asynchronous
public void afterMyEvent(@Observes MyEvent event) {

// .. Insert event logic here
}

}

The @Asynchronous annotation

The @Asynchronous annotation kind of does the same thing but it does not define a retry stratrgy in case of failures like JMS does. In cases where retry is not important then it is a better alternative than tne method described here.

Great post

Really helped me create some functionallity that I needed (thought Events would cover this, but did not because producers and observers are not properly decoupled). Still think JMS is a bit of a big overhead to accomplish this, and hope that Events will be properly decoupled in the future of CDI

Hi, I went for JMS because it

Hi,

I went for JMS because it already contains a lot of the logic for retrying and ordering that I needed.

But the same concept can be easily implemented using a background thread or a thread pool.

I'm sure future versions of CDI will have support for this out of the box.

 

Seam 3

Great article! This is exactly the kind of thing we're building into Seam 3. Have a look at http://sfwk.org/Seam3/JMSModule !

There is no "JEE"

Hi Juliano. Just a reminder that there is nothing named "JEE". The correct name is "Java EE". If that's too much to say or type, just use "EE". Thanks.



Bill Shannon
Java EE Spec Lead



http://www.java.com/en/about/brand/naming.jsp
http://www.theserverside.com/news/thread.tss?thread_id=35561

Hi Bill, Thank for the tip, I

Hi Bill,

Thank for the tip, I corrected it in the article.