Monday, April 15, 2019

Java Stream: Part 2, Is a Count Always a Count?

In my previous article on the subject, we learned that JDK 8’s stream()::count takes longer to execute the more elements there are in the Stream. For more recent JDKs, such as Java 11, that is no longer the case for simple stream pipelines. Learn how things have improved within the JDK itself and what to do if you have more complex stream pipelines.

Java 8

In my previous article, we concluded that the operation list.stream().count() is O(N) under Java 8, i.e. the execution time grows with the number of elements in the original list. Read the article here.

Java 9 and Upwards

As rightfully pointed out by Nicolai Parlog (@nipafx) and Brian Goetz (@BrianGoetz) on Twitter, the implementation of Stream::count was improved starting with Java 9. Here is a comparison of the underlying Stream::count code between Java 8 and later Java versions:

Java 8 (from the ReferencePipeline class)

return mapToLong(e -> 1L).sum();

Java 9 and later (from the ReduceOps class)

if (StreamOpFlag.SIZED.isKnown(flags)) {
    return spliterator.getExactSizeIfKnown();
}
...

It appears Stream::count in Java 9 and later is O(1) for Spliterators of known size rather than being O(N). Let’s verify that hypothesis.
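As a quick check that does not depend on timing, the size information the shortcut relies on can be inspected through the public Spliterator API. The sketch below is only an illustration: the class and helper names are made up for this example, and it does not call the JDK-internal ReduceOps code quoted above. It shows that a stream taken directly from a List reports SIZED and an exact size, while adding a filter() removes that knowledge.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Stream;

// Hypothetical helper class, named for this illustration only.
final class SizedFlagExample {

    /** Returns true if the stream's spliterator reports the SIZED characteristic. */
    static boolean isSized(Stream<?> stream) {
        return stream.spliterator().hasCharacteristics(Spliterator.SIZED);
    }

    /** Returns the exact element count if the spliterator knows it, otherwise -1. */
    static long exactSizeIfKnown(Stream<?> stream) {
        return stream.spliterator().getExactSizeIfKnown();
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);

        // A stream taken directly from a List knows its size up front.
        System.out.println("plain sized:    " + isSized(list.stream()));
        System.out.println("plain size:     " + exactSizeIfKnown(list.stream()));

        // filter() may drop elements, so the size is no longer known up front.
        System.out.println("filtered sized: " + isSized(list.stream().filter(i -> i % 2 == 0)));
        System.out.println("filtered size:  " + exactSizeIfKnown(list.stream().filter(i -> i % 2 == 0)));
    }
}
```

Each helper consumes the stream it is given, which is why a fresh stream is created for every call.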

Benchmarks

The big-O property can be observed by running the following JMH benchmarks under Java 8 and Java 11:

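import java.util.List;
import java.util.stream.IntStream;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import static java.util.stream.Collectors.toList;

@State(Scope.Benchmark)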
public class CountBenchmark {

    private List<Integer> list;

    @Param({"1", "1000", "1000000"})
    private int size;

    @Setup
    public void setup() {
        list = IntStream.range(0, size)
            .boxed()
            .collect(toList());
    }

    @Benchmark
    public long listSize() {
        return list.size();
    }

    @Benchmark
    public long listStreamCount() {
        return list.stream().count();
    }

    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
            .include(CountBenchmark.class.getSimpleName())
            .mode(Mode.Throughput)
            .threads(Threads.MAX)
            .forks(1)
            .warmupIterations(5)
            .measurementIterations(5)
            .build();

        new Runner(opt).run();

    }

}

This produces the following output on my laptop (MacBook Pro mid 2015, 2.2 GHz Intel Core i7):

JDK 8 (from my previous article)

Benchmark                        (size)   Mode  Cnt          Score           Error  Units
CountBenchmark.listSize               1  thrpt    5  966658591.905 ± 175787129.100  ops/s
CountBenchmark.listSize            1000  thrpt    5  862173760.015 ± 293958267.033  ops/s
CountBenchmark.listSize         1000000  thrpt    5  879607621.737 ± 107212069.065  ops/s
CountBenchmark.listStreamCount        1  thrpt    5   39570790.720 ±   3590270.059  ops/s
CountBenchmark.listStreamCount     1000  thrpt    5   30383397.354 ±  10194137.917  ops/s
CountBenchmark.listStreamCount  1000000  thrpt    5        398.959 ±       170.737  ops/s

JDK 11

Benchmark                                  (size)   Mode  Cnt          Score           Error  Units
CountBenchmark.listSize                         1  thrpt    5  898916944.365 ± 235047181.830  ops/s
CountBenchmark.listSize                      1000  thrpt    5  865080967.750 ± 203793349.257  ops/s
CountBenchmark.listSize                   1000000  thrpt    5  935820818.641 ±  95756219.869  ops/s
CountBenchmark.listStreamCount                  1  thrpt    5   95660206.302 ±  27337762.894  ops/s
CountBenchmark.listStreamCount               1000  thrpt    5   78899026.467 ±  26299885.209  ops/s
CountBenchmark.listStreamCount            1000000  thrpt    5   83223688.534 ±  16119403.504  ops/s

As can be seen, in Java 11, the list.stream().count() operation is now O(1) and not O(N).

Brian Goetz pointed out that some developers who relied on Stream::peek under Java 8 discovered that their peek actions were no longer invoked when the Stream::count terminal operation was run under Java 9 and onwards. This generated some negative feedback for the JDK developers. Personally, I think it was the right decision by the JDK developers and that it instead presented a great opportunity for Stream::peek users to get their code right.
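To make the peek issue concrete, here is a minimal sketch. The class and method names are hypothetical, and moving the side effect into forEach is just one possible way of "getting the code right", not a recommendation from the JDK developers. The first method relies on peek and may or may not run its action depending on the Java version. The second performs the side effect in forEach, which visits every element on any version.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class, named for this illustration only.
final class PeekCountExample {

    /**
     * Fragile: under Java 9 and later, count() may skip the traversal when the
     * size is known, in which case the peek action is never invoked.
     */
    static long countWithPeek(List<String> items, AtomicInteger observed) {
        return items.stream()
            .peek(e -> observed.incrementAndGet())
            .count();
    }

    /**
     * Robust: the side effect runs in forEach, which visits every element,
     * and the count is taken from the list itself.
     */
    static long countWithForEach(List<String> items, AtomicInteger observed) {
        items.forEach(e -> observed.incrementAndGet());
        return items.size();
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "b", "c");

        AtomicInteger viaPeek = new AtomicInteger();
        long c1 = countWithPeek(items, viaPeek);
        // viaPeek is 3 under Java 8 but may be 0 under Java 9 and later.
        System.out.println("peek:    count=" + c1 + ", observed=" + viaPeek.get());

        AtomicInteger viaForEach = new AtomicInteger();
        long c2 = countWithForEach(items, viaForEach);
        System.out.println("forEach: count=" + c2 + ", observed=" + viaForEach.get());
    }
}
```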

More Complex Stream Pipelines

In this section, we will take a look at more complex stream pipelines.

JDK 11

Tagir Valeev concluded that pipelines like stream().skip(1).count() are not O(1) for List::stream.

This can be observed by running the following benchmark:

@Benchmark
public long listStreamSkipCount() {
    return list.stream().skip(1).count();
}

Benchmark                                  (size)   Mode  Cnt          Score           Error  Units
CountBenchmark.listStreamCount                  1  thrpt    5  105546649.075 ±  10529832.319  ops/s
CountBenchmark.listStreamCount               1000  thrpt    5   81370237.291 ±  15566491.838  ops/s
CountBenchmark.listStreamCount            1000000  thrpt    5   75929699.395 ±  14784433.428  ops/s
CountBenchmark.listStreamSkipCount              1  thrpt    5   35809816.451 ±  12055461.025  ops/s
CountBenchmark.listStreamSkipCount           1000  thrpt    5    3098848.946 ±    339437.339  ops/s
CountBenchmark.listStreamSkipCount        1000000  thrpt    5       3646.513 ±       254.442  ops/s

Thus, list.stream().skip(1).count() is still O(N).
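When the source is a plain List and only the number of remaining elements is needed, the traversal can be avoided by hand. The following is a minimal sketch under that assumption. The helper countAfterSkip is a name introduced here and not part of any JDK API. It computes the same value as list.stream().skip(n).count() from List::size alone.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper class, named for this illustration only.
final class SkipCountExample {

    /**
     * Returns the same value as list.stream().skip(n).count(), computed from
     * List::size without traversing the elements.
     */
    static long countAfterSkip(List<?> list, long n) {
        if (n < 0) {
            // Stream::skip also rejects negative arguments.
            throw new IllegalArgumentException("n must be non-negative: " + n);
        }
        return Math.max(0L, list.size() - n);
    }

    public static void main(String[] args) {
        List<Integer> list = IntStream.range(0, 1_000)
            .boxed()
            .collect(Collectors.toList());

        long viaStream = list.stream().skip(1).count(); // traverses the elements
        long viaSize = countAfterSkip(list, 1);         // constant time

        System.out.println("stream=" + viaStream + ", size-based=" + viaSize);
    }
}
```

This only works because nothing else in the pipeline changes the number of elements. As soon as a filter() is involved, the elements have to be inspected anyway.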

Speedment

Some stream implementations are actually aware of their sources and can take appropriate shortcuts and merge stream operations into the stream source itself. This can improve performance massively, especially for large streams with more complex stream pipelines like stream().skip(1).count().
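The idea of merging operations into the source can be illustrated with a deliberately simplified sketch. This is not how Speedment is implemented. SourceAwareCount is a hypothetical class invented for this example, and it assumes the source can report its size in constant time. Instead of pulling elements through the pipeline, it only records the effect of skip and limit and then computes the count arithmetically.

```java
// Hypothetical illustration of a source-aware pipeline; not a real library class.
final class SourceAwareCount {

    private final long sourceSize; // assumed to be known by the source in O(1)
    private final long skip;       // elements dropped from the front so far
    private final long limit;      // maximum number of elements kept after the skip

    private SourceAwareCount(long sourceSize, long skip, long limit) {
        this.sourceSize = sourceSize;
        this.skip = skip;
        this.limit = limit;
    }

    static SourceAwareCount ofSize(long sourceSize) {
        requireNonNegative(sourceSize);
        return new SourceAwareCount(sourceSize, 0L, Long.MAX_VALUE);
    }

    /** Records a skip without touching any element. */
    SourceAwareCount skip(long n) {
        requireNonNegative(n);
        long sum = skip + n;
        long newSkip = sum < 0 ? Long.MAX_VALUE : sum; // saturate on overflow
        // Skipping after a limit also shrinks the remaining window.
        long newLimit = (limit == Long.MAX_VALUE) ? limit : Math.max(0L, limit - n);
        return new SourceAwareCount(sourceSize, newSkip, newLimit);
    }

    /** Records a limit without touching any element. */
    SourceAwareCount limit(long n) {
        requireNonNegative(n);
        return new SourceAwareCount(sourceSize, skip, Math.min(limit, n));
    }

    /** Computes the count from the recorded operations in constant time. */
    long count() {
        return Math.min(limit, Math.max(0L, sourceSize - skip));
    }

    private static void requireNonNegative(long n) {
        if (n < 0) {
            throw new IllegalArgumentException("value must be non-negative: " + n);
        }
    }

    public static void main(String[] args) {
        // Counting a trillion-element source after skip(1) takes no traversal here.
        System.out.println(SourceAwareCount.ofSize(1_000_000_000_000L).skip(1).count());
    }
}
```

A real source-aware stream has to handle many more operations, but the principle is the same: as long as every intermediate operation can be expressed in terms of the source, the terminal operation never needs to see the individual elements.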

The Speedment ORM tool allows databases to be viewed as Stream objects, and these streams can optimize away many stream operations such as Stream::count, Stream::skip and Stream::limit, as demonstrated in the benchmark below. I have used the open-source Sakila example database as data input. The Sakila database is all about rental films, artists etc.

@Benchmark
public long rentalsSkipCount() {
    return rentals.stream().skip(1).count();
}

@Benchmark
public long filmsSkipCount() {
    return films.stream().skip(1).count();
}

When run, the following output is produced:

Benchmark                                  (size)   Mode  Cnt          Score           Error  Units
SpeedmentCountBenchmark.filmsSkipCount        N/A  thrpt    5   68052838.621 ±    739171.008  ops/s
SpeedmentCountBenchmark.rentalsSkipCount      N/A  thrpt    5   68224985.736 ±   2683811.510  ops/s

The “rental” table contains over 10,000 rows whereas the “film” table only contains 1,000 rows. Nevertheless, their stream().skip(1).count() operations complete in almost the same time. Even if a table contained a trillion rows, its elements would still be counted in the same elapsed time. Thus, the stream().skip(1).count() implementation has a complexity that is O(1) and not O(N).

Note: The benchmarks above were run with “DataStore” in-JVM-memory acceleration. If run with no acceleration directly against a database, the response time would depend on the underlying database’s ability to execute a nested “SELECT count(*) …” statement.

Summary

Stream::count was significantly improved in Java 9.

There are stream implementations, such as Speedment, that are able to compute Stream::count in O(1) time even for more complex stream pipelines like stream().skip(...).count() or even stream().filter(...).skip(...).count().

Resources

Speedment Stream ORM Initializer: https://www.speedment.com/initializer/

Sakila: https://dev.mysql.com/doc/index-other.html or https://hub.docker.com/r/restsql/mysql-sakila

Monday, April 8, 2019

Java Stream: Is a Count Always a Count?

It might appear obvious that counting the elements in a Stream takes longer the more elements there are in the Stream. But actually, Stream::count can sometimes be done in a single operation, no matter how many elements you have. Read this article and learn how.

Count Complexity

The Stream::count terminal operation counts the number of elements in a Stream. The complexity of the operation is often O(N), meaning that the number of sub-operations is proportional to the number of elements in the Stream.
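The proportionality can be made visible without a benchmark. The sketch below is an illustration with hypothetical class and method names. It builds a stream from an Iterator whose size is not known up front and records how often next() is called while the stream is counted. For such a source, every element has to be visited once.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical example class, named for this illustration only.
final class TraversalCountExample {

    /**
     * Counts a stream of n elements backed by an iterator of unknown size and
     * records every call to next() in the given counter.
     */
    static long countUnsized(int n, AtomicLong visits) {
        Iterator<Integer> iterator = new Iterator<Integer>() {
            private int next = 0;

            @Override
            public boolean hasNext() {
                return next < n;
            }

            @Override
            public Integer next() {
                if (next >= n) {
                    throw new NoSuchElementException();
                }
                visits.incrementAndGet();
                return next++;
            }
        };

        // No characteristics are reported, so the stream cannot know its size.
        Stream<Integer> stream = StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(iterator, 0), false);
        return stream.count();
    }

    public static void main(String[] args) {
        for (int n : new int[] {1, 1_000, 1_000_000}) {
            AtomicLong visits = new AtomicLong();
            long count = countUnsized(n, visits);
            System.out.println("n=" + n + " count=" + count + " visits=" + visits.get());
        }
    }
}
```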

In contrast, the List::size method has a complexity of O(1), which means that regardless of the number of elements in the List, the size() method will return in constant time. This can be observed by running the following JMH benchmarks:

@State(Scope.Benchmark)
public class CountBenchmark {

    private List<Integer> list;

    @Param({"1", "1000", "1000000"})
    private int size;

    @Setup
    public void setup() {
        list = IntStream.range(0, size)
            .boxed()
            .collect(toList());
    }

    @Benchmark
    public long listSize() {
        return list.size();
    }

    @Benchmark
    public long listStreamCount() {
        return list.stream().count();
    }

    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
            .include(CountBenchmark.class.getSimpleName())
            .mode(Mode.Throughput)
            .threads(Threads.MAX)
            .forks(1)
            .warmupIterations(5)
            .measurementIterations(5)
            .build();

        new Runner(opt).run();

    }

}

This produced the following output on my laptop (MacBook Pro mid 2015, 2.2 GHz Intel Core i7):

Benchmark                        (size)   Mode  Cnt          Score           Error  Units
CountBenchmark.listSize               1  thrpt    5  966658591.905 ± 175787129.100  ops/s
CountBenchmark.listSize            1000  thrpt    5  862173760.015 ± 293958267.033  ops/s
CountBenchmark.listSize         1000000  thrpt    5  879607621.737 ± 107212069.065  ops/s
CountBenchmark.listStreamCount        1  thrpt    5   39570790.720 ±   3590270.059  ops/s
CountBenchmark.listStreamCount     1000  thrpt    5   30383397.354 ±  10194137.917  ops/s
CountBenchmark.listStreamCount  1000000  thrpt    5        398.959 ±       170.737  ops/s


As can be seen, the throughput of List::size is largely independent of the number of elements in the List, whereas the throughput of Stream::count drops off rapidly as the number of elements grows. But is this really the case for all Stream implementations?

Source Aware Streams

Some stream implementations are actually aware of their sources and can take appropriate shortcuts and merge stream operations into the stream source itself. This can improve performance massively, especially for large streams. The Speedment ORM tool allows databases to be viewed as Stream objects, and these streams can optimize away many stream operations, such as Stream::count, as demonstrated in the benchmark below. I have used the open-source Sakila example database as data input. The Sakila database is all about rental films, artists etc.

@State(Scope.Benchmark)
public class SpeedmentCountBenchmark {

    private Speedment app;
    private RentalManager rentals;
    private FilmManager films;

    @Setup
    public void setup() {
        app =  new SakilaApplicationBuilder()
            .withBundle(DataStoreBundle.class)
            .withLogging(ApplicationBuilder.LogType.STREAM)
            .withPassword(ExampleUtil.DEFAULT_PASSWORD)
            .build();

        app.get(DataStoreComponent.class).ifPresent(DataStoreComponent::load);

        rentals = app.getOrThrow(RentalManager.class);
        films = app.getOrThrow(FilmManager.class);

    }

    @TearDown
    public void tearDown() {
        app.close();
    }


    @Benchmark
    public long rentalsCount() {
        return rentals.stream().count();
    }


    @Benchmark
    public long filmsCount() {
        return films.stream().count();
    }


    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
            .include(SpeedmentCountBenchmark.class.getSimpleName())
            .mode(Mode.Throughput)
            .threads(Threads.MAX)
            .forks(1)
            .warmupIterations(5)
            .measurementIterations(5)
            .build();

        new Runner(opt).run();

    }

}

When run, the following output is produced:

Benchmark                              Mode  Cnt         Score          Error  Units
SpeedmentCountBenchmark.filmsCount    thrpt    5  71037544.648 ± 75915974.254  ops/s
SpeedmentCountBenchmark.rentalsCount  thrpt    5  69750012.675 ± 37961414.355  ops/s


The “rental” table contains over 10,000 rows whereas the “film” table only contains 1,000 rows. Nevertheless, their Stream::count operations complete in almost the same time. Even if a table contained a trillion rows, its elements would still be counted in the same elapsed time. Thus, the Stream::count implementation has a complexity that is O(1) and not O(N).

Note: The benchmarks above were run with Speedment's “DataStore” in-JVM-memory acceleration. If run with no acceleration directly against a database, the response time would depend on the underlying database’s ability to execute a “SELECT count(*) FROM film” query.

Summary

It is possible to create Stream implementations that count their elements in a single operation rather than counting each and every element in the stream. This can improve performance significantly, especially for streams with many elements.

Resources

Speedment Stream ORM Initializer: https://www.speedment.com/initializer/
Sakila: https://dev.mysql.com/doc/index-other.html or https://hub.docker.com/r/restsql/mysql-sakila