
Deque shredder

Posted by evanx on August 1, 2012 at 1:21 PM PDT

Last time we introduced the trivial namesake Timestamped interface, and used the excellent ArrayDeque of Java6 to collect such things, imposing a time-based capacity and some external synchronization. Now let's test this with some threads.

 Deque shredder: A part of "Timestamped: a trilogy in a few parts."

In case you missed the update to our previous installment, our generic TimestampedElement now has a compareTo() for natural ordering by the timestamp.

public class TimestampedElement<T> implements Timestamped, Comparable<Timestamped> {
    T element;
    long timestamp;

    public TimestampedElement(T element, long timestamp) {
        this.element = element;
        this.timestamp = timestamp;
    }

    public T getElement() {
        return element;
    }
   
    @Override
    public long getTimestamp() {
        return timestamp;
    }

    @Override
    public int compareTo(Timestamped other) {
        if (timestamp < other.getTimestamp()) return -1;
        if (timestamp > other.getTimestamp()) return 1;
        else return 0;
    }   
}

For completeness, here is our deque collector for our Timestamped thingies.

public class TimestampedDequer<T extends Timestamped> {
    long capacityMillis;
    long lastTimestamp;
    ArrayDeque<T> deque = new ArrayDeque<T>();
   
    public TimestampedDequer(long capacityMillis) {
        this.capacityMillis = capacityMillis;
    }
   
    public synchronized void addLast(T element) {
        if (element.getTimestamp() == 0 || element.getTimestamp() < lastTimestamp) {
            deque.clear(); // throw our toys out the cot exception
        } else {
            lastTimestamp = element.getTimestamp();
            prune(lastTimestamp);
            deque.addLast(element);
        }
    }

    private void prune(long latestTimestamp) {
        while (deque.size() > 0 &&
                deque.getFirst().getTimestamp() <= latestTimestamp - capacityMillis) {
            deque.removeFirst();
        }
    }

    public synchronized Deque<T> snapshot(long lastTimestamp) {
        prune(lastTimestamp);
        return deque.clone();
    }

    public synchronized Deque<T> tail(int size) {
        Deque<T> tail = new ArrayDeque<T>();
        Iterator<T> it = deque.descendingIterator();
        for (int i = 0; i < size && it.hasNext(); i++) {
            tail.addFirst(it.next());
        }
        return tail;
    }   
}


where we use the efficient ArrayDeque implementation of Java6.

As discussed last time, we remove expired elements from the head when we add the latest element to the tail, to make it self-pruning. And we provide a synchronized snapshot() and tail() for a couple of use-cases as follows...

Primarily, we will use snapshot() to analyse the latest records for the desired interval, e.g. for an automated status check every minute.

Furthermore, we will use a size-based tail() e.g. to display the latest so-many records in a status web page, for informational purposes.
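To see the pruning, snapshot() and tail() behave without waiting on a clock, here is a minimal sketch that feeds in explicit timestamps. WindowSketch is a hypothetical stand-in rather than the class above: it stores bare long values and leaves out the clear-on-out-of-order rule, so treat it as an illustration only.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical stand-in for the article's TimestampedDequer, reduced to bare
// long timestamps so that it can be driven without a clock or threads.
final class WindowSketch {
    private final long capacityMillis;
    private final ArrayDeque<Long> deque = new ArrayDeque<Long>();

    public WindowSketch(long capacityMillis) {
        this.capacityMillis = capacityMillis;
    }

    // Prune expired timestamps from the head, then append the new one.
    public synchronized void addLast(long timestamp) {
        prune(timestamp);
        deque.addLast(timestamp);
    }

    // Drop every timestamp at or before (latestTimestamp - capacityMillis).
    private void prune(long latestTimestamp) {
        while (!deque.isEmpty()
                && deque.getFirst().longValue() <= latestTimestamp - capacityMillis) {
            deque.removeFirst();
        }
    }

    // Copy of the timestamps still inside the window ending at latestTimestamp.
    public synchronized Deque<Long> snapshot(long latestTimestamp) {
        prune(latestTimestamp);
        return new ArrayDeque<Long>(deque);
    }

    // Copy of up to `size` of the newest timestamps, oldest first.
    public synchronized Deque<Long> tail(int size) {
        Deque<Long> tail = new ArrayDeque<Long>();
        Iterator<Long> it = deque.descendingIterator();
        for (int i = 0; i < size && it.hasNext(); i++) {
            tail.addFirst(it.next());
        }
        return tail;
    }

    public static void main(String[] args) {
        WindowSketch window = new WindowSketch(90);
        for (long t = 1000; t <= 1200; t += 10) {
            window.addLast(t); // one record every 10ms
        }
        System.out.println(window.snapshot(1200));
        System.out.println(window.tail(3));
    }
}
```

With a 90ms capacity and a record every 10ms, the snapshot holds the nine timestamps 1120 through 1200, and tail(3) returns 1180, 1190 and 1200.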

Now let's do us some "heavy-dropping" with threads, using a ScheduledExecutorService to regularly add records to the deque.

public class TimestampedDequerTest  {
    long capacityMillis = 90;
    long scheduledInterval = 10;
    long scheduledDelay = 0;
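    final TimestampedDequer<TimestampedElement<String>> dequer =
            new TimestampedDequer<TimestampedElement<String>>(capacityMillis);
    volatile boolean verbose = false; // written by the test thread, read by the scheduled thread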
   
    Runnable scheduledRunnable = new Runnable() {

        @Override
        public void run() {
            addLast();
        }
    };

    private void addLast() {
        long timestamp = System.currentTimeMillis();
        String value = "record at " + timestamp;
        dequer.addLast(new TimestampedElement<String>(value, timestamp));
        if (verbose) {
            System.out.println(value);
        }
    }

    @Test
    public void testConcurrently() throws Exception {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
        ScheduledFuture<?> future = executorService.scheduleAtFixedRate(
                    scheduledRunnable, scheduledDelay, scheduledInterval, TimeUnit.MILLISECONDS);
        checkConcurrently();
        checkConcurrently();
        future.cancel(true);
    }

where we check twice, just to make sure! ;) Actually we want to make sure of the prune()'ing, following the sleep for capacityMillis.
    
    private void checkConcurrently() throws Exception {
        long startMillis = System.currentTimeMillis();
        System.out.println("startMillis " + startMillis);
        verbose = true;
        Thread.sleep(capacityMillis);
        int expectedCapacity = (int) (capacityMillis / scheduledInterval);
        verbose = false;
        long stopMillis = System.currentTimeMillis();
        System.out.println("stopMillis " + stopMillis);
        Deque<TimestampedElement<String>> deque = dequer.snapshot(stopMillis);
        long firstTimestamp = deque.getFirst().getTimestamp();  
        long lastTimestamp = deque.getLast().getTimestamp();  
        System.out.println("size " + deque.size());
        System.out.println("first " + firstTimestamp);
        System.out.println("last " + lastTimestamp);
        Assert.assertTrue("first time", firstTimestamp >= startMillis);       
        Assert.assertTrue("last time", lastTimestamp >= firstTimestamp);
        Assert.assertTrue("capacityMillis min", lastTimestamp - firstTimestamp >= 0);       
        Assert.assertTrue("capacityMillis max", lastTimestamp - firstTimestamp <= capacityMillis);       
        Assert.assertTrue("size min", deque.size() > 0);
        Assert.assertTrue("size max", deque.size() <= expectedCapacity);
        checkSet(deque);
    }   

which prints...
scheduledInterval 10
record at 1340231378158
record at 1340231378168
record at 1340231378178
...
record at 1340231378228
record at 1340231378238
startMillis 1340231378157
stopMillis 1340231378247
size 9
first 1340231378158
last 1340231378238
...
startMillis 1340231378249
stopMillis 1340231378339
size 9
first 1340231378258
last 1340231378338

We survey this output, eyeing the timestamps, and nod ponderously.

Just for good measure, we add the records to a SortedSet, and check that the first and last timestamps match.

    private void checkSet(Deque<TimestampedElement<String>> deque) throws Exception {
        SortedSet<TimestampedElement<String>> set = new TreeSet<TimestampedElement<String>>();
        set.addAll(deque);       
        Assert.assertEquals("size", deque.size(), set.size());
        Assert.assertEquals("first", deque.getFirst().getTimestamp(), set.first().getTimestamp());
        Assert.assertEquals("last", deque.getLast().getTimestamp(), set.last().getTimestamp());
    }

where our TimestampedElement's compareTo() method enables the natural ordering, so we don't need our TimestampedComparator to construct the SortedSet.
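One caveat worth keeping in mind with this check: a TreeSet treats two elements as duplicates when compareTo() returns 0, so records sharing the same millisecond collapse into one, and the size assertion only holds while every timestamp is distinct. A minimal sketch of that effect follows, where Stamped and SortedSetSketch are hypothetical names used for illustration only.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical minimal record, ordered by timestamp only,
// in the same spirit as the article's compareTo().
final class Stamped implements Comparable<Stamped> {
    final String value;
    final long timestamp;

    public Stamped(String value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    @Override
    public int compareTo(Stamped other) {
        return Long.compare(timestamp, other.timestamp);
    }
}

final class SortedSetSketch {
    // Number of distinct timestamps, which is all the TreeSet retains here.
    public static int distinctTimestamps(Deque<Stamped> deque) {
        SortedSet<Stamped> set = new TreeSet<Stamped>(deque);
        return set.size();
    }

    public static void main(String[] args) {
        Deque<Stamped> deque = new ArrayDeque<Stamped>();
        deque.addLast(new Stamped("a", 100));
        deque.addLast(new Stamped("b", 100)); // same millisecond as "a"
        deque.addLast(new Stamped("c", 101));
        System.out.println("deque size " + deque.size());
        System.out.println("set size " + distinctTimestamps(deque));
    }
}
```

Here the deque holds three records while the set holds two. With one task adding a record every 10ms this should rarely matter, but it could once several tasks add records within the same millisecond.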

Let's vary the scheduledInterval.

    @Test
    public void testScheduledIntervals() throws Exception {
        while (--scheduledInterval > 0) {
            ScheduledFuture<?> future = Executors.newScheduledThreadPool(10).scheduleAtFixedRate(
                    scheduledRunnable, scheduledDelay, scheduledInterval, TimeUnit.MILLISECONDS);
            Thread.sleep(capacityMillis);
            int expectedCapacity = (int) (capacityMillis / scheduledInterval);
            long stopMillis = System.currentTimeMillis();
            Deque<TimestampedElement<String>> deque = dequer.snapshot(stopMillis);
            Woohoo.assertEquals("interval " + scheduledInterval, expectedCapacity, deque.size());
            future.cancel(true);
            Thread.sleep(scheduledInterval);
        }
    }

where we loop the scheduledInterval down to 1ms.
interval 9: Woohoo! 10 == 10
interval 8: Woohoo! 11 == 11
interval 7: Woohoo! 12 == 12
interval 6: D'oh! 15 != 14
interval 5: D'oh! 18 != 17
interval 4: Woohoo! 22 == 22
interval 3: D'oh! 30 != 29
interval 2: D'oh! 45 != 44
interval 1: D'oh! 90 != 89
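Part of the trouble is that capacityMillis / scheduledInterval is integer division, so it only gives a floor. As a rough sketch under idealized assumptions (perfectly regular ticks, no start-up delay, no jitter), the number of ticks that land inside the window can still depend on where they fall relative to its edges. PhaseSketch and its phase parameter are hypothetical, and the sketch is not meant to reproduce the run above.

```java
final class PhaseSketch {
    // Counts ideal ticks at phase, phase + interval, ... that fall inside
    // the window (stop - capacity, stop], matching the pruning rule that
    // drops timestamps at or before (stop - capacity).
    public static int ticksInWindow(long phase, long interval, long capacity, long stop) {
        int count = 0;
        for (long t = phase; t <= stop; t += interval) {
            if (t > stop - capacity) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        long capacity = 90;
        long interval = 7;
        for (long phase = 0; phase < interval; phase++) {
            // The window is (910, 1000]; the ticks started long before it.
            System.out.println("phase " + phase + ": "
                    + ticksInWindow(phase, interval, capacity, 1000));
        }
        System.out.println("integer division: " + (capacity / interval));
    }
}
```

For an interval of 7ms the count comes out as 12 or 13 depending on the phase, while 90 / 7 is always 12. For an interval that divides the capacity evenly, such as 10ms, this idealized count is always 9. Real runs add start-up delay and jitter on top, which this model leaves out.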

Given how unpredictable time is, ironically, with those threads and what-not, we can't exactly predict the size of the deque. D'oh! So for that we have used the following util class, which prints whether the size matches what we expect rather than failing the test...
public class Woohoo {

    public static void assertEquals(String message, Object expected, Object actual) {
        if (actual.equals(expected)) {
            System.out.printf("%s: Woohoo! %s == %s\n", message, expected, actual);
        } else {
            System.out.printf("%s: D'oh! %s != %s\n", message, expected, actual);
        }
    }
}

Selectively using the above drop-in replacement for Assert, we get our tests to pass 100%, woohoo! ;)
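If we wanted a check that can actually fail, one option is to assert that the size lies within a tolerance of the expected value. The following is a minimal sketch of that idea. Roughly, within() and assertWithin() are hypothetical names, and the tolerance of 1 is an assumption picked for illustration, not something measured.

```java
final class Roughly {
    // True when actual lies within +/- tolerance of expected.
    public static boolean within(long expected, long actual, long tolerance) {
        return Math.abs(actual - expected) <= tolerance;
    }

    // Throws AssertionError when actual is outside the tolerance.
    public static void assertWithin(String message, long expected, long actual, long tolerance) {
        if (!within(expected, actual, tolerance)) {
            throw new AssertionError(message + ": expected " + expected
                    + " +/- " + tolerance + " but was " + actual);
        }
    }

    public static void main(String[] args) {
        assertWithin("interval 6", 15, 14, 1);   // off by one, accepted
        System.out.println(within(163, 165, 1)); // off by two, rejected
    }
}
```

JUnit 4's Assert.assertEquals(String, double, double, double) offers a similar delta-based comparison, if converting the sizes to double is acceptable.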

To further increase the intensity, we let the scheduled tasks pile up.

    @Test
    public void testCumulativeScheduledIntervals() throws Exception {
        Deque<ScheduledFuture<?>> futures = new ArrayDeque<ScheduledFuture<?>>();
        int expectedCapacity = 0;
        while (--scheduledInterval > 0) {
            expectedCapacity += (int) (capacityMillis / scheduledInterval);
            ScheduledFuture future = Executors.newScheduledThreadPool(100).scheduleAtFixedRate(
                    scheduledRunnable, scheduledDelay, scheduledInterval, TimeUnit.MILLISECONDS);
            futures.add(future);
            Threads.sleep(capacityMillis);
            long stopMillis = System.currentTimeMillis();
            Deque deque = dequer.snapshot(stopMillis);
            Woohoo.assertEquals("interval " + scheduledInterval, expectedCapacity, deque.size());
            Threads.sleep(scheduledInterval);
        }
        for (ScheduledFuture future : futures) {
            future.cancel(true);
        }
    }

where we only cancel all the scheduled threads when all is said and done. As expected, we get some D'oh's and Woohoo's!
interval 9: Woohoo! 10 == 10
interval 8: Woohoo! 21 == 21
interval 7: D'oh! 33 != 34
interval 6: Woohoo! 48 == 48
interval 5: Woohoo! 66 == 66
interval 4: D'oh! 88 != 89
interval 3: Woohoo! 118 == 118
interval 2: D'oh! 163 != 165
interval 1: D'oh! 253 != 254
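The tests above create a fresh thread pool on every pass and only cancel the futures, so the pool threads themselves linger. As a hedged alternative, here is a minimal sketch that shuts the pool down when it is finished. SchedulerSketch and countTicks() are hypothetical names, and an AtomicInteger counter stands in for our deque.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SchedulerSketch {
    // Runs a counting task at a fixed rate for roughly runMillis,
    // then stops the pool and returns the number of ticks seen.
    public static int countTicks(long intervalMillis, long runMillis) {
        final AtomicInteger ticks = new AtomicInteger();
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        try {
            executor.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    ticks.incrementAndGet();
                }
            }, 0, intervalMillis, TimeUnit.MILLISECONDS);
            Thread.sleep(runMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } finally {
            executor.shutdownNow(); // cancels the periodic task and releases the thread
        }
        return ticks.get();
    }

    public static void main(String[] args) {
        System.out.println("ticks " + countTicks(10, 90));
    }
}
```

The exact count varies from run to run, for the same timing reasons as the deque sizes above.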

Let's wrap this up by asserting that our Deque collector is reasonably robust!?

Next time, we'll use our timestamped deque to capture log4j records, and as alluded to previously, regularly analyse the latest deque of logs to detect when our app's wheels are wobbling or even coming off altogether, and notify ourselves thereof.

Resources

https://github.com/evanx/vellum/wiki
