
Timestamped Deque

Posted by evanx on June 27, 2012 at 3:33 PM PDT

In this here second part of the "Timestamped" series, we introduce its namesake Timestamped interface, use a Deque from Java 6 to collect such things, and impose a time-based capacity on it.

Ultimately we gonna hook up a remote Log4j appender, to digest logs in order to provide salient information via RMX and HTTP, and notify ourselves via Gtalk when the wheels are wobbling or even coming off altogether. What fun! But lemme not get ahead of myself.

Prequels

Incidentally, we kicked off this no-hit wonder series with Counter Map, which does not feature further yet, so ignore that for now.

 Timestamped Deque: A part of "Timestamped: a trilogy in a few parts."

Without further ado, I give you the namesake interface of this series.

public interface Timestamped {
    public long getTimestamp();   
}

which returns the timestamp in "millis" à la System.currentTimeMillis().

Also take an adapter for Log4j's LoggingEvent.

import org.apache.log4j.spi.LoggingEvent;

public class TimestampedLoggingEventAdapter implements Timestamped {
    LoggingEvent loggingEvent;

    public TimestampedLoggingEventAdapter(LoggingEvent loggingEvent) {
        this.loggingEvent = loggingEvent;
    }

    @Override
    public long getTimestamp() {
        return loggingEvent.getTimeStamp();
    }
}

And a generic wrapped element.
public class TimestampedElement<T> implements Timestamped, Comparable<Timestamped> {
    T element;
    long timestamp;

    public TimestampedElement(T element, long timestamp) {
        this.element = element;
        this.timestamp = timestamp;
    }

    public T getElement() {
        return element;
    }
   
    @Override
    public long getTimestamp() {
        return timestamp;
    }

    @Override
    public int compareTo(Timestamped other) {
        if (timestamp < other.getTimestamp()) return -1;
        if (timestamp > other.getTimestamp()) return 1;
        else return 0;
    }   
}

where we implement compareTo() for natural ordering by the timestamp.

Since duplicate timestamps are possible i.e. where two or more events occur at the same millisecond, and indeed duplicate log messages at the same time, we forgo implementing hashCode() and equals(). Imagine we add such elements to a Set, whose javadoc describes it thus:

Set - A collection that contains no duplicate elements. More formally, sets contain no pair of elements e1 and e2 such that e1.equals(e2), and at most one null element.

Therefore we defer to the default hashCode() and equals() from Object, which are based on object identity, so two distinct instances are never equal even when their contents match.
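
To make that concrete, here is a minimal sketch of what identity-based equality means for a Set. The Event class is a hypothetical stand-in for a timestamped element (it is not part of the series' code), chosen only to keep the example self-contained.

```java
import java.util.HashSet;
import java.util.Set;

class IdentityEqualityDemo {

    // Hypothetical stand-in for a timestamped element; not the article's class.
    static class Event {
        final String message;
        final long timestamp;

        Event(String message, long timestamp) {
            this.message = message;
            this.timestamp = timestamp;
        }
        // No equals() or hashCode() here, so Object's identity-based versions apply.
    }

    // Adds two events with identical content and returns the resulting set size.
    static int distinctCount() {
        Set<Event> events = new HashSet<Event>();
        events.add(new Event("disk full", 1000L));
        events.add(new Event("disk full", 1000L)); // same content, different object
        return events.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctCount()); // prints 2
    }
}
```

Both events survive, which is what we want for duplicate log messages arriving in the same millisecond.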

We might construct a SortedSet of Timestamped elements.

SortedSet - The elements are ordered using their natural ordering, or by a Comparator typically provided at sorted set creation time.

So if we have not implemented compareTo(), then we will need a comparator.

import java.util.Comparator;

public class TimestampedComparator implements Comparator<Timestamped> {

    @Override
    public int compare(Timestamped o1, Timestamped o2) {
        if (o1.getTimestamp() < o2.getTimestamp()) return -1;
        if (o1.getTimestamp() > o2.getTimestamp()) return 1;
        else return 0;
    }   
}
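
One caveat worth a quick sketch: a TreeSet decides whether an element is a duplicate using the comparison result rather than equals(), so a timestamp-only ordering quietly drops elements that share a millisecond. The Event and ByTimestamp classes below are hypothetical stand-ins that mirror the comparator above, just to keep the example self-contained.

```java
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;

class SortedSetDuplicateDemo {

    // Hypothetical minimal event type; stands in for any Timestamped implementation.
    static class Event {
        final String message;
        final long timestamp;

        Event(String message, long timestamp) {
            this.message = message;
            this.timestamp = timestamp;
        }
    }

    // Same timestamp-only ordering as the comparator in the article.
    static class ByTimestamp implements Comparator<Event> {
        @Override
        public int compare(Event o1, Event o2) {
            if (o1.timestamp < o2.timestamp) return -1;
            if (o1.timestamp > o2.timestamp) return 1;
            return 0;
        }
    }

    // Adds three events, two of which share a timestamp, and returns the set size.
    static int sortedSetSize() {
        SortedSet<Event> events = new TreeSet<Event>(new ByTimestamp());
        events.add(new Event("first", 1000L));
        events.add(new Event("second", 1000L)); // compare() returns 0, so it is rejected
        events.add(new Event("third", 1001L));
        return events.size();
    }

    public static void main(String[] args) {
        System.out.println(sortedSetSize()); // prints 2
    }
}
```

So a SortedSet ordered purely by timestamp is probably not the right container when same-millisecond events must all be kept.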


Our inclination might be to collect Timestamped elements in a List, or a Queue perhaps.

Queue - A collection designed for holding elements prior to processing.

That sounds rather appropriate for our digestive purposes, to find the ghost in the machine.

Deque collector

So let's introduce the namesake of this article, a collector of timestamped thingies - a circular buffer, some might call it - and impose a time-based capacity thereupon.

So we use the java.util.Deque found in Java 1.6, courtesy of those most excellent gentlemen, Doug Lea and Josh Bloch. Its javadoc describes it thus:

Deque - A linear collection that supports element insertion and removal at both ends. The name deque is short for "double ended queue" and is usually pronounced "deck."

We use the efficient ArrayDeque implementation.

ArrayDeque - This class is likely to be faster than Stack when used as a stack, and faster than LinkedList when used as a queue.

Fantastic. Let's get us some of that.

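import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

public class TimestampedDequer<T extends Timestamped> {
    long capacityMillis;
    long lastTimestamp;
    ArrayDeque<T> deque = new ArrayDeque<T>();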
   
    public TimestampedDequer(long capacityMillis) {
        this.capacityMillis = capacityMillis;
    }
   
    public synchronized void addLast(T element) {
        if (element.getTimestamp() == 0 || element.getTimestamp() < lastTimestamp) {
            deque.clear(); // throw our toys out the cot exception
        } else {
            lastTimestamp = element.getTimestamp();
            prune(lastTimestamp);
            deque.addLast(element);
        }
    }

    private void prune(long lastTimestamp) {
        while (deque.size() > 0 &&
                deque.getFirst().getTimestamp() <= lastTimestamp - capacityMillis) {
            deque.removeFirst();
        }
    }


where we compose an ArrayDeque and synchronize it "externally" for our purposes, considering that it will be digesting log records continually, whilst being under interrogation by RMX, HTTP requests and what-not.

When we add an element onto the tail, we prune() expired elements from the head. Observe that the above implementation assumes that elements are added in chronological order. However, we expect our host's time to be adjusted by NTP occasionally - hence we clear(), i.e. when we encounter an eventuality that we don't play nicely with.

If we are digesting logs from multiple servers or what-have-you, the above so-called "dequer" ain't gonna work, baby - it's gonna come up empty, baby. Don't shuffle this deque, baby. (As Elvis might have said.) We'll deal with such a handful another time.
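
Here is a rough sketch of that "come up empty" behaviour. MiniDequer is a hypothetical, cut-down stand-in for the dequer above that stores bare timestamps, and the timestamps and the 1000 ms capacity are made-up values imitating two servers whose clocks are a few millis apart.

```java
import java.util.ArrayDeque;
import java.util.Arrays;

class OutOfOrderDemo {

    // Hypothetical simplification of the article's dequer: holds timestamps only.
    static class MiniDequer {
        final long capacityMillis;
        long lastTimestamp;
        final ArrayDeque<Long> deque = new ArrayDeque<Long>();

        MiniDequer(long capacityMillis) {
            this.capacityMillis = capacityMillis;
        }

        synchronized void addLast(long timestamp) {
            if (timestamp == 0 || timestamp < lastTimestamp) {
                deque.clear(); // out of order: everything goes, and this element is dropped too
            } else {
                lastTimestamp = timestamp;
                // Prune expired elements from the head before appending.
                while (!deque.isEmpty() && deque.getFirst() <= lastTimestamp - capacityMillis) {
                    deque.removeFirst();
                }
                deque.addLast(timestamp);
            }
        }

        synchronized int size() {
            return deque.size();
        }
    }

    // Feeds the timestamps in the given order and records the size after each add.
    static int[] sizesAfterEachAdd(long... timestamps) {
        MiniDequer dequer = new MiniDequer(1000); // assumed capacity of one second
        int[] sizes = new int[timestamps.length];
        for (int i = 0; i < timestamps.length; i++) {
            dequer.addLast(timestamps[i]);
            sizes[i] = dequer.size();
        }
        return sizes;
    }

    public static void main(String[] args) {
        // The third timestamp is earlier than the second, as if from a lagging server.
        System.out.println(Arrays.toString(sizesAfterEachAdd(5000, 5010, 5005, 5020)));
        // prints [1, 2, 0, 1]
    }
}
```

Note that lastTimestamp is left untouched when we clear, so elements keep being rejected until one arrives with a timestamp at or after the latest one seen.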

Now, in order to analyse the contents for the desired interval, we take a snapshot().

    public synchronized Deque<T> snapshot(long lastTimestamp) {
        prune(lastTimestamp);
        return deque.clone();
    }

which returns a defensive copy, and so is a relatively costly operation. Perhaps you could recommend an alternative strategy? Perhaps we could implement a special concurrent deque implementation in a future episode, as an exercise? Taking inspiration from that Disruptor thingymajig, perchance, as well as ArrayDeque itself? One that supports aggregating from multiple servers, methinks.

Another use-case is to get the tail i.e. the latest so-many elements, for informational purposes e.g. to display via a servlet, or attach to an email alert.

    public synchronized Deque<T> tail(int size) {
        Deque<T> tail = new ArrayDeque<T>();
        Iterator<T> it = deque.descendingIterator();
        for (int i = 0; i < size && it.hasNext(); i++) {
            tail.addFirst(it.next());
        }
        return tail;
    }   

where we use descendingIterator() to read from the tail of the deque, and addFirst() to rectify the order.
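
For illustration, the same loop can be tried on a plain ArrayDeque of integers. The static helper below is a hypothetical rewrite of tail() that takes the deque as a parameter, purely so the sketch runs on its own.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

class TailDemo {

    // Same loop as tail() in the article, written as a static helper over any Deque.
    static <E> Deque<E> tail(Deque<E> deque, int size) {
        Deque<E> tail = new ArrayDeque<E>();
        Iterator<E> it = deque.descendingIterator(); // walks from the newest element backwards
        for (int i = 0; i < size && it.hasNext(); i++) {
            tail.addFirst(it.next()); // addFirst() restores chronological order
        }
        return tail;
    }

    public static void main(String[] args) {
        Deque<Integer> deque = new ArrayDeque<Integer>();
        for (int i = 1; i <= 5; i++) {
            deque.addLast(i);
        }
        System.out.println(tail(deque, 3));  // prints [3, 4, 5]
        System.out.println(tail(deque, 10)); // prints [1, 2, 3, 4, 5]
    }
}
```

Asking for more elements than the deque holds simply returns everything, thanks to the hasNext() guard.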

Let's test this thing.

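import org.junit.Assert;
import org.junit.Test;

public class TimestampedDequerTest  {
    long capacityMillis = 100; // assumed value; this field is not shown in the original listing
    TimestampedDequer<TimestampedElement<String>> dequer =
            new TimestampedDequer<TimestampedElement<String>>(capacityMillis);
    boolean verbose = false;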
   
    private void addLast() {
        long timestamp = System.currentTimeMillis();
        String value = "record at " + timestamp;
        dequer.addLast(new TimestampedElement<String>(value, timestamp));
        if (verbose) {
            System.out.println(value);
        }
    }

    @Test
    public void test() throws Exception {
        check();
        check();
    }

where we check() twice... just to make sure. Of prune(), that is.
    private void check() throws Exception {
        Thread.sleep(capacityMillis);
        Assert.assertEquals(0, dequer.snapshot(System.currentTimeMillis()).size());
        Assert.assertEquals(0, dequer.tail(4).size());
        addLast();
        Assert.assertEquals(1, dequer.tail(4).size());
        Assert.assertEquals(1, dequer.snapshot(System.currentTimeMillis()).size());
        Thread.sleep(capacityMillis/2);
        Assert.assertEquals(1, dequer.snapshot(System.currentTimeMillis()).size());
        Assert.assertEquals(1, dequer.tail(4).size());
        addLast();
        Assert.assertEquals(2, dequer.tail(4).size());
        Assert.assertEquals(2, dequer.snapshot(System.currentTimeMillis()).size());
        Thread.sleep(capacityMillis/2);
        Assert.assertEquals(2, dequer.tail(4).size());
        Assert.assertEquals(1, dequer.snapshot(System.currentTimeMillis()).size());
        Assert.assertEquals(1, dequer.tail(4).size());
    }


where we expect the final snapshot() to lose an element to prune()'ing, given the two half capacityMillis sleeps since the first addLast().
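
The arithmetic behind that last assertion can be sketched without any sleeping. The helper names and the concrete numbers below (a capacity of 100 ms, a first add at t=1000) are assumptions for illustration; only the expiry condition itself is taken from prune().

```java
class PruneArithmeticDemo {

    // The expiry condition used by prune(): an element is dropped once it is
    // at least capacityMillis older than the reference time.
    static boolean isExpired(long elementTimestamp, long now, long capacityMillis) {
        return elementTimestamp <= now - capacityMillis;
    }

    // Counts how many of the given timestamps would survive a prune at time 'now'.
    static int survivors(long now, long capacityMillis, long... timestamps) {
        int count = 0;
        for (long timestamp : timestamps) {
            if (!isExpired(timestamp, now, capacityMillis)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        long capacityMillis = 100; // hypothetical capacity
        long first = 1000;         // first addLast()
        long second = first + 50;  // second addLast(), after capacityMillis/2
        long now = second + 50;    // final snapshot(), after another capacityMillis/2

        System.out.println(survivors(second, capacityMillis, first, second)); // prints 2
        System.out.println(survivors(now, capacityMillis, first, second));    // prints 1
    }
}
```

At the final snapshot the first element is exactly capacityMillis old, and the "less than or equal" in prune() is what tips it out. The real test leans on Thread.sleep() and the system clock, so its timing is presumably only as reliable as those are.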

Considering that the purpose of this Timestamped series is reducing information overload, we'll tail off here for now, and leave the "heavy-dropping" for next week, namely, testing with threads.

Thereafter, we'll see about using our TimestampedDequer to analyse the latest deque of logs e.g. every minute, to detect when our app might be coming down like a house of cards.

Resources

https://github.com/evanx/vellum/wiki

Credits

Thanks to my colleague Zach Visagie at BizSwitch.net, for his kind reviews and indispensable input!


Comments

Cool, maybe you should contribute to JSR 310, the Timestamped interface looks like a nice and lean approach to some of the issues 310 is suffering from.