Minborg

Thursday, December 2, 2021

Java: Creating Terabyte Sized Queues with Low-Latency

Queues are often fundamental components in software design patterns. But what if millions of messages arrive every second and consumers in several processes must be able to read the complete ledger of all messages? A JVM can only hold so much on the heap before garbage collection becomes the limiting factor: long collection pauses can prevent us from meeting targeted SLAs, or even halt the JVM for seconds or minutes.


This article covers how to create huge persisted queues while retaining predictable and consistent low latency using open-source Chronicle Queue.

The Application

In this article, the objective is to maintain a queue of objects from market data feeds (e.g. the latest price for securities traded on an exchange). Other business areas, such as sensor input from IoT devices or crash-recording information in the automotive industry, could have been chosen as well. The principle is the same.


To start with, a class holding market data is defined:


public class MarketData extends SelfDescribingMarshallable {

    int securityId;
    long time;
    float last;
    float high;
    float low;

    // Getters and setters not shown for brevity

}


Note: In real-world scenarios, great care must be taken when using float and double for holding monetary values as this could otherwise cause rounding problems [Bloch18, Item 60]. However, in this introductory article, I want to keep things simple.
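To make the rounding problem concrete, here is a minimal sketch that contrasts binary floating point with a fixed-point representation. It uses double for the demonstration (float behaves the same way, only with less precision). The class name MoneySketch and the choice of whole cents as the scale are assumptions made for this illustration, not part of the article's MarketData class:

```java
import java.math.BigDecimal;

final class MoneySketch {

    /** Converts a decimal string such as "0.10" to whole cents (hypothetical scale of 2). */
    static long toCents(String decimalPrice) {
        // longValueExact() throws ArithmeticException if a fraction of a cent would be lost
        return new BigDecimal(decimalPrice).movePointRight(2).longValueExact();
    }

    /** Formats whole cents back to a decimal string with two decimals. */
    static String format(long cents) {
        return BigDecimal.valueOf(cents, 2).toPlainString();
    }

    public static void main(String[] args) {
        // Neither 0.1 nor 0.2 can be represented exactly in binary floating point
        System.out.println(0.1 + 0.2 == 0.3);        // prints false

        // Whole cents in a long add up exactly
        long sum = toCents("0.10") + toCents("0.20");
        System.out.println(sum == toCents("0.30"));  // prints true
        System.out.println(format(sum));             // prints 0.30
    }
}
```

A long holding cents (or another fixed number of decimals) is still a primitive, so it would fit the same object-reuse style used later in this article.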


There is also a small utility function MarketDataUtil::create that will create and return a new random MarketData object when invoked:


static MarketData create() {
    MarketData marketData = new MarketData();
    int id = ThreadLocalRandom.current().nextInt(1000);
    marketData.setSecurityId(id);
    float nextFloat = ThreadLocalRandom.current().nextFloat();
    float last = 20 + 100 * nextFloat;

    marketData.setLast(last);
    marketData.setHigh(last * 1.1f);
    marketData.setLow(last * 0.9f);
    marketData.setTime(System.currentTimeMillis());

    return marketData;
}


Now, the objective is to create a queue that is durable, concurrent, low-latency, accessible from several processes and that can hold billions of objects.

The Naïve Approach

Armed with these classes, the naïve approach of using a ConcurrentLinkedQueue can be explored:


public static void main(String[] args) {
    final Queue<MarketData> queue = new ConcurrentLinkedQueue<>();
    for (long i = 0; i < 1e9; i++) {
        queue.add(MarketDataUtil.create());
    }
}


This will fail for several reasons:


  1. The ConcurrentLinkedQueue will create a wrapping Node for each element added to the queue. This effectively doubles the number of objects created.

  2. Objects are placed on the Java heap, contributing to heap memory pressure and garbage collection problems. On my machine, this led to my entire JVM becoming unresponsive and the only way forward was to kill it forcibly using “kill -9”.

  3. The queue cannot be read from other processes (i.e. other JVMs).

  4. Once the JVM terminates, the content of the queue is lost. Hence, the queue is not durable.


Looking at various other standard Java classes, it can be concluded that there is no support for large persisted queues.

Using Chronicle Queue

Chronicle Queue is an open-source library designed to meet the requirements set forth above. Here is one way to set it up and use it:


public static void main(String[] args) {
    final MarketData marketData = new MarketData();
    final ChronicleQueue q = ChronicleQueue
            .single("market-data");
    final ExcerptAppender appender = q.acquireAppender();

    for (long i = 0; i < 1e9; i++) {
        try (final DocumentContext document =
                     appender.acquireWritingDocument(false)) {
            document
                    .wire()
                    .bytes()
                    .writeObject(MarketData.class,
                            MarketDataUtil.recycle(marketData));
        }
    }
}



Using a MacBook Pro 2019 with a 2.3 GHz 8-Core Intel Core i9, north of 3,000,000 messages per second could be inserted using only a single thread. The queue is persisted via a memory-mapped file in the given directory “market-data”. One would expect a MarketData object to occupy 4 (int securityId) + 8 (long time) + 4*3 (float last, high and low) = 24 bytes at the very least.


In the example above, 1 billion objects were appended causing the mapped file to occupy 30,148,657,152 bytes which translates to about 30 bytes per message. In my opinion, this is very efficient indeed.
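To illustrate the general idea of persisting flat records in a memory-mapped file, here is a minimal JDK-only sketch. It is not how Chronicle Queue is implemented and it has no headers, indexing or concurrency control. The class MappedRecordSketch, its fixed 24-byte record layout and its method names are assumptions made for this illustration only:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class MappedRecordSketch {

    // 4 (int securityId) + 8 (long time) + 3 * 4 (float last, high, low) = 24 bytes
    static final int RECORD_SIZE = Integer.BYTES + Long.BYTES + 3 * Float.BYTES;

    /** Maps room for the given number of fixed-size records in the file. */
    static MappedByteBuffer map(Path file, int records) {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE,
                StandardOpenOption.READ,
                StandardOpenOption.WRITE)) {
            // The mapping remains valid after the channel has been closed
            return channel.map(FileChannel.MapMode.READ_WRITE, 0,
                    (long) records * RECORD_SIZE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes the primitive fields of one record into its slot; no objects are created. */
    static void write(MappedByteBuffer buffer, int index, int securityId,
                      long time, float last, float high, float low) {
        final int offset = index * RECORD_SIZE;
        buffer.putInt(offset, securityId);
        buffer.putLong(offset + 4, time);
        buffer.putFloat(offset + 12, last);
        buffer.putFloat(offset + 16, high);
        buffer.putFloat(offset + 20, low);
    }

    static int readSecurityId(MappedByteBuffer buffer, int index) {
        return buffer.getInt(index * RECORD_SIZE);
    }

    static float readLast(MappedByteBuffer buffer, int index) {
        return buffer.getFloat(index * RECORD_SIZE + 12);
    }

    /** Writes a record to a temporary file, maps the file again and reads "last" back. */
    static float roundTrip(int securityId, float last) {
        try {
            final Path file = Files.createTempFile("market-data-sketch", ".dat");
            file.toFile().deleteOnExit();

            final MappedByteBuffer writer = map(file, 2);
            write(writer, 1, securityId, System.currentTimeMillis(),
                    last, last * 1.1f, last * 0.9f);
            writer.force(); // ask the OS to flush the mapped pages to the file

            final MappedByteBuffer reader = map(file, 2);
            if (readSecurityId(reader, 1) != securityId) {
                throw new IllegalStateException("Unexpected securityId");
            }
            return readLast(reader, 1);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the data lives in the mapped file rather than on the Java heap, it does not add to garbage collection pressure and it can be mapped by other processes as well. The roughly 6 extra bytes per message reported above are what a real queue spends beyond such a bare 24-byte payload.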


As can be seen, a single MarketData instance can be reused over and over again because Chronicle Queue will flatten out the content of the current object onto the memory mapped file, allowing object reuse. This reduces memory pressure even more. This is how the recycle method works:

static MarketData recycle(MarketData marketData) {
    final int id = ThreadLocalRandom.current().nextInt(1000);
    marketData.setSecurityId(id);
    final float nextFloat = ThreadLocalRandom.current().nextFloat();
    final float last = 20 + 100 * nextFloat;

    marketData.setLast(last);
    marketData.setHigh(last * 1.1f);
    marketData.setLow(last * 0.9f);
    marketData.setTime(System.currentTimeMillis());

    return marketData;
}

Reading from a Chronicle Queue

Reading from a Chronicle Queue is straightforward. Continuing the example from above, the following shows how the first two MarketData objects can be read from the queue:


public static void main(String[] args) {
    final ChronicleQueue q = ChronicleQueue
            .single("market-data");
    final ExcerptTailer tailer = q.createTailer();

    for (long i = 0; i < 2; i++) {
        try (final DocumentContext document =
                     tailer.readingDocument()) {
            MarketData marketData = document
                    .wire()
                    .bytes()
                    .readObject(MarketData.class);
            System.out.println(marketData);
        }
    }
}


This might produce the following output:


!software.chronicle.sandbox.queuedemo.MarketData {
  securityId: 202,
  time: 1634646488837,
  last: 45.8673,
  high: 50.454,
  low: 41.2806
}

!software.chronicle.sandbox.queuedemo.MarketData {
  securityId: 117,
  time: 1634646488842,
  last: 34.7567,
  high: 38.2323,
  low: 31.281
}


There are provisions to efficiently seek the tailer’s position, for example, to the end of the queue or to a certain index.

What’s Next?

There are many other features that are out of scope for this article. For example, queue files can be set to roll at certain intervals (such as each day, hour or minute) effectively creating a decomposition of information so that older data may be cleaned over time. There are also provisions to be able to isolate CPUs and lock Java threads to these isolated CPUs, substantially reducing application jitter.

Finally, there is an enterprise version with replication of queues across server clusters paving the way towards high availability and improved performance in distributed architectures. The enterprise version also includes a variety of other features such as encryption, time zone rolling and asynchronous appenders.

Resources

Chronicle homepage

Chronicle Queue on GitHub (open-source)

Chronicle Queue Enterprise information on GitHub

[Bloch18] Joshua Bloch, Effective Java, Third Edition, ISBN 0-13-468599-7, 2018 


Monday, November 1, 2021

Did You Know You Can Create Mappers Without Creating Underlying Objects in Java?

As most Java developers know, putting values in a Java Map (like a HashMap) involves creating a large number of auxiliary objects under the covers. For example, a HashMap with int keys and long values might, for each entry, create a wrapped Integer, a wrapped Long and a Node that holds these values together with a hash value and a link to other potential Node objects sharing the same hash bucket. Perhaps even more troubling is that a wrapped Integer might be created each time the Map is queried, for example via the Map::get operation.
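The boxing described above can be observed directly. The following is a minimal sketch, assuming a JVM with the default Integer cache (values from -128 to 127); the class BoxingSketch and its method names are made up for this illustration:

```java
import java.util.HashMap;
import java.util.Map;

final class BoxingSketch {

    /** Returns true if boxing the same int twice yields the very same Integer object. */
    static boolean sameBoxedInstance(int value) {
        final Integer first = value;   // compiled to Integer.valueOf(value)
        final Integer second = value;
        return first == second;        // deliberate identity comparison
    }

    /** Looks up a primitive key; the key is boxed to an Integer on every call. */
    static long lookup(Map<Integer, Long> map, int key) {
        final Long boxed = map.get(key);
        return boxed == null ? -1L : boxed; // the value is unboxed again here
    }

    public static void main(String[] args) {
        final Map<Integer, Long> map = new HashMap<>();
        map.put(1000, 42L); // boxes both the key and the value, and creates a Node

        System.out.println(lookup(map, 1000));       // prints 42
        System.out.println(sameBoxedInstance(100));  // prints true (cached Integer)
        System.out.println(sameBoxedInstance(1000)); // false with the default cache
    }
}
```

Whether the JIT compiler can eliminate some of these allocations depends on the situation, so the sketch shows what may happen rather than what always happens.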


In this short tutorial, we will devise a way of creating an object-creation-free, lightweight mapper with rudimentary lookup capability that is suitable for a limited number of associations. The mapper is first created and initialized, whereafter it can be queried. Interestingly, these mappers can also be serialized/deserialized and sent over the wire using Chronicle’s open-source libraries without incurring additional object creation.


Setting the Scene

Suppose we have a number of Security objects with an “id” field of type int. We would like to create a reusable mapper for these objects allowing a number of Security objects to be looked up using the “id” field:


public final class Security extends SelfDescribingMarshallable {

    private int id;
    private long averagePrice;
    private long count;

    public Security(int id, long price, long count) {
        this.id = id;
        this.averagePrice = price;
        this.count = count;
    }

    // Getters, setters and toString() not shown for brevity

}

The SelfDescribingMarshallable is basically a serialization marker.

Implementing an IntMapper

We can now store these Security objects in an IntMapper containing the actual lookup method as shown hereunder:


public class IntMapper<V> extends SelfDescribingMarshallable {

    private final List<V> values = new ArrayList<>();
    private final ToIntFunction<? super V> extractor;

    public IntMapper(final ToIntFunction<? super V> extractor) {
        this.extractor = Objects.requireNonNull(extractor);
    }

    public List<V> values() { return values; }

    public IntStream keys() {
        return values.stream().mapToInt(extractor);
    }

    public void set(Collection<? extends V> values) {
        this.values.clear();
        this.values.addAll(values);
        // Sort the list in id order
        this.values.sort(comparingInt(extractor));
    }

    public V get(int key) {
        int index = binarySearch(key);
        if (index >= 0)
            return values.get(index);
        else
            return null;
    }

    // binarySearch() shown later in the article

}


That’s it! We have created a reusable mapper with no object-creation overhead on lookup and with reasonable query performance.



Using the Mapper

Armed with the above classes, we can put together a small main method that demonstrates the use of the concept:


public class SecurityLookup {

    public static void main(String[] args) {

        // These can be reused
        final Security s0 = new Security(100, 45, 2);
        final Security s1 = new Security(10, 100, 42);
        final Security s2 = new Security(20, 200, 13);

        // This can be reused
        final List<Security> securities = new ArrayList<>();

        securities.add(s0);
        securities.add(s1);
        securities.add(s2);

        // Reusable Mapper
        IntMapper<Security> mapper =
                new IntMapper<>(Security::getId);

        mapper.set(securities);

        Security security100 = mapper.get(100);

        System.out.println("security100 = " + security100);
    }

}


As expected, the program will produce the following output when run:


security100 = Security{id=100, averagePrice=45, count=2}

Binary Search Method Implementation

The binary search method used above might be implemented like this:


    int binarySearch(final int key) {
        int low = 0;
        int high = values.size() - 1;

        while (low <= high) {
            final int mid = (low + high) >>> 1;
            final V midVal = values.get(mid);
            int cmp = Integer.compare(
                    extractor.applyAsInt(midVal), key);

            if (cmp < 0)
                low = mid + 1;
            else if (cmp > 0)
                high = mid - 1;
            else
                return mid;
        }
        return -(low + 1);
    }

}


Unfortunately, we cannot use Arrays::binarySearch or Collections::binarySearch. One reason is that methods like these would create additional objects upon querying.
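The negative return value of the binary search method deserves a short explanation: when the key is absent, the method returns -(insertionPoint + 1), the same convention the JDK's own binary search methods use. Here is a minimal sketch of the same loop over a plain sorted int array, so that it can be run without the Security and IntMapper classes. The class InsertionPointSketch and the helper insertionPoint are illustrative names only:

```java
final class InsertionPointSketch {

    /** The same search loop as in the article, applied to a sorted array of int keys. */
    static int binarySearch(final int[] sortedKeys, final int key) {
        int low = 0;
        int high = sortedKeys.length - 1;

        while (low <= high) {
            final int mid = (low + high) >>> 1; // unsigned shift avoids int overflow
            final int cmp = Integer.compare(sortedKeys[mid], key);

            if (cmp < 0)
                low = mid + 1;
            else if (cmp > 0)
                high = mid - 1;
            else
                return mid;        // key found at index mid
        }
        return -(low + 1);         // key absent; low is where it would be inserted
    }

    /** Decodes a negative search result into the index where the key would be inserted. */
    static int insertionPoint(final int result) {
        return -(result + 1);
    }

    public static void main(String[] args) {
        final int[] keys = {10, 20, 100};
        System.out.println(binarySearch(keys, 100));                 // prints 2
        System.out.println(binarySearch(keys, 15));                  // prints -2
        System.out.println(insertionPoint(binarySearch(keys, 15)));  // prints 1
    }
}
```

Adding one before negating guarantees that the result is negative even when the insertion point is index 0, which is why IntMapper::get can simply test for index >= 0.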


Other Key Types

If we want to use other types like CharSequence or other reference objects, there is an overload of the comparing() method that takes a custom comparator. This might look like the following in the case of CharSequence:


values.sort(

    comparing(Security::getId, CharSequenceComparator.INSTANCE));


More generally, if the key reference object is of type K, then the binary search method above can easily be modified to use an extractor of type Function<? super V, ? extends K> instead and an added Comparator<? super K> parameter.


A complete example of a generic Mapper<K, V> is available here.
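For readers who want the gist without following the link, here is a minimal sketch of what such a generic mapper could look like. It is not the linked implementation: the class name MapperSketch is hypothetical, and serialization support is omitted so that the sketch only depends on the JDK:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

final class MapperSketch<K, V> {

    private final List<V> values = new ArrayList<>();
    private final Function<? super V, ? extends K> extractor;
    private final Comparator<? super K> comparator;

    MapperSketch(final Function<? super V, ? extends K> extractor,
                 final Comparator<? super K> comparator) {
        this.extractor = Objects.requireNonNull(extractor);
        this.comparator = Objects.requireNonNull(comparator);
    }

    /** Replaces the content and sorts it in key order, as IntMapper::set does. */
    void set(final Collection<? extends V> newValues) {
        values.clear();
        values.addAll(newValues);
        values.sort((a, b) ->
                comparator.compare(extractor.apply(a), extractor.apply(b)));
    }

    /** Returns the value associated with the key, or null if there is none. */
    V get(final K key) {
        int low = 0;
        int high = values.size() - 1;

        while (low <= high) {
            final int mid = (low + high) >>> 1;
            final V midVal = values.get(mid);
            final int cmp = comparator.compare(extractor.apply(midVal), key);

            if (cmp < 0)
                low = mid + 1;
            else if (cmp > 0)
                high = mid - 1;
            else
                return midVal;
        }
        return null;
    }

    public static void main(String[] args) {
        // Values are "ticker:price" strings; the key is the ticker part
        final MapperSketch<String, String> mapper = new MapperSketch<String, String>(
                s -> s.substring(0, s.indexOf(':')),
                Comparator.<String>naturalOrder());

        mapper.set(Arrays.asList("MSFT:300", "AAPL:150", "IBM:120"));
        System.out.println(mapper.get("AAPL")); // prints AAPL:150
        System.out.println(mapper.get("ORCL")); // prints null
    }
}
```

Note that the extractor in this usage example creates a new String on every call, so it is only meant to show the API shape. An allocation-free variant would need an extractor that returns an existing key object, such as a getter.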


Serializing Across the Wire

Sending IntMapper objects over the wire without object creation requires special care on the receiver side so that old Security objects can be reused. This involves setting up a transient buffer that holds recycled Security objects.


private final transient List<V> buffer = new ArrayList<>();


We also have to override the IntMapper::readMarshallable method and include:


wire.read("values").sequence(values, buffer, Security::new);


The complete setup is outside the scope of this article.

Analysis: HashMap vs. IntMapper

Looking at various properties of the two alternatives, we see the following:

Execution Performance

Operation   HashMap                     IntMapper
put/add     O(1) < x < O(log(N)) (*)    O(1) (**)
sort        -                           O(N*log(N))
get         O(1) < x < O(log(N)) (*)    O(log(N))


(*) Depending on key distribution, size, load factor and associations made.
(**) There is no add method in the IntMapper; instead, all values are added in a batch.
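To put the O(log(N)) lookup cost into numbers, the following minimal sketch counts how many loop iterations a binary search needs. The class ProbeCountSketch and its methods are illustrative only; to avoid allocating a large array, the sketch searches the implicit sorted sequence 0, 1, ..., n - 1, where the key at index i is i itself:

```java
final class ProbeCountSketch {

    /** Upper bound on loop iterations for n >= 1 elements: floor(log2(n)) + 1. */
    static int maxProbes(final int n) {
        return 32 - Integer.numberOfLeadingZeros(n);
    }

    /** Counts the loop iterations used when searching for key among 0, 1, ..., n - 1. */
    static int countProbes(final int n, final int key) {
        int low = 0;
        int high = n - 1;
        int probes = 0;

        while (low <= high) {
            probes++;
            final int mid = (low + high) >>> 1; // the element at index mid is mid
            if (mid < key)
                low = mid + 1;
            else if (mid > key)
                high = mid - 1;
            else
                return probes;
        }
        return probes; // key not present
    }

    public static void main(String[] args) {
        final int n = 1_000_000;
        System.out.println(maxProbes(n));             // prints 20
        System.out.println(countProbes(n, 499_999));  // prints 1 (first midpoint)
        System.out.println(countProbes(n, n) <= maxProbes(n)); // prints true
    }
}
```

In other words, even a mapper with a million entries needs at most 20 comparisons per lookup, although the article's mapper is intended for a limited number of associations.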


Memory usage in Bytes

Operation   HashMap      IntMapper
put/add     48N (***)    0 (***)
get         16N (***)    0


(***): The figures above are under typical JVM use, excluding the Security objects themselves and excluding any backing array, both of which can be recycled between use.


Object Allocation in objects

Operation   HashMap   IntMapper
put/add     2 * N     0
get         N         0


All the figures above are excluding the Security objects themselves and excluding any backing array.


Resources

Chronicle Software Home Page


Chronicle Wire on GitHub (open-source)


Complete source code for all examples in this article (open-source)