In concurrent code, we often see a ConcurrentMap<Key, Integer> used as a per-key counter.

A good example is tracking the number of subscriptions for a topic. Keeping this number thread-safe can be important if it is used to determine whether it is safe to unsubscribe from the upstream feed providing that topic.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;


public class SubscriptionCount {

  private final ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();

  /** Always returns true. */
  public boolean add(String ticker) {
    while (true) { // retry on thread contention
      // Like double-checked locking: call get() first so the common case avoids object creation
      Integer existing = map.get(ticker);
      if (existing == null) {
        existing = map.putIfAbsent(ticker, Integer.valueOf(1));
        if (existing == null) {
          return true;
        }
      }

      if (map.replace(ticker, existing, existing + 1)) {
        return true;
      }
      // replace() failed: the count changed or the ticker was removed, so run from the beginning
    }
  }

  /** Returns true if the collection was modified as a result. */
  public boolean remove(String ticker) {
    while(true) { // thread contention
      Integer existing = map.get(ticker);
      if (existing == null) {
        return false;
      }

      if (existing.equals(Integer.valueOf(1))) {
        if (map.remove(ticker, Integer.valueOf(1))) {
          return true;
        }
        // continue;
      } else if (map.replace(ticker, existing, existing-1)) {
        return true;
      }
      // continue;
    }
  }
}
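
For comparison, on Java 8 and later the same counting can be sketched with the atomic merge(…) and computeIfPresent(…) methods of ConcurrentHashMap, which hide the retry loop. This is a different technique from the listing above, shown only as a rough alternative; the class name ComputeSubscriptionCount and the count(…) helper are made up for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical alternative to SubscriptionCount, assuming Java 8+.
final class ComputeSubscriptionCount {

  private final ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();

  /** Always returns true. Creates the entry at 1, or adds 1 to the existing count. */
  boolean add(String ticker) {
    map.merge(ticker, 1, Integer::sum);
    return true;
  }

  /** Returns true if a subscription was removed. */
  boolean remove(String ticker) {
    boolean[] modified = {false};
    // ConcurrentHashMap applies the function atomically, and only if the key is present.
    map.computeIfPresent(ticker, (key, count) -> {
      modified[0] = true;
      // Returning null removes the mapping, so a count never rests at 0.
      return count == 1 ? null : count - 1;
    });
    return modified[0];
  }

  /** Hypothetical helper: current count, or 0 if the ticker is absent. */
  int count(String ticker) {
    return map.getOrDefault(ticker, 0);
  }
}
```

The trade-off is that merge(…) and computeIfPresent(…) may block other updates to the same bin while the function runs, whereas the loop above never blocks but may retry.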

Since Guava 2.0, ConcurrentHashMultiset does exactly what we want here. That version is implemented with a ConcurrentHashMap<T, Integer>.

Many of us must have thought about reducing object creation by replacing the immutable Integer with a mutable AtomicInteger, ending up with a ConcurrentHashMap<T, AtomicInteger>.

The problem with this is that AtomicInteger does not override equals(). As a result, it inherits the identity comparison (==) defined in Object.equals(). A call to replace(…) would therefore always succeed in replacing the existing AtomicInteger that we obtained from get(…), because it is the same instance, no matter how its value has changed in the meantime.
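
The following is a small sketch of that pitfall using only JDK classes. The class name AtomicIntegerEqualityDemo, its method names, and the ticker "ABC" are made up for illustration, and the concurrent update is simulated on a single thread.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the original listing.
final class AtomicIntegerEqualityDemo {

  /** Two distinct counters holding the same value are not equal. */
  static boolean sameValueEquals() {
    return new AtomicInteger(1).equals(new AtomicInteger(1));
  }

  /** replace(…) still succeeds after the counter's value has changed underneath us. */
  static boolean replaceIgnoresValueChange() {
    ConcurrentMap<String, AtomicInteger> map = new ConcurrentHashMap<>();
    map.put("ABC", new AtomicInteger(1));

    AtomicInteger existing = map.get("ABC");
    int seen = existing.get();

    // Simulates another thread bumping the count between our get() and replace().
    existing.incrementAndGet();

    // The old-value check compares instances, not counts, so it cannot detect the change.
    return map.replace("ABC", existing, new AtomicInteger(seen + 1));
  }
}
```

In this sketch sameValueEquals() is false while replaceIgnoresValueChange() is true, so the simulated concurrent increment is silently lost.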

This problem is solved in Guava v10.0 by letting add(…) update the existing AtomicInteger if and only if its value is not 0. A value of 0 implies contention with a concurrent remove. In that case, add(…) has to replace the zero-valued counter with a new AtomicInteger. The comment in the Guava source reads:

// In the case of a concurrent remove, we might observe a zero value, which means another
// thread is about to remove (element, existingCounter) from the map. Rather than wait,
// we can just do that work here.

Eventually, probably within a nanosecond, the thread that set the old AtomicInteger to 0 will either remove that zero-valued AtomicInteger from the map before any other thread notices, or find that it has already been replaced. In the latter case, there is nothing else for it to do.
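
The scheme described above can be sketched roughly as follows. This is a simplified illustration written for this post, not Guava's actual source; the class name AtomicSubscriptionCount and the count(…) helper are made up.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a counter that reaches 0 is "dead" and is never incremented again.
final class AtomicSubscriptionCount {

  private final ConcurrentMap<String, AtomicInteger> map = new ConcurrentHashMap<>();

  /** Always returns true. */
  boolean add(String ticker) {
    while (true) {
      AtomicInteger counter = map.get(ticker);
      if (counter == null) {
        counter = map.putIfAbsent(ticker, new AtomicInteger(1));
        if (counter == null) {
          return true; // we installed a fresh counter
        }
      }
      while (true) {
        int old = counter.get();
        if (old == 0) {
          break; // dead counter: a concurrent remove is about to drop it
        }
        if (counter.compareAndSet(old, old + 1)) {
          return true;
        }
      }
      // Do the remover's work here: swap the dead counter for a fresh one.
      AtomicInteger fresh = new AtomicInteger(1);
      if (map.putIfAbsent(ticker, fresh) == null || map.replace(ticker, counter, fresh)) {
        return true;
      }
      // Another thread installed a different counter first; run from the beginning.
    }
  }

  /** Returns true if a subscription was removed. */
  boolean remove(String ticker) {
    AtomicInteger counter = map.get(ticker);
    if (counter == null) {
      return false;
    }
    while (true) {
      int old = counter.get();
      if (old == 0) {
        return false; // already dead, nothing to remove
      }
      if (counter.compareAndSet(old, old - 1)) {
        if (old == 1) {
          // Identity match: a no-op if add(…) has already replaced this counter.
          map.remove(ticker, counter);
        }
        return true;
      }
    }
  }

  /** Hypothetical helper: current count, or 0 if the ticker is absent. */
  int count(String ticker) {
    AtomicInteger counter = map.get(ticker);
    return counter == null ? 0 : counter.get();
  }
}
```

Here the identity-based equals() works in our favour: replace(…) and remove(…) only need to tell whether the map still holds the exact dead instance, while the count itself is guarded by compareAndSet(…).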

Food for thought: what if we have a flaky connection with a lonely subscriber whose count goes 0⇒1⇒0⇒1⇒0 a lot? Can we reduce the creation of AtomicInteger objects? How about, instead of removing a 0 straight away, we mark it as -1 only if nothing has touched it for a while?