Friday, December 5, 2014

Java 8, Implementing a ConcurrentHashSet

Background

In the package java.util.concurrent there are numerous classes that enable concurrent access to various objects and data structures. However, the standard libraries have no ConcurrentHashSet class to go with ConcurrentHashMap. In this post I will show how to fix this problem.

The easy way out

There is a very simple way of getting a kind of concurrent Set with elements of type K in Java:
Map<K, Boolean> concurrentMap = new ConcurrentHashMap<>();
Set<K> concurrentSet = Collections.newSetFromMap(concurrentMap);

For example, one can get a concurrent Set of strings like this:
Set<String> concurrentSet = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

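The concurrentSet will exhibit the same concurrency properties as the underlying ConcurrentMap (in this case, retrievals that generally do not block and updates that can proceed concurrently), which is good. It also inherits the other properties of the Map, such as whether null keys are permitted. ConcurrentHashMap rejects null keys, so this particular Set will not accept null elements either.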
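
To make the concurrency claim concrete, here is a minimal sketch in which several threads add the same values to a Set obtained from newSetFromMap(). The class name ConcurrentSetFromMapDemo, the helper method and the thread counts are made up for this illustration.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original post
class ConcurrentSetFromMapDemo {

    // Lets several threads add the same range of integers and returns the final size.
    static int addFromManyThreads(int threads, int valuesPerThread) {
        Set<Integer> concurrentSet =
                Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < valuesPerThread; i++) {
                    concurrentSet.add(i); // every thread adds the same values
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return concurrentSet.size();
    }

    public static void main(String[] args) {
        // Duplicates collapse, so 1000 distinct elements remain
        System.out.println(addFromManyThreads(4, 1_000));
    }
}
```

With a plain HashMap as the backing map the same experiment would be a data race, so the result would not be reliable.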

Of course, you can provide any map to newSetFromMap(), including the non-concurrent HashMap (keys in no particular order), LinkedHashMap (preserves insertion order) or TreeMap (keeps the keys in their natural order). The resulting Set will inherit those underlying properties. For example, if you provide a LinkedHashMap, you will get a non-concurrent Set whose keys are retrieved in insertion order.
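
The ordering behaviour described above can be checked with a small sketch. The class name NewSetFromMapOrderingDemo and the sample strings are invented for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical demo class, not part of the original post
class NewSetFromMapOrderingDemo {

    // Backed by a LinkedHashMap: iteration follows insertion order.
    static List<String> insertionOrdered() {
        Set<String> set = Collections.newSetFromMap(new LinkedHashMap<String, Boolean>());
        Collections.addAll(set, "pear", "apple", "fig");
        return new ArrayList<>(set);
    }

    // Backed by a TreeMap: iteration follows the natural (alphabetical) order.
    static List<String> naturallyOrdered() {
        Set<String> set = Collections.newSetFromMap(new TreeMap<String, Boolean>());
        Collections.addAll(set, "pear", "apple", "fig");
        return new ArrayList<>(set);
    }

    public static void main(String[] args) {
        System.out.println(insertionOrdered()); // [pear, apple, fig]
        System.out.println(naturallyOrdered()); // [apple, fig, pear]
    }
}
```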

Limitations and Drawbacks

When you use the Collections.newSetFromMap() method, the Map you provide must be empty when you pass it in; otherwise an IllegalArgumentException is thrown. You must also take care not to keep an additional reference to the backing map, because newSetFromMap() does not make a defensive copy of the map. If you keep such a reference, you can alter the Set through the Map reference too, which is unsafe. The value type Boolean used in the underlying Map also strikes me as odd. Why was that particular type selected, one might ask? Object would be more general, one might argue.
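
Both pitfalls are easy to reproduce. The following sketch uses an invented class name (NewSetFromMapPitfallsDemo) and invented keys; only Collections.newSetFromMap() and the map classes come from the JDK.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class, not part of the original post
class NewSetFromMapPitfallsDemo {

    // Pitfall 1: a non-empty backing map is rejected.
    static boolean rejectsNonEmptyMap() {
        Map<String, Boolean> map = new ConcurrentHashMap<>();
        map.put("already here", Boolean.TRUE);
        try {
            Collections.newSetFromMap(map);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    // Pitfall 2: a retained reference to the backing map changes the Set behind its back.
    static boolean leakedReferenceAltersSet() {
        Map<String, Boolean> map = new ConcurrentHashMap<>();
        Set<String> set = Collections.newSetFromMap(map);
        map.put("sneaky", Boolean.FALSE); // bypasses the Set API entirely
        return set.contains("sneaky");
    }

    public static void main(String[] args) {
        System.out.println(rejectsNonEmptyMap());       // true
        System.out.println(leakedReferenceAltersSet()); // true
    }
}
```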

The most important drawback, in my opinion, is that the returned Set really is just a Set that happens to be concurrent (again, if you provide a ConcurrentMap). For example, it does not offer methods corresponding to the features ConcurrentMap adds over Map, like putIfAbsent() and the numerous new Java 8 methods such as computeIfAbsent(). Nor does its static type, Set, say anything about concurrency, so there is no way to really guarantee that a given Set is concurrent.

The Interface Proposal

So what do we want to accomplish here? Well, wouldn't it be nice if we could define an interface ConcurrentSet and use it just like we are using the interface ConcurrentMap. Let us take a look at the latter interface (as defined in Java 8):

public interface ConcurrentMap<K, V> extends Map<K, V> {

    V getOrDefault(Object key, V defaultValue);

    void forEach(BiConsumer<? super K, ? super V> action);

    V putIfAbsent(K key, V value);

    boolean remove(Object key, Object value);

    boolean replace(K key, V oldValue, V newValue);

    void replaceAll(BiFunction<? super K, ? super V, ? extends V> function);

    V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction);

    V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction);

    V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction);

    V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction);

}

As can be seen, most of the methods deal with values rather than keys. Thus, most of the methods cannot be applied to a ConcurrentSet. So, we can discard most methods and perhaps redefine others so that they only deal with keys. At first I thought that we should skip all methods except putIfAbsent(), but I was wrong, since it is equivalent to add() (thanks for pointing that out, Louis Wasserman). So, we drop all the methods and end up with just a marker interface with no extra methods over Set. Still, it is useful because we can show our intent with the new interface.

public interface ConcurrentSet<E> extends Set<E> {

}
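
The equivalence between putIfAbsent() and add() noted above can be seen in a small sketch. The class name PutIfAbsentVersusAddDemo and the key are made up; the point is only that both calls report "newly added" the first time and "already present" the second time.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical demo class, not part of the original post
class PutIfAbsentVersusAddDemo {

    // putIfAbsent() returns null exactly when the key was absent and has now been added.
    static boolean[] viaMap() {
        ConcurrentMap<String, Boolean> map = new ConcurrentHashMap<>();
        boolean first = map.putIfAbsent("key", Boolean.TRUE) == null;
        boolean second = map.putIfAbsent("key", Boolean.TRUE) == null;
        return new boolean[] {first, second};
    }

    // add() returns true exactly when the element was absent and has now been added.
    static boolean[] viaSet() {
        Set<String> set = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
        boolean first = set.add("key");
        boolean second = set.add("key");
        return new boolean[] {first, second};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(viaMap())); // [true, false]
        System.out.println(Arrays.toString(viaSet())); // [true, false]
    }
}
```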

Additional concurrent methods can be added later if desired. Please leave a comment below if you can think of new methods that would be nice to add to a ConcurrentSet.

What's Already There?

If we take a closer look at the Collections.newSetFromMap() method, we see that it basically delegates its methods to a backing map (and the backing map's keySet). Below, I have shown a shortened version of the SetFromMap class, so you will get the basic idea. The same concept is also used by Java's ConcurrentSkipListSet, so this certainly seems to be common practice.

 private static class SetFromMap<E> extends AbstractSet<E>
        implements Set<E>, Serializable
    {
        private final Map<E, Boolean> m;  // The backing map
        private transient Set<E> s;       // Its keySet

        SetFromMap(Map<E, Boolean> map) {
            if (!map.isEmpty())
                throw new IllegalArgumentException("Map is non-empty");
            m = map;
            s = map.keySet();
        }

        public void clear()               {        m.clear(); }
        public int size()                 { return m.size(); }
        public boolean isEmpty()          { return m.isEmpty(); }
        public boolean contains(Object o) { return m.containsKey(o); }
        public boolean remove(Object o)   { return m.remove(o) != null; }
        public boolean add(E e) { return m.put(e, Boolean.TRUE) == null; }
        public Iterator<E> iterator()     { return s.iterator(); }
        public Object[] toArray()         { return s.toArray(); }
        public <T> T[] toArray(T[] a)     { return s.toArray(a); }
        public String toString()          { return s.toString(); }
        public int hashCode()             { return s.hashCode(); }
        public boolean equals(Object o)   { return o == this || s.equals(o); }
        public boolean containsAll(Collection<?> c) {return s.containsAll(c);}
        public boolean removeAll(Collection<?> c)   {return s.removeAll(c);}
        public boolean retainAll(Collection<?> c)   {return s.retainAll(c);}

        // The rest comes here...
    }

An Implementation Proposal

Using the Delegation Pattern, we can easily come up with a similar implementation of the new ConcurrentSet interface, as shown below:

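import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;

/**
 * A hash set supporting full concurrency of retrievals and
 * high expected concurrency for updates.
 *
 * @param <E> the type of elements maintained by this set
 * @author pemi
 */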
public class ConcurrentHashSet<E> implements ConcurrentSet<E>, Serializable {

    private final ConcurrentMap<E, Object> m;
    private transient Set<E> s;

    public ConcurrentHashSet() {
        this.m = new ConcurrentHashMap<>();
        init();
    }

    public ConcurrentHashSet(int initialCapacity) {
        this.m = new ConcurrentHashMap<>(initialCapacity);
        init();
    }

    public ConcurrentHashSet(int initialCapacity, float loadFactor) {
        this.m = new ConcurrentHashMap<>(initialCapacity, loadFactor);
        init();
    }

    public ConcurrentHashSet(int initialCapacity, float loadFactor, int concurrencyLevel) {
        this.m = new ConcurrentHashMap<>(initialCapacity, loadFactor, concurrencyLevel);
        init();
    }

    public ConcurrentHashSet(Set<? extends E> s) {
        this(Math.max(Objects.requireNonNull(s).size(), 16));
        addAll(s);
    }

    // New type of constructor
    public ConcurrentHashSet(Supplier<? extends ConcurrentMap<E, Object>> concurrentMapSupplier) {
        final ConcurrentMap<E, Object> newMap = concurrentMapSupplier.get();
        // The static type already guarantees a ConcurrentMap, so this check only fails for null
        if (!(newMap instanceof ConcurrentMap)) {
            throw new IllegalArgumentException("The supplied map does not implement "+ConcurrentMap.class.getSimpleName());
        }
        this.m = newMap;
        init();
    }
   
    private void init() {
        this.s = m.keySet();
    }
   
    @Override public void clear()               {        m.clear(); }
    @Override public int size()                 { return m.size(); }
    @Override public boolean isEmpty()          { return m.isEmpty(); }
    @Override public boolean contains(Object o) { return m.containsKey(o); }
    @Override public boolean remove(Object o)   { return m.remove(o) != null; }
    @Override public boolean add(E e)           { return m.put(e, Boolean.TRUE) == null; }
    @Override public Iterator<E> iterator()     { return s.iterator(); }
    @Override public Object[] toArray()         { return s.toArray(); }
    @Override public <T> T[] toArray(T[] a)     { return s.toArray(a); }
    @Override public String toString()          { return s.toString(); }
    @Override public int hashCode()             { return s.hashCode(); }
    @Override public boolean equals(Object o)   { return s.equals(o); }
    @Override public boolean containsAll(Collection<?> c) {return s.containsAll(c);}
    @Override public boolean removeAll(Collection<?> c)   {return s.removeAll(c);}
    @Override public boolean retainAll(Collection<?> c)   {return s.retainAll(c);}

    @Override
    public boolean addAll(Collection<? extends E> c) {
        // Java 8 Stream: add each element, keep the "true" results, report whether any were added
        return Objects.requireNonNull(c).stream().map((e) -> add(e)).filter((b)->b).count() > 0;
    }


    // Override default methods in Collection
    @Override public void forEach(Consumer<? super E> action) { s.forEach(action);}
    @Override public boolean removeIf(Predicate<? super E> filter) { return s.removeIf(filter);}
    @Override public Spliterator<E> spliterator()     {return s.spliterator();}
    @Override public Stream<E> stream()               {return s.stream();}
    @Override public Stream<E> parallelStream()       {return s.parallelStream();}

    private static final long serialVersionUID = -913526372691027123L;

    private void readObject(java.io.ObjectInputStream stream)
       throws IOException, ClassNotFoundException
    {
        stream.defaultReadObject();
        init();
    }

}

Note the new constructor ConcurrentHashSet(Supplier<? extends ConcurrentMap<E, Object>> concurrentMapSupplier), which allows us to provide any ConcurrentMap as the underlying map at creation time. By providing a Supplier rather than a concrete Map instance, we make it much less likely that the caller keeps a second reference to the backing map, and a map freshly created by a constructor reference is empty, which sidesteps the "map must be empty" problem associated with the Collections.newSetFromMap() method. For example, we can create a ConcurrentSet with the keys in their natural order by calling new ConcurrentHashSet<>(ConcurrentSkipListMap::new).
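
To show the Supplier idea in isolation, here is a trimmed-down sketch. SupplierBackedSet is a hypothetical stand-in invented for this example, not the ConcurrentHashSet class from the post: it extends AbstractSet to stay short and delegates only the essential methods.

```java
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Supplier;

// Hypothetical, shortened stand-in for a Supplier-constructed concurrent set
class SupplierBackedSet<E> extends AbstractSet<E> {

    private final ConcurrentMap<E, Object> m;

    // The map is created inside the constructor call, so the caller
    // normally never holds a reference to it.
    SupplierBackedSet(Supplier<? extends ConcurrentMap<E, Object>> mapSupplier) {
        this.m = Objects.requireNonNull(mapSupplier.get(), "supplier returned null");
    }

    @Override public boolean add(E e)           { return m.put(e, Boolean.TRUE) == null; }
    @Override public boolean remove(Object o)   { return m.remove(o) != null; }
    @Override public boolean contains(Object o) { return m.containsKey(o); }
    @Override public Iterator<E> iterator()     { return m.keySet().iterator(); }
    @Override public int size()                 { return m.size(); }

    // A skip-list map keeps its keys sorted, and so does the set built on it.
    static List<String> sortedExample() {
        SupplierBackedSet<String> set = new SupplierBackedSet<String>(ConcurrentSkipListMap::new);
        set.add("pear");
        set.add("apple");
        set.add("fig");
        return new ArrayList<>(set);
    }

    public static void main(String[] args) {
        System.out.println(sortedExample()); // [apple, fig, pear]
    }
}
```

A Supplier that hands out a shared, pre-existing map would of course defeat the purpose, so this is a convention rather than a guarantee.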

Worth noticing is also the addAll() method, which uses Java 8's stream library to add the new elements to the Set one by one. We then keep only the add() calls that returned true, and if there was at least one such addition, we return true (i.e. the set was modified).
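
For comparison, the same "did anything change?" logic can be written as a plain loop, which avoids calling add() as a side effect inside map(). This is an alternative sketch, not the code from the post; the class name AddAllSketch and the static helper are made up so that the example stands on its own.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper class, not part of the original post
class AddAllSketch {

    // Adds every element of source to target and reports whether target changed.
    static <E> boolean addAll(Set<E> target, Collection<? extends E> source) {
        Objects.requireNonNull(source);
        boolean modified = false;
        for (E e : source) {
            if (target.add(e)) {
                modified = true; // at least one element was new
            }
        }
        return modified;
    }

    public static void main(String[] args) {
        Set<String> set = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
        System.out.println(addAll(set, Arrays.asList("a", "b"))); // true
        System.out.println(addAll(set, Arrays.asList("a", "b"))); // false
    }
}
```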

Game, Set and match...


6 comments:

  1. Your addIfAbsent method seems exactly equivalent to the perfectly normal Set.add method, which already returns true if and only if the element was previously absent and has since been added.

    Replies
    1. You are absolutely right Louis. Thanks for pointing out that. I have updated the post accordingly.

  2. Dude when you write articles why don't you colorise the source code???? WHY?? Why is everything half finished?

    Replies
    1. Hi Anonymous. I have improved the look of the code examples from now on. Checkout http://minborgsjavapot.blogspot.com/2015/12/do-not-let-your-java-objects-escape.html and tell me what you think.

  3. What about using Collections.synchronizedSet(new HashSet(...)); ?

    Replies
    1. A synchronized Set is not concurrent. A synchronized Set will only accept one thread at a time whereas a concurrent Set can accept a plurality of concurrent threads. Hence its name.

