Minborg

Monday, May 9, 2022

Which JVM Version is Fastest?

How is a high-performance, low-latency Java application affected by the JVM version used? Every nanosecond counts for trading and other applications where messages between two different threads are exchanged in about 250 ns! Read this article and find out which JDK variant comes out on top!

Benchmarks

This article uses open-source Chronicle Queue to exchange 256-byte messages between two threads, with all messages also stored in shared memory (/dev/shm is used to minimize the impact of the disk subsystem).

Chronicle Queue is a persisted low-latency Java messaging framework for high-performance and critical applications. Because Chronicle Queue operates on memory-mapped native memory, it eliminates the need for garbage collection, giving developers deterministic high performance.

In the benchmarks, a single producer thread writes messages to a queue with a nanosecond timestamp. Another consumer thread reads the messages from the queue and records the time deltas in a histogram. The producer maintains a sustained message output rate of 100,000 messages per second with a 256-byte payload in each message. Data is measured over 100 seconds so that most jitter will be reflected in the measurements and ensures a reasonable confidence interval for the higher percentiles.

The target machine has an AMD Ryzen 9 5950X 16-Core Processor running at 3.4 GHz under Linux 5.11.0-49-generic #55-Ubuntu SMP. The CPU cores 2-8 are isolated, meaning the operating system will not automatically schedule any user processes and will avoid most interrupts on these cores.
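Core isolation of this kind is typically arranged via kernel boot parameters. As an illustration only (the exact flags used on this particular machine are not shown in the article), a GRUB-based setup could include something like the following in /etc/default/grub, followed by update-grub and a reboot:

# Illustrative only: keep the scheduler and most IRQs away from cores 2-8
GRUB_CMDLINE_LINUX_DEFAULT="quiet splash isolcpus=2-8 nohz_full=2-8 rcu_nocbs=2-8"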

The Java Code

Below, part of the producer's inner loop is shown:


// Pin the producer thread to CPU 2
Affinity.setAffinity(2);

try (ChronicleQueue cq = SingleChronicleQueueBuilder.binary(tmp)
        .blockSize(blocksize)
        .rollCycle(ROLL_CYCLE)
        .build()) {

    ExcerptAppender appender = cq.acquireAppender();
    final long nano_delay = 1_000_000_000L / MSGS_PER_SECOND;

    for (int i = -WARMUP; i < COUNT; ++i) {
        long startTime = System.nanoTime();
        try (DocumentContext dc = appender.writingDocument()) {
            Bytes bytes = dc.wire().bytes();
            data.writeLong(0, startTime);   // place the nano timestamp first in the message
            bytes.write(data, 0, MSGSIZE);  // write the 256-byte payload to the queue
        }
        long delay = nano_delay - (System.nanoTime() - startTime);
        spin_wait(delay);                   // pace the output to MSGS_PER_SECOND
    }
}
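The spin_wait(delay) helper is not shown above. A minimal sketch of what it could look like, assuming it simply busy-waits until the deadline to keep the pacing accurate, might be:

// Sketch only: busy-wait for delayNs nanoseconds (returns at once if negative)
private static void spin_wait(long delayNs) {
    final long deadline = System.nanoTime() + delayNs;
    while (System.nanoTime() < deadline) {
        Thread.onSpinWait(); // CPU spin hint, available from Java 9; omit on Java 8
    }
}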


The consumer thread runs the following code in its inner loop (shortened):

// Pin the consumer thread to CPU 4
Affinity.setAffinity(4);

try (ChronicleQueue cq = SingleChronicleQueueBuilder.binary(tmp)
        .blockSize(blocksize)
        .rollCycle(ROLL_CYCLE)
        .build()) {

    ExcerptTailer tailer = cq.createTailer();
    int idx = -APPENDERS * WARMUP;

    while (idx < APPENDERS * COUNT) {
        try (DocumentContext dc = tailer.readingDocument()) {
            if (!dc.isPresent())
                continue;                  // no message available yet; poll again
            Bytes bytes = dc.wire().bytes();
            data.clear();
            bytes.read(data, (int) MSGSIZE);
            long startTime = data.readLong(0);
            if (idx >= 0)                  // skip warmup samples
                deltas[idx] = System.nanoTime() - startTime;
            ++idx;
        }
    }
}


As can be seen, the consumer thread reads each nano timestamp and records the corresponding latency in an array. These latencies are later aggregated into a histogram that is printed when the benchmark completes. Measurements start only after the JVM has warmed up properly and the C2 compiler has JIT-compiled the hot execution path.
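For illustration only (the benchmark uses its own histogram class), percentile figures can be derived from the recorded deltas along these lines:

// Sketch: read out percentiles from the recorded latency samples
long[] sorted = java.util.Arrays.copyOf(deltas, deltas.length);
java.util.Arrays.sort(sorted);
for (double p : new double[]{0.50, 0.90, 0.99, 0.999, 0.9999}) {
    int idx = Math.min(sorted.length - 1, (int) (p * sorted.length));
    System.out.printf("%8.4f%%: %,d ns%n", p * 100, sorted[idx]);
}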

JVM Variants

Chronicle Queue officially supports all the recent LTS versions (Java 8, Java 11, and Java 17), so these are used in the benchmarks. We will also use the GraalVM Community and Enterprise Editions. Here is a list of the specific JVM variants used:


Legend (JVM Variant)   Detail
OpenJDK 8              1.8.0_322, vendor: Temurin
OpenJDK 11             11.0.14.1, vendor: Eclipse Adoptium
OpenJDK 17             17.0.2, vendor: Eclipse Adoptium
GraalVM CE 17          17.0.2, vendor: GraalVM Community
GraalVM EE 17          17.0.2, vendor: Oracle Corporation

Table 1, Shows the specific JVM variants used.

Measurements

As 100,000 messages per second are produced and the benchmarks run for 100 seconds, 100,000 * 100 = 10 million messages are sampled during each benchmark. The samples are aggregated in a histogram from which percentiles such as 50% (median), 90%, 99%, and 99.9% can be read. Here is a table showing the number of messages at or above some percentiles:


Percentile                    # Messages
0% (all)                      10,000,000
50% ("Median", used below)     5,000,000
99%                              100,000
99.9%                             10,000
99.99% (used below)                1,000
99.999%                              100

Table 2, Shows the number of messages for each percentile.


Assuming a relatively small variance in the measured values, the confidence interval is likely reasonable for percentiles up to 99.99%. Producing figures with a reasonable confidence interval for the 99.999-percentile would probably require gathering data for at least half an hour rather than just 100 seconds.

Benchmark Results

For each Java variant, the benchmarks are run like this:

mvn exec:java@QueuePerformance

Remember that the producer and consumer threads are pinned to the isolated CPU cores 2 and 4, respectively.

Here is what a typical process looks like after it has run for a while:

$ top

    PID USER      PR  NI    VIRT    RES    SHR S  %CPU  %MEM     TIME+ COMMAND                                                    

3216555 per.min+  20   0   92.3g   1.5g   1.1g S 200.0   2.3   0:50.15 java 


As can be seen, the producer and consumer threads spin-wait between messages and therefore each consumes an entire CPU core. If CPU consumption is a concern, latency and determinism can be traded for lower power consumption by parking threads for a short period (e.g. LockSupport.parkNanos(1000)) when no messages are available.
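As a hedged sketch of that trade-off (this is not the benchmark's code), the consumer's polling loop could park briefly instead of spinning whenever the queue is empty:

// Sketch: inside the consumer's polling loop, trade a little latency for less CPU
try (DocumentContext dc = tailer.readingDocument()) {
    if (!dc.isPresent()) {
        LockSupport.parkNanos(1_000); // nap for ~1 µs instead of busy-spinning
        continue;                     // then poll again
    }
    // ... read the message and record the latency as before ...
}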

The figures below are given in nanoseconds (ns), which is essential to keep in mind: many other latency measurements are reported in microseconds (1,000 ns) or even milliseconds (1,000,000 ns). One ns corresponds roughly to the access time of a CPU L1 cache.

Here is the result of the benchmarks where all values are given in ns:


JDK Variant          Median (ns)   99.99% (ns)
OpenJDK 8                    280         3,951
OpenJDK 11                   370         4,210
OpenJDK 17                   290         4,041
GraalVM CE 17 (*)            310         3,950
GraalVM EE 17 (*)            270         3,800

Table 3, Shows the latency figures for the various JDKs used.

(*) Not officially supported by Chronicle Queue.


Typical Latency (median)

For the typical (median) values, there is no significant difference between the various JDKs except for OpenJDK 11, which is about 30% slower than the other versions.

The fastest of them all is GraalVM EE 17, but its advantage over OpenJDK 8 and OpenJDK 17 is marginal.

Here is a graph with the typical 256-byte message latency for the various JDK variants used (lower is better):

Graph 1, Shows the median (typical) latency in ns for the various JDK variants.

The typical (median) latency varied slightly from run to run, with figures fluctuating by around 5%.

Higher Percentiles

Looking at the higher percentiles, there is not much difference between the supported JDK variants either. GraalVM EE is again slightly faster, but here the relative difference is even smaller. OpenJDK 11 appears to be marginally worse (about 5% higher latency) than the other variants, but the delta lies within the estimated margin of error.

Here is another graph showing latencies for the 99.99% percentile for the various JDK variants (lower is better):

Graph 2, Shows the 99.99% percentile latency [ns] for the various JDK variants.

Conclusions

In my opinion, the latency figures of Chronicle Queue are excellent. Accessing 64-bit data from main memory takes about 100 cycles (which corresponds to about 30 ns on current hardware). The code above has some logic that has to be executed. Additionally, Chronicle Queue obtains data from the producer, persists it (writes to a memory-mapped file), applies appropriate memory fencing for inter-thread communication and happens-before guarantees, and then makes the data available to the consumer. All this typically happens in around 600 ns for 256 bytes, compared to a single 64-bit memory access at 30 ns. Very impressive indeed.

OpenJDK 17 and GraalVM EE 17 seem to be the best choices for this application, providing the best latency figures. Consider using GraalVM EE 17 over OpenJDK 17 if outliers need to be suppressed or if you really need the lowest possible overall latency.

Resources

Open-source Chronicle Queue 

Open-source JDK


Tuesday, March 29, 2022

How to Reduce Cloud Cost by 99% for EDA Kafka Applications

While the cloud offers great convenience and flexibility, the operational cost for applications deployed in the cloud can sometimes be significant. This article shows a way to substantially reduce operating costs for latency-sensitive Event-Driven Architecture (EDA) Java applications by migrating from Kafka to open-source Chronicle Queue, a more resource-efficient queue implementation with lower latency.

What is EDA?

An EDA application is a distributed application in which events (in the form of messages or DTOs) are produced, detected, consumed, and reacted to. Distributed means it might run on different machines, or on the same machine in separate processes or threads. The latter concept is used in this article, with messages persisted to queues.

Setting the Scene

Suppose we have an EDA application with a chain of five services, and a requirement that 99.9% of the messages sent from the first producer to the last consumer have a latency below 100 ms at a message rate of 1,000 messages per second.



Figure 1, The five Services and the Benchmark are interconnected by six topics/queues.

In other words, the time from when the Benchmark thread sends a message (i.e. via topic 0) to when the Benchmark thread receives the resulting message back (i.e. via topic 5) is allowed to exceed 100 ms for, on average, only one message in every 1,000, with 1,000 messages sent every second.

The messages used in this article are simple: they contain a long nanosecond timestamp holding the time when the message was first posted via topic 0, and an int value that is increased by one each time the message propagates from one service to the next (this value is not actually used, but illustrates rudimentary service logic). When a message arrives back at the Benchmark thread, the current nanotime is compared with the original nanotime from the initial message sent on topic 0, allowing the total latency across the entire service chain to be calculated. The latency samples are subsequently fed into a histogram for later analysis.
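As a sketch of the message layout just described (the class and field names are assumptions, not the article's actual code):

// Sketch: the two fields carried by each message
public final class Msg {
    long beginTimeNs; // System.nanoTime() captured when first posted on topic 0
    int value;        // increased by one by each service in the chain
}

// When a message returns to the Benchmark thread:
// long totalLatencyNs = System.nanoTime() - msg.beginTimeNs;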


As can be seen in Figure 1 above, the number of topics/queues is equal to the number of services plus one. Hence, there are six topics/queues because there are five services.


The Question

The question in this article is: how many instances of these chains can we set up on given hardware and still meet the latency requirement? Or, to rephrase, how many of these applications can we run while still paying the same price for the hardware used?

Default Setup

In this article, I have elected to use Apache Kafka because it is one of the most common queue types used in the market. I have also selected Chronicle Queue due to its ability to provide low latency and resource efficiency. 


Both Kafka and Chronicle Queue have several configurable options, including replicating data across several servers. In this article, a single non-replicated queue will be used. For performance reasons, the Kafka broker will be run on the same machine as the services, allowing the use of the local loopback network interface.


The KafkaProducer instances are configured to be optimised for low latency (e.g. setting “acks=1”), and so are the KafkaConsumer instances; a sketch of such a producer configuration is shown below.
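Here is the sketch (“acks=1” is from the article; the bootstrap address and the other keys are illustrative assumptions):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // broker on the loopback interface
props.put(ProducerConfig.ACKS_CONFIG, "1");      // leader acknowledgement only
props.put(ProducerConfig.LINGER_MS_CONFIG, "0"); // send immediately, no batching delay
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
KafkaProducer<Integer, Long> outQ = new KafkaProducer<>(props);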


The Chronicle Queue instances are created using the default setup with no explicit optimisation. Hence, the more advanced performance features in Chronicle Queue, such as CPU-core pinning and busy spin-waiting, are not used. A sketch of such a default-setup queue follows.
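For reference, a default-setup queue of the kind used here can be created along these lines (the path name is an illustrative assumption):

// Sketch: a Chronicle Queue with default settings and its two endpoints
try (ChronicleQueue queue = SingleChronicleQueueBuilder.binary("topic12340").build()) {
    ExcerptAppender appender = queue.acquireAppender(); // writing end
    ExcerptTailer tailer = queue.createTailer();        // reading end
    // ... run the service loop shown further below ...
}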

Kafka

Apache Kafka is an open-source distributed event streaming platform for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. It is used extensively in various EDA applications, especially when several information sources residing in different locations are to be aggregated and consumed.


In this benchmark, each test instance creates six distinct Kafka topics named topicXXXX0, topicXXXX1, …, topicXXXX5, where XXXX is a random number.

Chronicle Queue

Open-source Chronicle Queue is a persisted low-latency messaging framework for high-performance and critical applications. Interestingly, Chronicle Queue uses off-heap memory and memory-mapping to reduce memory pressure and garbage collection impacts, making the product popular within the fintech area where deterministic low latency messaging is crucial.


In this second benchmark, each test instance creates six Chronicle Queue instances named topicXXXX0, topicXXXX1, …, topicXXXX5, where XXXX is a random number.

Code

The inner loops of the two service thread implementations are shown below. Both poll their input queue until ordered to shut down, and if there are no messages, they wait one-eighth of the expected inter-message time before trying again.


Here is the code:

Kafka


while (!shutDown.get()) {
    ConsumerRecords<Integer, Long> records =
            inQ.poll(Duration.ofNanos(INTER_MESSAGE_TIME_NS / 8));
    for (ConsumerRecord<Integer, Long> record : records) {
        long beginTimeNs = record.value();  // original timestamp from topic 0
        int value = record.key();           // hop counter
        outQ.send(new ProducerRecord<>(topic, value + 1, beginTimeNs));
    }
}


Using the record key() to carry an int value might be a bit unorthodox, but it allows us to improve performance and simplify the code.


Chronicle Queue


while (!shutDown.get()) {
    try (final DocumentContext rdc = tailer.readingDocument()) {
        if (rdc.isPresent()) {
            ValueIn valueIn = rdc.wire().getValueIn();
            long beginTime = valueIn.readLong();  // original timestamp from topic 0
            int value = valueIn.readInt();        // hop counter
            try (final DocumentContext wdc = appender.writingDocument()) {
                final ValueOut valueOut = wdc.wire().getValueOut();
                valueOut.writeLong(beginTime);
                valueOut.writeInt(value + 1);
            }
        } else {
            // No message available: nap for 1/8 of the inter-message time
            LockSupport.parkNanos(INTER_MESSAGE_TIME_NS / 8);
        }
    }
}


Benchmarks

The benchmarks had an initial warmup phase during which the JVM’s C2 compiler profiled and compiled code for much better performance. The sampling results from the warmup period were discarded.


More and more test instances were started manually (each with its own five services) until the latency requirements could no longer be fulfilled. Whilst running the benchmarks, the CPU utilisation of all instances was also observed using the “top” command and averaged over a few seconds.


The benchmarks did not take coordinated omission into account and were run on Ubuntu Linux (5.11.0-49-generic) with an AMD Ryzen 9 5950X 16-Core Processor at 3.4 GHz and 64 GB RAM, where the applications were run on the isolated cores 2-8 (7 CPU cores in total) and queues were persisted to a 1 TB NVMe flash device. OpenJDK 11 (11.0.14.1) was used.


All latency figures are given in ms; 99% means the 99-percentile and 99.9% means the 99.9-percentile.

Kafka

The Kafka broker and the benchmarks were all run using the prefix “taskset -c 2-8” followed by the respective command (e.g. taskset -c 2-8 mvn exec:java@Kafka). The following results were obtained for Kafka:
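The broker can be confined to the same cores in the same way, for example (script and config names as in a standard Kafka distribution):

taskset -c 2-8 bin/kafka-server-start.sh config/server.properties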



Instances   Median Latency (ms)   99%   99.9%     CPU Utilisation
1           0.9                   19    30        670%
2           16                    72    106 (*)   700% (saturated)

Table 1, Shows Kafka instances vs latencies and CPU utilisation.

(*) Over 100 ms on the 99.9-percentile.


As can be seen, only one instance of the EDA system could be run while meeting the requirement. Running two instances increased the 99.9-percentile so that it exceeded the limit of 100 ms. The instances and the Kafka broker quickly saturated the available CPU resources.

Here is a snapshot of the output from the “top” command when running two instances and a broker (pid 3132946):

3134979 per.min+  20   0   20.5g   1.6g  20508 S 319.6   2.6  60:27.40 java                                                                            

3142126 per.min+  20   0   20.5g   1.6g  20300 S 296.3   2.5  19:36.17 java                                                                            

3132946 per.min+  20   0   11.3g   1.0g  22056 S  73.8   1.6   9:22.42 java


Chronicle Queue

The benchmarks were run using the command “taskset -c 2-8 mvn exec:java@ChronicleQueue”, and the following results were obtained:



Instances   Median Latency (ms)   99%   99.9%   CPU Utilisation
1           0.5                   0.8   0.9     5.2%
10          0.5                   0.9   0.9     79%
25          0.5                   0.9   3.6     180%
50          0.5                   0.9   5.0     425%
100         1.0                   5     20      700% (saturated)
150         2.0                   7     53      700% (saturated)
200         3.1                   9     59      700% (saturated)
250         4.8                   12    62      700% (saturated)
375         8.7                   23    75      700% (saturated)
500         11                    36    96      700% (saturated)

Table 2, Shows Chronicle Queue instances vs latencies and CPU utilisation.


The sheer efficiency of Chronicle Queue becomes apparent in these benchmarks: 500 instances can be run at the same time, which means 500 * 6 = 3,000 simultaneous queues handling 500 * 1,000 * 6 = 3,000,000 messages per second on just 7 cores, with less than 100 ms of delay at the 99.9-percentile.

Comparison

Here is a chart showing the number of instances vs the 99.9-percentile for the two different queue types:


Chart 1, Shows Instances vs latencies in ms for the 99.9-percentile.

As can be seen, the curve for Kafka jumps from 30 ms to 106 ms in a single step, so at this scale the latency growth for Kafka looks like a wall.

Conclusion

About four hundred times more applications can be run on the same hardware if a switch is made from Kafka to Chronicle Queue for specific latency-sensitive EDA applications.

This corresponds to a potential reduction of cloud or hardware costs by about 99.8%, as illustrated in Chart 2 below (less is better). In fact, the cost can barely be seen at all at the scale used:


Chart 2, Shows normalised cost vs queue type (less is better)

Resources

Open-source Apache Kafka

Open-source Chronicle Queue