Monday, July 23, 2007

Implementing a Copy-on-Write HashMap in Java

Under Unix, when a process is forked, it is (almost) cloned. Done naively, this means copying a large amount of data to create the new child process, so that the parent and the child each have their own copy of the data they need to execute. After the fork, either process can modify its copy of this data without affecting the other.

Copying large amounts of data has an obvious impact on performance, so developers quickly realised that they could optimise this as follows:

1. When the fork occurs, the data is not copied; instead, it is marked as read-only and shared by both processes.
2. If either process attempts to modify the data, then, and only then, a copy is made for that process (in practice, only of the memory pages being modified).

This approach works on the basis that most data will not be modified.
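To make the two steps above concrete, here is a minimal single-threaded sketch in Java. The class name CowMap and its fork() method are hypothetical, chosen only to mirror the Unix terminology: forking shares the underlying map without copying it, and the first put() on either side makes a private copy before writing.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical CowMap: a single-threaded sketch of fork-style copy-on-write.
 * fork() shares the data; the first put() on either side copies it.
 */
public class CowMap<K, V> {
    private Map<K, V> data;
    // true when no other CowMap can see 'data', so writing in place is safe
    private boolean exclusive;

    public CowMap(Map<K, V> initial) {
        this(new HashMap<K, V>(initial), true);
    }

    private CowMap(Map<K, V> data, boolean exclusive) {
        this.data = data;
        this.exclusive = exclusive;
    }

    /** Step 1: nothing is copied, both sides are simply marked as sharing. */
    public CowMap<K, V> fork() {
        exclusive = false;
        return new CowMap<K, V>(data, false);
    }

    public V get(K key) {
        return data.get(key);
    }

    /** Step 2: the first write after a fork makes a private copy, and only then. */
    public void put(K key, V value) {
        if (!exclusive) {
            data = new HashMap<K, V>(data);
            exclusive = true;
        }
        data.put(key, value);
    }

    public static void main(String[] args) {
        Map<String, String> defaults = new HashMap<String, String>();
        defaults.put("Key1", "Value");
        CowMap<String, String> parent = new CowMap<String, String>(defaults);
        CowMap<String, String> child = parent.fork();
        child.put("Key1", "Child Value");
        System.out.println(parent.get("Key1")); // Value
        System.out.println(child.get("Key1"));  // Child Value
    }
}
```

Reads never copy anything, so if neither side ever writes, the fork costs no more than creating one extra reference.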

Java 1.5 introduced two Collections that use this approach: CopyOnWriteArrayList and CopyOnWriteArraySet. These create a new copy of the underlying array only when the collection is modified, so reads and iteration never need to lock.
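A small sketch of what this means in practice (CowListDemo and iterateWhileAdding are illustrative names): an iterator over a CopyOnWriteArrayList works on the array that existed when it was created, so adding an element afterwards neither affects it nor causes a ConcurrentModificationException. The same code run against a plain ArrayList would throw ConcurrentModificationException on it.next().

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CowListDemo {
    /** Creates an iterator, modifies the list, then counts what the iterator sees. */
    static int iterateWhileAdding(List<String> list, String extra) {
        Iterator<String> it = list.iterator(); // snapshot of the current array
        list.add(extra);                       // copies the array; the snapshot is untouched
        int seen = 0;
        while (it.hasNext()) {
            it.next();
            seen++;
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> list = new CopyOnWriteArrayList<String>(Arrays.asList("a", "b"));
        System.out.println(iterateWhileAdding(list, "c")); // 2
        System.out.println(list.size());                   // 3
    }
}
```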

I recently had the following requirement that was not met by these classes:

1. Each Thread is initialised with an identical HashMap containing configuration information.
2. If a Thread changes any of this information, the change is local to them.

This is a case where copy-on-write works well. If we assume that most Threads will not change most of the values in the HashMap, giving every Thread its own full copy up front would mostly copy data that is never modified, so a copy-on-write implementation brings a large performance benefit.

To allow each Thread to have its own local values, the ThreadLocal class is invaluable. It allows each Thread to have an Object (in this case a HashMap) that is local to that Thread and cannot be seen by other threads.
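A minimal sketch of that isolation, using only the standard ThreadLocal class (ThreadLocalDemo and the "default"/"changed" values are illustrative): a second thread changes its own copy, while the main thread still sees the initial value.

```java
public class ThreadLocalDemo {
    // Every thread starts with its own copy, created by initialValue()
    private static final ThreadLocal<String> SETTING = new ThreadLocal<String>() {
        @Override
        protected String initialValue() {
            return "default";
        }
    };

    /** The calling thread's own value. */
    static String currentValue() {
        return SETTING.get();
    }

    /** Starts a thread that changes its own copy; returns what that thread then saw. */
    static String changeInNewThread(final String newValue) {
        final String[] seen = new String[1];
        Thread t = new Thread(new Runnable() {
            public void run() {
                SETTING.set(newValue);   // only this thread's copy changes
                seen[0] = SETTING.get();
            }
        });
        t.start();
        try {
            t.join();                    // join() also makes seen[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(changeInNewThread("changed")); // changed
        System.out.println(currentValue());              // default
    }
}
```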

My CopyOnWriteCache class below is initialised with the global HashMap representing the default configuration, and also holds a ThreadLocal HashMap. Any put() operation updates the ThreadLocal HashMap rather than the global one.

Conversely, get() first looks in the ThreadLocal HashMap, and then looks in the global HashMap if no value is present.

By running the CopyOnWriteRunner class below you can see an example. The HashMap is initialised with two keys, "Key1" and "Key2", each with the value "Value". Three threads are then created and started. Each of these prints out the value of "Key1", changes it, and then prints the value again. Each thread initially sees the default value, even if another thread has already changed it (the order in which the threads' output is interleaved varies from run to run).


++++++++++++++CopyOnWriteCache+++++++++++++++++


import java.util.HashMap;
import java.util.Map;

public class CopyOnWriteCache<K, V> {

    // The global defaults; this Map must not be modified after construction
    private final Map<K, V> theMap;

    /**
     * The ThreadLocal has a HashMap that can be used for storing local
     * values. Each Thread gets its own, initially empty, HashMap.
     */
    private final ThreadLocal<Map<K, V>> localMap =
        new ThreadLocal<Map<K, V>>() {
            @Override
            protected Map<K, V> initialValue() {
                return new HashMap<K, V>();
            }
        };

    /**
     * The object is initialised with a Map that contains default values.
     * These can be overridden in a manner specific to each Thread.
     */
    public CopyOnWriteCache(Map<K, V> initialisedMap) {
        theMap = initialisedMap;
    }

    /**
     * get() first looks for a ThreadLocal value - if none is available
     * it returns the global value.
     */
    public V get(K theKey) {
        Map<K, V> local = localMap.get();
        if (local.containsKey(theKey)) {
            return local.get(theKey);
        }
        return theMap.get(theKey);
    }

    /**
     * Put does a copy-on-write, and stores the value in
     * its ThreadLocal map.
     */
    public void put(K theKey, V theValue) {
        localMap.get().put(theKey, theValue);
    }

}

++++++++++++++CopyOnWriteRunner+++++++++++++++++

import java.util.HashMap;
import java.util.Map;


public class CopyOnWriteRunner {

    public static void main(String[] args) {
        Map<String, String> m = new HashMap<String, String>();
        m.put("Key1", "Value");
        m.put("Key2", "Value");
        CopyOnWriteCache<String, String> copyOnWriteCache =
            new CopyOnWriteCache<String, String>(m);
        CacheRunner cr1 = new CacheRunner(copyOnWriteCache, "Key1", "New Value 1");
        CacheRunner cr2 = new CacheRunner(copyOnWriteCache, "Key1", "New Value 2");
        CacheRunner cr3 = new CacheRunner(copyOnWriteCache, "Key1", "New Value 3");
        Thread t1 = new Thread(cr1);
        Thread t2 = new Thread(cr2);
        Thread t3 = new Thread(cr3);
        t1.start();
        t2.start();
        t3.start();
    }

    private static class CacheRunner implements Runnable {

        private final CopyOnWriteCache<String, String> cache;
        private final String key;
        private final String value;

        public CacheRunner(CopyOnWriteCache<String, String> theCache, String theKey, String theValue) {
            cache = theCache;
            key = theKey;
            value = theValue;
        }

        public void run() {
            System.out.println("Initial Value = " + cache.get(key));
            cache.put(key, value);
            System.out.println("New Value = " + cache.get(key));
        }

    }

}

Readers/Writers Problem in Java

The Readers/Writers problem is a classic synchronisation problem. If we assume a shared resource, such as an object in memory or a file on disk, the following rules will often apply:
1. Many threads can read values from the resource simultaneously.
2. Only one thread may write changes to the resource at any one time: no other thread may write until the first thread finishes (allowing it to make all its changes atomically).
3. While a thread is writing changes, no thread may read the resource until the write operation has finished.

Solutions to the Readers/Writers problem are critical within an Operating System, where access to shared resources (such as the file system) must be optimised, and where many competing threads are fighting for access. Solutions are also critical for database applications, caches, and many other user-level applications.

Although the problem is relatively simple, an optimal solution can be complex once certain considerations are taken into account:

1. If there are both waiting readers and waiting writers when a write lock is released, who should be given the lock first? If either readers or writers are always given priority over the other, what happens under high utilisation? The other group may be starved of the resource completely.
2. If there are both waiting readers and waiting writers when a read lock is released, who should be given the lock first? The same considerations apply as in point one.
3. If a thread holds a write lock, should it be allowed to convert this into a read lock? This has potential implications for waiting writers.
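On point 3, ReentrantReadWriteLock (discussed below) takes a definite position: a thread holding the write lock may downgrade by acquiring the read lock and then releasing the write lock, but upgrading from a read lock to the write lock is not supported. The sketch below illustrates both; DowngradeDemo is a hypothetical name.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class DowngradeDemo {
    /** Take the write lock, take the read lock while still holding it, then drop the write lock. */
    static ReentrantReadWriteLock downgrade() {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        lock.writeLock().lock();
        try {
            // ... modify the shared data here ...
            lock.readLock().lock();    // allowed: this thread holds the write lock
        } finally {
            lock.writeLock().unlock(); // this thread still holds the read lock
        }
        return lock;
    }

    public static void main(String[] args) {
        ReentrantReadWriteLock lock = downgrade();
        System.out.println("Write locked: " + lock.isWriteLocked());  // false
        System.out.println("Read holds: " + lock.getReadHoldCount()); // 1
        // Upgrading is not supported: while a read lock is held, tryLock() on the
        // write lock fails (a plain lock() here would block forever)
        System.out.println("Upgrade: " + lock.writeLock().tryLock()); // false
        lock.readLock().unlock();
    }
}
```

Once the write lock is released, waiting readers can proceed alongside the downgrading thread, which still has a consistent view of the data it just wrote.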

Creating a "fair" but also optimal solution to the Readers/Writers problem ultimately depends on the specific situation. For instance, the solution may be different when reads are very common and writes are very rare than when writes occur more often than reads.

The 1.5 release of Java includes an interface for solving the Readers/Writers problem called ReadWriteLock, along with an implementation called ReentrantReadWriteLock. Although Java's low-level concurrency operations (synchronized blocks, wait/notify) make it possible to write your own implementation, ReentrantReadWriteLock is a well-tested implementation that will suit most needs.
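As a rough illustration of building this from the low-level operations, here is a sketch of the three rules above using only synchronized and wait/notifyAll. SimpleReadWriteLock and its methods are hypothetical names. Unlike ReentrantReadWriteLock it is not reentrant, and it makes one fixed policy choice (new readers wait while any writer is waiting), so a steady stream of writers could starve readers.

```java
/**
 * Hypothetical SimpleReadWriteLock: the Readers/Writers rules using only
 * synchronized and wait/notifyAll. Not reentrant; prefers waiting writers.
 */
public class SimpleReadWriteLock {
    private int activeReaders = 0;
    private boolean writerActive = false;
    private int waitingWriters = 0;

    public synchronized void lockRead() throws InterruptedException {
        // Rule 3: no reading during a write. Also give way to queued writers.
        while (writerActive || waitingWriters > 0) {
            wait();
        }
        activeReaders++; // Rule 1: any number of readers may hold the lock
    }

    public synchronized void unlockRead() {
        activeReaders--;
        if (activeReaders == 0) {
            notifyAll(); // a waiting writer may now proceed
        }
    }

    public synchronized void lockWrite() throws InterruptedException {
        waitingWriters++;
        try {
            // Rule 2: a writer needs the resource to itself
            while (writerActive || activeReaders > 0) {
                wait();
            }
        } catch (InterruptedException e) {
            waitingWriters--;
            notifyAll(); // readers held back by this writer may proceed
            throw e;
        }
        waitingWriters--;
        writerActive = true;
    }

    public synchronized void unlockWrite() {
        writerActive = false;
        notifyAll(); // wake readers and writers; each re-checks its condition
    }

    /** Demo: returns true if a writer had to wait for an active reader. */
    static boolean writerWaitsForReader() {
        final SimpleReadWriteLock lock = new SimpleReadWriteLock();
        final boolean[] written = new boolean[1];
        Thread writer = new Thread(new Runnable() {
            public void run() {
                try {
                    lock.lockWrite();
                    synchronized (written) { written[0] = true; }
                    lock.unlockWrite();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        try {
            lock.lockRead();
            writer.start();
            Thread.sleep(200);  // give the writer time to block
            boolean blocked;
            synchronized (written) { blocked = !written[0]; }
            lock.unlockRead();  // the last reader out lets the writer in
            writer.join();
            return blocked && written[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(writerWaitsForReader()); // true
    }
}
```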

The following is an example of using a ReadWriteLock to control access to a HashMap. Many threads can read values from the Map simultaneously, but an exclusive write lock must be obtained before writing a new value for a key.

+++++++++++++++BEGIN CODE+++++++++++++++

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockedHashMap {

    // This is the shared object we are controlling access to
    private final Map<String, String> theMap = new HashMap<String, String>();

    /*
     * This is the lock we are using to control shared access. Passing a value
     * of true tells the ReadWriteLock to allocate the locks fairly.
     */
    private final ReentrantReadWriteLock theLock = new ReentrantReadWriteLock(true);

    public static void main(String[] args) {
        LockedHashMap lhm = new LockedHashMap();
        lhm.testLocking();
    }

    public LockedHashMap() {
        // Initialise the Map with some initial values
        theMap.put("Key1", "Value1");
        theMap.put("Key2", "Value2");
        theMap.put("Key3", "Value3");
    }

    private void testLocking() {
        /*
         * These are the threads that will simultaneously attempt to access the shared object
         */
        HashMapReader hmr1 = new HashMapReader("One", "Key1");
        HashMapReader hmr2 = new HashMapReader("Two", "Key2");
        HashMapReader hmr3 = new HashMapReader("Three", "Key3");
        HashMapReader hmr4 = new HashMapReader("Four", "Key1");
        HashMapReader hmr5 = new HashMapReader("Five", "Key2");
        HashMapReader hmr6 = new HashMapReader("Six", "Key3");
        HashMapReader hmr7 = new HashMapReader("Seven", "Key1");

        HashMapWriter hmw1 = new HashMapWriter("One", "Key1", "Value_new1");
        HashMapWriter hmw2 = new HashMapWriter("Two", "Key2", "Value_new2");
        HashMapWriter hmw3 = new HashMapWriter("Three", "Key3", "Value_new3");

        new Thread(hmr1).start();
        new Thread(hmw1).start();
        new Thread(hmr2).start();
        new Thread(hmr3).start();
        new Thread(hmw2).start();
        new Thread(hmr4).start();
        new Thread(hmw3).start();
        new Thread(hmr5).start();
        new Thread(hmr6).start();
        new Thread(hmr7).start();
    }

    /**
     * A Reader takes the key of the object it is attempting to access,
     * gets the value, and prints the result. It sleeps for 2 seconds while
     * holding the lock to simulate a longer-running operation.
     */
    private class HashMapReader implements Runnable {
        private final String name;
        private final String key;

        public HashMapReader(String theName, String theKey) {
            name = theName;
            key = theKey;
        }

        public void run() {
            // Acquire before entering the try, so the finally block only
            // ever releases a lock this thread actually holds
            theLock.readLock().lock();
            try {
                String value = theMap.get(key);
                System.out.println("The Reader " + name + " has read the key " + key + " with a value " + value);
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            } finally {
                theLock.readLock().unlock();
            }
        }
    }

    /**
     * A Writer takes a key and a value, updates the value of the key,
     * and then prints the result. It sleeps for 4 seconds while
     * holding the lock to simulate a longer-running operation.
     */
    private class HashMapWriter implements Runnable {
        private final String name;
        private final String key;
        private final String value;

        public HashMapWriter(String theName, String theKey, String theValue) {
            name = theName;
            key = theKey;
            value = theValue;
        }

        public void run() {
            theLock.writeLock().lock();
            try {
                theMap.put(key, value);
                System.out.println("The Writer " + name + " has written the key " + key + " with the value " + value);
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            } finally {
                theLock.writeLock().unlock();
            }
        }
    }
}

+++++++++++++++END CODE+++++++++++++++

A couple of points to note regarding this example:
The constructor for ReentrantReadWriteLock takes a boolean indicating whether it should be "fair" with the locks. Being fair here means handing out locks approximately in the order they were requested. Although this policy should avoid starvation, it is not necessarily the most efficient approach: in the above example, using a fair policy takes approximately twice as long as an unfair one.

In the worst case, a fair policy may mean every read has to wait for a write (if reads and writes are requested alternately). An unfair (but efficient) policy is not bound by request order, so it can let many reads proceed together, since reads do not block each other. Although this can give higher throughput, as mentioned above there is a risk of starvation.
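If you want to compare the two policies on your own machine, a rough timing harness along these lines may help. FairnessTiming, timeWorkload and its parameters are hypothetical; the harness starts readers and writers alternately, as in the worst case described above, and reports wall-clock time. Thread start order does not guarantee lock request order, and results vary with the JVM and scheduler, so treat the numbers as indicative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class FairnessTiming {
    /**
     * Starts reader, writer, reader, writer, ... threads sharing one lock and
     * returns the wall-clock time in milliseconds until all have finished.
     */
    static long timeWorkload(boolean fair, int pairs, final long readMs, final long writeMs) {
        final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(fair);
        List<Thread> threads = new ArrayList<Thread>();
        for (int i = 0; i < pairs; i++) {
            threads.add(new Thread(new Runnable() {
                public void run() {
                    lock.readLock().lock();
                    try {
                        pause(readMs);  // simulated read work
                    } finally {
                        lock.readLock().unlock();
                    }
                }
            }));
            threads.add(new Thread(new Runnable() {
                public void run() {
                    lock.writeLock().lock();
                    try {
                        pause(writeMs); // simulated write work
                    } finally {
                        lock.writeLock().unlock();
                    }
                }
            }));
        }
        long start = System.nanoTime();
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return (System.nanoTime() - start) / 1000000L;
    }

    private static void pause(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Compare the two figures; exact values depend on the machine
        System.out.println("fair:   " + timeWorkload(true, 5, 200, 400) + " ms");
        System.out.println("unfair: " + timeWorkload(false, 5, 200, 400) + " ms");
    }
}
```

Whatever the policy, writes are exclusive, so the total can never drop below the sum of the write times; the policy only decides how much of the read time overlaps.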

Another point to note is that it is critical that locks are released in a finally block. If a thread acquired a write lock but, due to an exception, never released it, every other thread that later tried to acquire the lock would block forever, which behaves just like a deadlock.
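The difference is easy to demonstrate. In this sketch (LockLeakDemo and readableAfterFailure are hypothetical names), a worker thread takes the write lock and then throws an exception. Without a finally block the lock is never released, even after the worker thread has died, so another thread's timed tryLock() gives up. The usual idiom is to call lock() immediately before the try, so the finally block only runs once the lock is actually held.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockLeakDemo {
    /**
     * A worker takes the write lock and then fails. Returns true if another
     * thread can still obtain the read lock within 500 ms afterwards.
     */
    static boolean readableAfterFailure(final boolean useFinally) {
        final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        Thread worker = new Thread(new Runnable() {
            public void run() {
                lock.writeLock().lock();
                if (useFinally) {
                    try {
                        simulateFailure();
                    } finally {
                        lock.writeLock().unlock(); // runs even though an exception was thrown
                    }
                } else {
                    simulateFailure();
                    lock.writeLock().unlock();     // never reached: the lock leaks
                }
            }
        });
        // The worker's exception is expected; don't print a stack trace for it
        worker.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            public void uncaughtException(Thread t, Throwable e) { }
        });
        try {
            worker.start();
            worker.join();
            if (lock.readLock().tryLock(500, TimeUnit.MILLISECONDS)) {
                lock.readLock().unlock();
                return true;
            }
            return false; // still write-locked by a thread that no longer exists
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    private static void simulateFailure() {
        throw new IllegalStateException("simulated failure while holding the lock");
    }

    public static void main(String[] args) {
        System.out.println("with finally:    " + readableAfterFailure(true));  // true
        System.out.println("without finally: " + readableAfterFailure(false)); // false
    }
}
```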
 

Saturday, July 21, 2007

Java Semaphores

Java 1.5 introduced a number of concurrency libraries that build on the basic Java concurrency mechanisms (synchronized, wait/notify). Several of these provide implementations of well-known and well-understood approaches to writing multi-threaded code. One of the most useful additions is the Semaphore class.

Semaphores are a widely used concurrency mechanism, and a common approach to guarding access to limited shared resources in Operating Systems (particularly Unix). Semaphores work on the principle that only a specific number of threads (one or more) may access a shared resource at the same time.

In order to gain access to a shared resource, a thread must first acquire a lock from the semaphore (Java's Semaphore class calls these permits). Once the lock has been obtained, the thread can use the resource. If no locks are available, the thread must wait until one becomes available.

Once the thread has finished using the resource, it releases the lock and notifies any waiting threads that a lock is available.

The classic Semaphore works as follows:

1. A Semaphore is initialised with a specific number of locks (one or more)
2. When a thread wishes to obtain a lock, it calls an acquire procedure. If one or more locks are available, this decrements the number of available locks and returns a lock to the caller. If no locks are available, the thread waits until one becomes available.
3. When a thread wishes to release a lock, it calls a release procedure: this increases the number of available locks by one. Any waiting threads are then notified that a lock is available, and one of them (chosen arbitrarily) acquires the lock.

The points to note regarding the implementation are as follows:

1. Once the Semaphore is initialised with a specific number of locks, there is no way to find out how many locks are available.
2. The acquire and release procedures must be atomic: no other thread may observe or interfere with a half-finished acquire or release.

It has always been fairly straightforward to implement a Semaphore in Java, and I have done so on a number of occasions. One common use is creating a pool of objects that can be shared by the threads in an application, such as a pool of Socket connections.
The basic Java concurrency building blocks can provide the main functionality:
1. Synchronized blocks can be used to ensure the acquire and release are atomic.
2. Wait/notify can be used to implement the functionality of waiting for a lock to become available.
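For comparison with the library class, here is a minimal sketch of the classic Semaphore built from exactly those two building blocks. CountingSemaphore is a hypothetical name, and this is an illustration rather than a replacement for java.util.concurrent.Semaphore (it has no fairness option and no timeouts). synchronized makes acquire() and release() atomic, and wait/notify provides the waiting.

```java
/**
 * Hypothetical CountingSemaphore: the classic semaphore using only
 * synchronized and wait/notify.
 */
public class CountingSemaphore {
    private int available;

    public CountingSemaphore(int locks) {
        if (locks < 1) {
            throw new IllegalArgumentException("a semaphore needs at least one lock");
        }
        available = locks;
    }

    /** Waits until a lock is free, then takes it; synchronized makes this atomic. */
    public synchronized void acquire() throws InterruptedException {
        while (available < 1) { // a loop, not an if: wait() can wake spuriously
            wait();
        }
        available--;
    }

    /** Returns a lock and wakes one waiting thread (the JVM chooses which). */
    public synchronized void release() {
        available++;
        notify();
    }

    /** Demo: with the only lock taken, a second thread must wait for release(). */
    static boolean secondThreadWaits() {
        final CountingSemaphore sem = new CountingSemaphore(1);
        final boolean[] acquired = new boolean[1];
        Thread t = new Thread(new Runnable() {
            public void run() {
                try {
                    sem.acquire();
                    synchronized (acquired) { acquired[0] = true; }
                    sem.release();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        try {
            sem.acquire();
            t.start();
            Thread.sleep(200);  // give the second thread time to block
            boolean waited;
            synchronized (acquired) { waited = !acquired[0]; }
            sem.release();      // now the second thread can proceed
            t.join();
            return waited && acquired[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondThreadWaits()); // true
    }
}
```

Using notify() rather than notifyAll() is enough here because every waiter is waiting for the same condition and each release() frees exactly one lock.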

Although it is relatively simple to write your own Semaphore, there are obvious advantages to using a widely used library.
The following is an example of using the Semaphore class in Java. The Pool class below simulates a Pool of resources that threads wish to obtain access to. The Semaphore guards access to this Pool, so that if all the resources are in use, threads must wait for one to be released.


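import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

public class Pool {

    private static final int NUMBER_OF_ELEMENTS = 5;

    /**
     * Create a Semaphore with 5 locks available
     */
    private final Semaphore sem = new Semaphore(NUMBER_OF_ELEMENTS);

    /*
     * These are the actual resources that we are restricting access to
     */
    private final List<String> resources = new ArrayList<String>();

    public Pool() {
        for (int i = 0; i < NUMBER_OF_ELEMENTS; i++) {
            resources.add("Resource " + i);
        }
    }

    /**
     * Waits until a resource is free. Returns null if the calling thread is
     * interrupted while waiting, since no lock was obtained.
     */
    public String getResource() {
        try {
            sem.acquire();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return null;
        }
        // Holding a lock guarantees at least one resource is in the list
        synchronized (resources) {
            return resources.remove(0);
        }
    }

    public void freeResource(String resource) {
        if (resource == null) {
            return; // nothing was acquired, so there is no lock to release
        }
        // Lock on the same object as getResource(), or the list could be corrupted
        synchronized (resources) {
            resources.add(resource);
        }
        sem.release();
    }

}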

The following Main class uses this Pool by creating a number of Threads that simultaneously attempt to obtain resources. Once a thread obtains a resource, it holds it for a random amount of time to simulate using it:


public class Main {

    public static void main(String[] args) {
        Pool p = new Pool();
        for (int i = 0; i < 12; i++) {
            Thread t = new Thread(new Client(i, p));
            t.start();
        }
    }

    private static class Client implements Runnable {

        private final int name;
        private final Pool pool;

        public Client(int theName, Pool thePool) {
            name = theName;
            pool = thePool;
        }

        public void run() {
            String resource = pool.getResource();
            System.out.println("Client " + name + " has acquired resource " + resource);
            try {
                // Hold the resource for a random period of up to 10 seconds
                long sleepTime = (long) (10000 * Math.random());
                Thread.sleep(sleepTime);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            } finally {
                // Always hand the resource back, even if interrupted
                pool.freeResource(resource);
            }
        }

    }

}

The Semaphore class goes beyond the original scope of the classic Semaphore. In addition to the core functionality, it provides the following features:

1. An availablePermits() method is provided. This returns the number of locks currently available. Although this goes against the original intention of Semaphores, it is useful both for debugging and for special cases, such as a thread that does not wish to obtain a lock if only one is available.

2. A getQueueLength() method is provided, which returns an estimate of the number of threads waiting to acquire a lock. Once again, this is primarily useful for debugging or management tools. For instance, it might be useful to expose this through a JMX interface to allow a connection pool to be monitored.

3. A tryAcquire() method is provided. This is similar to acquire(), except that if no locks are available it returns false immediately instead of waiting. An overloaded version, tryAcquire(long timeout, TimeUnit unit), allows the caller to wait, but only for a specified amount of time.
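The following sketch exercises these three methods on a two-permit Semaphore from a single thread (SemaphoreExtras and demo() are hypothetical names). Note that acting on availablePermits(), as in the special case in point 1, is a check-then-act sequence: another thread can change the count between the check and the acquire, so the value is only a snapshot.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreExtras {
    /** Exercises the extra methods on a two-permit Semaphore; returns the results. */
    static String demo() throws InterruptedException {
        Semaphore sem = new Semaphore(2);
        StringBuilder out = new StringBuilder();

        sem.acquire();                                  // one permit left
        out.append(sem.availablePermits()).append(' '); // 1
        out.append(sem.tryAcquire()).append(' ');       // true: takes the last permit
        out.append(sem.tryAcquire()).append(' ');       // false: returns at once, no waiting
        // Waits up to 100 ms for a permit, then gives up
        out.append(sem.tryAcquire(100, TimeUnit.MILLISECONDS)).append(' '); // false
        out.append(sem.getQueueLength()).append(' ');   // 0: no thread is blocked in acquire()
        sem.release(2);
        out.append(sem.availablePermits());             // 2
        return out.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // 1 true false false 0 2
    }
}
```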