In a concurrent environment, we often see `ConcurrentMap<Key, Integer>`.
A good example is tracking the number of subscriptions per topic. Keeping this count thread-safe matters if it is used to determine whether it is safe to unsubscribe from the upstream feed providing that topic.
```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SubscriptionCount {
    private final ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();

    /** Always returns true. */
    public boolean add(String ticker) {
        while (true) { // retry loop: another thread may change the entry between our read and write
            // Similar to double-checked locking: read first to avoid needless object creation
            Integer existing = map.get(ticker);
            if (existing == null) {
                existing = map.putIfAbsent(ticker, Integer.valueOf(1));
                if (existing == null) {
                    return true;
                }
            }
            if (map.replace(ticker, existing, existing + 1)) {
                return true;
            }
            // Lost the race (value changed or ticker removed): start again from the beginning
        }
    }

    /** Returns true if the collection is modified as the result. */
    public boolean remove(String ticker) {
        while (true) { // retry loop on thread contention
            Integer existing = map.get(ticker);
            if (existing == null) {
                return false;
            }
            if (existing.intValue() == 1) {
                // Last subscriber: drop the entry, but only if it is still 1
                if (map.remove(ticker, existing)) {
                    return true;
                }
            } else if (map.replace(ticker, existing, existing - 1)) {
                return true;
            }
            // Lost the race: retry
        }
    }
}
```
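On Java 8 and later, the same counting can be sketched without a hand-written retry loop by leaning on `ConcurrentHashMap`'s atomic `merge()` and `computeIfPresent()`. This swaps the explicit compare-and-set loop for the map's built-in atomic updates; the class name `MergeSubscriptionCount` and its `count()` helper are hypothetical, chosen only for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical Java 8+ variant of SubscriptionCount using atomic map operations.
final class MergeSubscriptionCount {
    private final ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();

    /** Always returns true. */
    public boolean add(String ticker) {
        // Inserts 1 if absent, otherwise atomically replaces v with v + 1
        map.merge(ticker, 1, Integer::sum);
        return true;
    }

    /** Returns true if the collection is modified as the result. */
    public boolean remove(String ticker) {
        boolean[] modified = {false};
        // Returning null from the remapping function removes the entry
        map.computeIfPresent(ticker, (k, v) -> {
            modified[0] = true;
            return v == 1 ? null : v - 1;
        });
        return modified[0];
    }

    public int count(String ticker) {
        return map.getOrDefault(ticker, 0);
    }
}
```

Returning `null` from the remapping function plays the role of the `remove(ticker, …)` branch above. `ConcurrentHashMap` performs each of these calls atomically and only invokes the `computeIfPresent()` function when the key is present, so the `modified` flag reflects whether a count was actually decremented.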
Starting from Guava v2.0, `ConcurrentHashMultiset` does exactly what we want here. It is implemented with `ConcurrentHashMap<T, Integer>`.
Many of us must have thought about reducing object creation by replacing the immutable `Integer` with a mutable `AtomicInteger`, ending up with a `ConcurrentHashMap<T, AtomicInteger>`.
The problem with this is that `AtomicInteger` does not override `equals()`. As a result, its equality is the identity equality (`==`) defined in `Object.equals()`. A `replace(…)` would always succeed in replacing the existing `AtomicInteger` that we obtained from `get(…)` as long as it is still the same instance, even if another thread has changed its value in the meantime.
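The effect can be reproduced deterministically by simulating the "other thread" in sequence. In this hypothetical `IdentityReplaceDemo`, the `Integer` map rejects a stale `replace(…)`, while the `AtomicInteger` map accepts it because only identity is compared. The last method shows a related hazard of naive in-place counting: an increment that lands on a counter another thread has already dropped from the map at 0.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: the "other thread" is simulated sequentially so the interleaving is fixed.
final class IdentityReplaceDemo {

    /** Integer: replace() notices that the value changed since get(). */
    static boolean integerReplaceAfterConcurrentChange() {
        ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("IBM", 1);
        Integer seen = map.get("IBM");             // we read 1
        map.put("IBM", 2);                         // another thread bumps it to 2
        return map.replace("IBM", seen, seen + 1); // 1.equals(2) is false, so this fails
    }

    /** AtomicInteger: replace() only checks identity, so the change goes unnoticed. */
    static boolean atomicReplaceAfterConcurrentChange() {
        ConcurrentMap<String, AtomicInteger> map = new ConcurrentHashMap<>();
        map.put("IBM", new AtomicInteger(1));
        AtomicInteger seen = map.get("IBM");
        int seenValue = seen.get();                // we read 1
        seen.incrementAndGet();                    // another thread: 1 -> 2 on the same instance
        // Identity match, so this succeeds and overwrites the other thread's increment
        // (the map ends up holding 2 instead of 3)
        return map.replace("IBM", seen, new AtomicInteger(seenValue + 1));
    }

    /** Naive in-place counting: an increment can land on a counter that was already removed. */
    static int lostIncrementAfterRemove() {
        ConcurrentMap<String, AtomicInteger> map = new ConcurrentHashMap<>();
        map.put("IBM", new AtomicInteger(1));
        AtomicInteger seenByAdder = map.get("IBM");   // thread B fetches the counter
        AtomicInteger seenByRemover = map.get("IBM"); // thread A fetches the same counter
        if (seenByRemover.decrementAndGet() == 0) {
            map.remove("IBM", seenByRemover);         // A drops the entry at 0
        }
        seenByAdder.incrementAndGet();                // B increments an orphaned counter
        AtomicInteger current = map.get("IBM");
        return current == null ? 0 : current.get();   // B's subscription is lost
    }
}
```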
This problem is solved in Guava v10.0 by limiting `add(…)` to updating the `AtomicInteger` in place only if its current value is not 0. A value of 0 implies thread contention: another thread has just decremented the counter to 0 and is about to remove it. In that case, `add(…)` has to replace the zero-valued instance with a new `AtomicInteger`.
Shortly afterwards, the thread that decremented the old `AtomicInteger` to 0 either removes that zero-valued `AtomicInteger` from the map before any other thread notices it, or finds that it has already been replaced. Because the removal is a conditional `remove(key, value)` comparing by identity, it cannot accidentally remove the replacement, and in the latter case there is nothing else to do.
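A minimal sketch of this zero-means-dead scheme, keeping the `add`/`remove` API of `SubscriptionCount`. It is reconstructed from the description above rather than copied from Guava; the class name `AtomicSubscriptionCount` and the `count()` helper are hypothetical. A counter that reaches 0 is never incremented again, so here the identity comparison in `replace(…)` and `remove(…)` is exactly what we want.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the "0 means dead" idea; not Guava's actual source.
final class AtomicSubscriptionCount {
    private final ConcurrentMap<String, AtomicInteger> map = new ConcurrentHashMap<>();

    /** Always returns true. */
    public boolean add(String ticker) {
        while (true) {
            AtomicInteger counter = map.get(ticker);
            if (counter == null) {
                counter = map.putIfAbsent(ticker, new AtomicInteger(1));
                if (counter == null) {
                    return true;
                }
            }
            int current = counter.get();
            if (current != 0) {
                // Live counter: bump it in place without allocating
                if (counter.compareAndSet(current, current + 1)) {
                    return true;
                }
            } else {
                // Dead counter (0): swap in a fresh instance; replace() compares
                // by identity, so it only succeeds if the dead one is still mapped
                if (map.replace(ticker, counter, new AtomicInteger(1))) {
                    return true;
                }
            }
            // Lost a race: retry from the beginning
        }
    }

    /** Returns true if the collection is modified as the result. */
    public boolean remove(String ticker) {
        while (true) {
            AtomicInteger counter = map.get(ticker);
            if (counter == null) {
                return false;
            }
            int current = counter.get();
            if (current == 0) {
                return false; // dead counter: logically absent
            }
            if (counter.compareAndSet(current, current - 1)) {
                if (current == 1) {
                    // We took it to 0: remove it only if this exact instance is still mapped
                    map.remove(ticker, counter);
                }
                return true;
            }
        }
    }

    public int count(String ticker) {
        AtomicInteger counter = map.get(ticker);
        return counter == null ? 0 : counter.get();
    }
}
```

Because a live (non-zero) counter is never removed or replaced, a successful compare-and-set always lands on the counter that is actually in the map, which closes the window where an increment could hit an already-removed counter.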
Food for thought: what if we have a flaky connection with a lone subscriber that goes 0⇒1⇒0⇒1⇒0 a lot? Can we reduce the creation of `AtomicInteger` objects? How about, instead of removing a zero counter straight away, we keep it in the map and only mark it as -1 (dead) once nothing has touched it for a while?