Thursday, November 3, 2011

Partitioned Concurrency

Here is a common block of code in Java. Assume cache is a Map of some form.

if (!cache.containsKey(someKey)) {
    cache.put(someKey, someValue);
}


Or: if a given object is not in the map with a certain key, put it there. Otherwise, proceed along your merry way.

I was writing code like this in a controller the other day, because I wanted to cache Broadcaster objects (from the Atmosphere framework) that would manage separate chat channels. Requests would come in and get attached to one or another Broadcaster based on a channel ID.

But that code snippet, in its current form, isn't thread-safe. You could have two threads get past the if and thus both think that the map needs the object inserted. That isn't always a problem — my reflection-based API marshaling layer caches Field and Method objects that are constant for a given class, making me unconcerned about threads that replace values — but in this case, you could have the second thread replace the Broadcaster that the first one inserted, meaning that some chunk of the chat clients wouldn't see messages because they'd be attached to the wrong object.
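Concretely, the bad interleaving looks something like this (the thread labels and the broadcasterA/broadcasterB names are mine, just for illustration):

// Two request threads hit the unsynchronized check-then-put for the same channel ID:
//
//   Thread A: cache.containsKey(3)        -> false
//   Thread B: cache.containsKey(3)        -> false
//   Thread A: cache.put(3, broadcasterA)
//   Thread A: attaches its request to broadcasterA
//   Thread B: cache.put(3, broadcasterB)  // silently replaces broadcasterA
//
// Later requests for channel 3 attach to broadcasterB, so messages never reach
// the clients still hanging off broadcasterA.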

The standard way to solve this is to put a mutex around the cache-modification code:
synchronized(this) {
    if (!cache.containsKey(someKey)) {
        cache.put(someKey, someValue);
    }
}


But this is code that is likely to have a lot of requests fired at it, and synchronizing this way introduces a major performance bottleneck: every single request would have to wait for that lock. Yes, I know: Premature optimization is the root of all evil. Still, it seemed problematic. Synchronization at that high a level can cause major issues when you're dealing with hundreds of thousands of concurrent users, which we may very well be doing when we launch.

As I was pondering this, I remembered a technique for increasing concurrency. I had discussed it with friends before, but never implemented it. Still, it's straightforward enough. (In fact, it's how java.util.concurrent.ConcurrentHashMap is implemented, or near enough.) Note that this technique should also work in Objective-C and any other language with similar semantics, though I haven't tried it anywhere other than Java.
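As a side note, ConcurrentHashMap also exposes that atomicity directly: its putIfAbsent method does the check-and-insert as a single operation, which by itself would prevent the replaced-Broadcaster problem above, at the cost of constructing a value you might immediately throw away. Here's a minimal sketch, with my own names standing in for the real Broadcaster setup:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PutIfAbsentExample {
    private final ConcurrentMap<Long, Object> channels = new ConcurrentHashMap<Long, Object>();

    public Object broadcasterFor(Long channelId) {
        Object candidate = new Object();  // stand-in for constructing a new Broadcaster
        // putIfAbsent inserts only if the key is absent and returns the existing
        // value if another thread won the race; our candidate is then discarded.
        Object existing = channels.putIfAbsent(channelId, candidate);
        return (existing != null) ? existing : candidate;
    }
}

What putIfAbsent doesn't give you is a way to do expensive per-key work only on a miss; the partitioning described next does.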

Here's the idea: If you've got some sort of cache, you only really need to make sure that two threads aren't working on the same cache entry at the same time. If I have a Broadcaster, and I attach it to some ID for the channel, I only care about isolating activity around that ID. I don't care if someone's working on some other ID. (Except when I do: see below.) In other words, if I'm working with an ID of 3, I don't care whether some other thread is checking if ID 1 already exists; I only care that someone else asking about ID 3 doesn't cause problems.

So if you can create a sequence of locks and ensure that anyone working on the same ID ends up synchronizing on the same lock, you're good to go.

Consider this code:
import java.util.HashMap;
import java.util.Map;

public class ConcurrentCache {
    private static Object[] locks = new Object[10];
    static {
        for (int lockIndex = 0; lockIndex < locks.length; lockIndex++) {
            locks[lockIndex] = new Object();
        }
    }

    // Note: threads holding different partition locks can still modify this map at
    // the same time, so a real implementation should use a map that is itself safe
    // for concurrent modification (Collections.synchronizedMap, for example).
    private Map<Object, Object> cache = new HashMap<Object, Object>();

    public Object getOrInsertIntoCache(Object key) {
        // Mask off the sign bit so a negative hashCode can't produce a negative index.
        synchronized (locks[(key.hashCode() & 0x7fffffff) % locks.length]) {
            if (!cache.containsKey(key)) {
                cache.put(key, new Object());
            }
            return cache.get(key);
        }
    }
}


The static initialization code gives you ten locks to work with, and you can get to one by just modding the hash code by the length of the array (masking off the sign bit first, since a hash code can be negative). Any given ID will always end up with the same lock, assuming a consistent hashCode result, which is true of Long, Integer, String, and the other object representations of Java primitives typically used as keys.
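To make the "same ID, same lock" point concrete (this snippet is mine, not part of the cache class):

public class LockSelection {
    public static void main(String[] args) {
        Object[] locks = new Object[10];
        // Integer's hashCode is just the int value, so channel ID 3 always selects
        // locks[3], no matter which thread computes it. The sign-bit mask only
        // matters for key types whose hashCode can be negative (some Strings, say).
        Integer channelId = Integer.valueOf(3);
        int lockIndex = (channelId.hashCode() & 0x7fffffff) % locks.length;
        System.out.println("ID " + channelId + " -> locks[" + lockIndex + "]");  // always locks[3]
    }
}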

To test this, I wrote a simple program that would put 5,000 tasks on an ExecutorService with ten threads. Each task would generate a random number between one and 100, which became the key to use on the cache. Depending on some command-line arguments, the tasks would either lock on the whole cache or on a partitioned lock as above. Each task captured the current system time when it was constructed (put on the queue) and then printed the difference between the new system time and that start time when it eventually ran.
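Here's a minimal sketch of that kind of harness. It's a reconstruction rather than the original test code: the command-line flag, the globalGetOrInsertIntoCache variant (a version of the method above that synchronizes on the whole cache instead of a partition lock), and the bookkeeping details are all my assumptions.

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class CacheContentionTest {
    public static void main(String[] args) throws InterruptedException {
        final boolean partitioned = args.length > 0 && args[0].equals("partitioned");
        final ConcurrentCache cache = new ConcurrentCache();
        final AtomicLong totalWaitMillis = new AtomicLong();
        final int taskCount = 5000;
        Random random = new Random();

        ExecutorService pool = Executors.newFixedThreadPool(10);
        for (int i = 0; i < taskCount; i++) {
            final Object key = Integer.valueOf(random.nextInt(100) + 1);  // keys 1..100
            final long queuedAt = System.currentTimeMillis();             // captured at construction time
            pool.execute(new Runnable() {
                public void run() {
                    // How long this task sat on the queue before a thread picked it up.
                    totalWaitMillis.addAndGet(System.currentTimeMillis() - queuedAt);
                    if (partitioned) {
                        cache.getOrInsertIntoCache(key);
                    } else {
                        cache.globalGetOrInsertIntoCache(key);  // hypothetical "block on everything" variant
                    }
                    // The second experiment described below adds a Thread.sleep(1)
                    // inside the synchronized block of the cache method.
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.MINUTES);
        System.out.println("average wait (ms): " + totalWaitMillis.get() / (double) taskCount);
    }
}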

I ran the program and eagerly checked the average wait times, only to discover that the two variants performed almost identically. The partitioned code showed tasks waiting a mere millisecond less, on average, than the "block on everything" code. And that was averaged over 5,000 jobs, remember. That's a lot of complexity for not a lot of gain.

I figured out what was going on with a bit of poking. The basic code says: See if the key's already there, and, if it's not, insert it. What that really meant was that 4,900 of my jobs called containsKey, saw that the key was there, and exited. I had an extraordinarily high cache hit rate. The lock was acquired and released so quickly that there simply wasn't noticeable lock contention.

Once I realized that, I made a simple change: after doing the cache logic, each task slept for one millisecond inside the locked section, standing in for more expensive per-key work. And I ran my program again.

That produced the results I expected! Threads in the "block on everything" version waited almost precisely ten times as long, on average, as their partitioned cousins (29 seconds versus 3).

Real-world caches are messy things. In fact, my simple test case wouldn't really be done this way at all: You'd prefill the cache with the fixed items you wanted and avoid all this nonsense. But real caches need to expire items, cached results can be more or less complex, and so forth. The caching situation I have involves lots of different IDs with corresponding amounts of object churn, so the in situ results will likely look very different from the nearly exact ten-way split in my test. Still, partitioning obviously made a big difference for time-consuming work, and it at least didn't hurt performance in the simple case. You get more code complexity, which means more potential bugs and less maintainability, but in my system this logic is tucked into a class by itself, so no clients need to worry about these details: They just request a Broadcaster and don't worry about how it gets to them.

Still, if your cache handling is no more sophisticated than this, synchronizing on the whole thing is essentially just as fast, and you should skip the readability/maintainability cost altogether. If you're worrying about a one-millisecond difference across 5,000 tasks in your server code, you're farther along in your optimizations than I am.

There's also the question of what to do when you do care about the overall state of the cache. For instance, what if you want to get the size of this cache? The above technique won't work, because no single lock guarantees a consistent view of the whole map. (Ignoring the reality that you probably don't actually care about the exact size of the cache: You simply want an approximation, in which case you're fine.)

Actually, there is one lock that will guarantee the state of the overall cache, and that's the cache itself. If you really care about exact counts, you can synchronize on the cache itself for anything that changes its size. For instance:
import java.util.HashMap;
import java.util.Map;

public class ConcurrentCache {
    private static Object[] locks = new Object[10];
    static {
        for (int lockIndex = 0; lockIndex < locks.length; lockIndex++) {
            locks[lockIndex] = new Object();
        }
    }

    private Map<Object, Object> cache = new HashMap<Object, Object>();

    public Object getOrInsertIntoCache(Object key) {
        synchronized (locks[(key.hashCode() & 0x7fffffff) % locks.length]) {
            if (!cache.containsKey(key)) {
                // Take the cache-wide lock only for the operation that changes the size.
                synchronized (cache) {
                    cache.put(key, new Object());
                }
            }
            return cache.get(key);
        }
    }
}


You re-introduce the global lock, but you minimize how often it's acquired. If you have a high cache hit rate, this should still give you better concurrency while allowing for across-the-board thread safety (though, again, do you really care that your size is 99 and not 98?).
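For completeness, here is what the reading side might look like. This method isn't in the original class; it's a sketch assuming it lives inside the ConcurrentCache above:

// Gives an exact count only because every size-changing operation above also
// synchronizes on cache.
public int exactSize() {
    synchronized (cache) {
        return cache.size();
    }
}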

This isn't a new technique, but it was worth jotting down so I don't forget it.