On Sun, 22 Apr 2007, Kelly Anderson wrote:
> On 4/20/07, Ferenczi, Szabolcs <ferenczi@...> wrote:
> > I would make a remark about TDD for multi-threading. It is possible
> > but since concurrency is not an easy area, keeping some design for
> > testing rules are more important than in sequential programs.
> >
> > The key issues are that you clearly separate the shared objects, which
> > contain the synchronization parts and develop them so that you make
> > your multi-threading test programs deterministic for the test. Then,
> > the tests can be repeated any time with the same result.
> >
> > Once you have separated the interactions into the functionality of some
> > shared data structures, you can TDD the threads just as if they were
> > sequential programs by mocking the interactions on the shared objects.
> >
> > It also helps, if you are thinking in concurrency patterns instead of
> > trying to develop unique solutions. Then most of that what you have to
> > TDD is some sequential functionality.
...
> I don't do much multithreaded programming myself, but this is the
> first really concrete set of suggestions I've seen on the subject with
> regards to TDD. I would imagine that there are lots of people out
> there that would benefit from some elaboration on the details of your
> techniques.
>
> I don't think I could pull it off from what little you've written
> here, although it sounds really interesting.
OK but it is going to be a long post. Be patient.
I am going to demonstrate TDD for multi-threading applications on a
case study for your convenience. Let us suppose that we are about to
introduce the classical producer-consumer scenario and we must develop
a bounded buffer to smooth the varying speed difference between
producer and consumer threads.
The bounded buffer should be able to store items in a FIFO order. Any
get() operation should be delayed on an empty buffer until there is any
element to take. Any put() operation should be delayed on a full buffer
until there is a slot to insert the element.
I am using Java 6 and JUnit 4 for this demonstration.
Let us assume that we have arrived at the implementation of an
unlimited buffer by traditional TDD techniques. Our starting point is
something like this:
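import java.util.LinkedList;
import java.util.Queue;

public class BoundedBuffer<E>
{
    // FIFO storage; not yet bounded and not yet thread-safe
    private Queue<E> buffer = new LinkedList<E>();
    public void put(E e) {buffer.add(e);}
    // returns null on an empty buffer
    public E get() {return buffer.poll();}
}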
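The "traditional TDD" tests behind this starting point are not shown in the post. A minimal sketch of what they might check, written as plain Java so that it runs without JUnit; the class names `UnboundedBufferSketch` and `UnboundedBufferCheck` are made up for this illustration:

```java
import java.util.LinkedList;
import java.util.Queue;

// Copy of the starting-point buffer, renamed so the block stands alone.
class UnboundedBufferSketch<E> {
    private final Queue<E> buffer = new LinkedList<E>();
    public void put(E e) { buffer.add(e); }
    public E get() { return buffer.poll(); }
}

// Hypothetical sequential checks; not taken from the original test suite.
class UnboundedBufferCheck {
    // True if elements come out in the order they went in.
    static boolean keepsFifoOrder() {
        UnboundedBufferSketch<Integer> q = new UnboundedBufferSketch<Integer>();
        q.put(1);
        q.put(2);
        return Integer.valueOf(1).equals(q.get())
            && Integer.valueOf(2).equals(q.get());
    }

    // True if get() on an empty buffer returns null instead of blocking.
    static boolean emptyGetReturnsNull() {
        return new UnboundedBufferSketch<Integer>().get() == null;
    }

    public static void main(String[] args) {
        System.out.println("FIFO order kept: " + keepsFifoOrder());
        System.out.println("empty get() returns null: " + emptyGetReturnsNull());
    }
}
```

The second check documents the behaviour that Step 1 is about to change: at this point an empty buffer answers with null rather than suspending the caller.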
Now comes the interesting part from the aspect of multi-threading. The
plan is to cover the interesting points with test cases one by one
and to write the minimal implementation for each. The interesting points are:
1. A thread should be suspended on an empty buffer
2. A suspended consumer thread should continue when an element is offered
3. A thread should be suspended on a full buffer
4. A suspended producer thread should continue when a slot becomes
available
5. If more consumer threads are suspended, one should continue per item
put into the buffer
6. If more consumer threads are suspended, all should continue in turn
7. If more producer threads are suspended, one should continue per item
removed from the buffer
8. If more producer threads are suspended, all should continue in turn
Step 1: A thread should be suspended on an empty buffer
@Test
public void parSuspendOnEmptyBuffer() throws InterruptedException {
    // q is the buffer under test; DELAY is a pause in milliseconds
    // (both are fields of the test class)
    Thread t1 = new Thread() {
        public void run() {
            q.get();
        }};
    t1.start();
    Thread.sleep(DELAY);
    assertEquals("Buffer is empty and thread is waiting on get()",
                 Thread.State.WAITING, t1.getState());
    t1.interrupt();
    t1.join();
}
Note that the delay is inserted to give the tester thread time to
reach get() and settle in its waiting state. With a sufficiently long
DELAY the outcome is the same on every run, which makes the test
repeatable.
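As an alternative to a fixed delay, the test could poll the thread state until it becomes the expected one or a timeout expires. This is a different technique from the one used in this post, sketched here only for comparison; `ThreadStateProbe` and `awaitState` are hypothetical names, and the 2000 ms timeout is an arbitrary choice.

```java
class ThreadStateProbe {
    // Polls until the thread is seen in the wanted state or the timeout expires.
    // Returns true if the state was observed in time.
    static boolean awaitState(Thread t, Thread.State wanted, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (t.getState() != wanted) {
            if (System.currentTimeMillis() > deadline) return false;
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                return false;
            }
        }
        return true;
    }

    // Demo: a thread parks itself in wait(), is observed as WAITING, then is cancelled.
    static boolean demo() {
        final Object lock = new Object();
        Thread t = new Thread(new Runnable() {
            public void run() {
                synchronized (lock) {
                    try {
                        while (true) lock.wait(); // nobody ever notifies this lock
                    } catch (InterruptedException e) {
                        // cancelled by the demo: leave run()
                    }
                }
            }
        });
        t.start();
        boolean sawWaiting = awaitState(t, Thread.State.WAITING, 2000);
        t.interrupt();
        boolean finished = awaitState(t, Thread.State.TERMINATED, 2000);
        return sawWaiting && finished;
    }

    public static void main(String[] args) {
        System.out.println("observed WAITING, then TERMINATED: " + demo());
    }
}
```

The trade-off is that a passing test returns as soon as the state is reached, while a failing test costs the whole timeout.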
This test checks exactly what we are interested in: does the caller
get into the WAITING state if the buffer is empty? That is the
point. There is an assertion on this condition. If it holds, the test
case is finished by cancelling the test thread. Otherwise the test
fails, like this:
Red: `There was 1 failure:
1) parSuspendOnEmptyBuffer(BoundedBufferTest)
java.lang.AssertionError: Buffer is empty and thread is waiting on get()
expected:<WAITING> but was:<TERMINATED>'
Now we can try the minimal implementation that satisfies this test. So
we make the get method synchronized and suspend the thread if the
buffer is empty. Note that InterruptedException must be taken care
of.
public synchronized E get() throws InterruptedException {
    while (buffer.isEmpty()) wait();
    return buffer.poll();
}
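Note that wait() must be applied carefully. A waiting thread can wake
up although the condition does not hold (a spurious wakeup, or another
thread changed the state in the meantime), so the Java documentation
says that wait() should always be called inside a loop that re-checks
the condition. That is why the simpler `if' statement cannot be
applied here.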
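A small sketch of why the loop matters, under assumptions that are not part of the buffer developed in this post: the hypothetical `GuardedQueue` below wakes all waiters with notifyAll(), and two consumers compete for a single item. Because get() re-checks the condition in a `while' loop, the consumer that loses the race goes back to waiting. With an `if' it would fall through and return null from the empty queue.

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical queue for this illustration; put() deliberately wakes ALL waiters.
class GuardedQueue {
    private final Queue<Integer> items = new LinkedList<Integer>();

    synchronized void put(Integer i) {
        items.add(i);
        notifyAll();
    }

    synchronized Integer get() throws InterruptedException {
        while (items.isEmpty()) wait(); // re-check the condition after every wake-up
        return items.poll();
    }
}

class WhileGuardDemo {
    // Two consumers compete for one item. Returns {items delivered, nulls delivered}.
    static int[] run() {
        final GuardedQueue q = new GuardedQueue();
        final AtomicInteger delivered = new AtomicInteger();
        final AtomicInteger nulls = new AtomicInteger();
        Runnable consumer = new Runnable() {
            public void run() {
                try {
                    if (q.get() == null) nulls.incrementAndGet();
                    else delivered.incrementAndGet();
                } catch (InterruptedException e) {
                    // cancelled by the demo
                }
            }
        };
        Thread a = new Thread(consumer);
        Thread b = new Thread(consumer);
        a.start();
        b.start();
        try {
            // wait (at most about 2 s) until both consumers are suspended in get()
            for (int i = 0; i < 2000 && (a.getState() != Thread.State.WAITING
                    || b.getState() != Thread.State.WAITING); i++) Thread.sleep(1);
            q.put(7); // one item, but notifyAll() wakes every waiting consumer
            // wait (at most about 5 s) until one consumer has returned from get()
            for (int i = 0; i < 5000 && delivered.get() + nulls.get() == 0; i++) Thread.sleep(1);
            Thread.sleep(50); // give the other consumer time to re-check and wait again
            a.interrupt();    // cancel whoever is still waiting
            b.interrupt();
            a.join();
            b.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException("demo interrupted", e);
        }
        return new int[] { delivered.get(), nulls.get() };
    }
}
```

Calling `WhileGuardDemo.run()` should report one delivered item and no nulls: the second consumer never leaves get() with an empty result, it is only cancelled by the interrupt.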
Because of the InterruptedException, all the test methods must be
extended with the clause `throws InterruptedException' and the thread
must handle it as well:
    try {
        q.get();
    } catch (InterruptedException e) {}
Green: Tests pass
Refactor: n.a.
Step 2: A suspended consumer thread should continue when an element is
offered
@Test
public void parContinueSuspendedConsumer() throws InterruptedException {
    Thread t = new Thread() {
        public void run() {
            try {
                assertEquals(new Integer(3), q.get());
            } catch (InterruptedException e) {}
        }};
    t.start();
    Thread.sleep(DELAY);
    assertEquals("Buffer is empty and thread is waiting on get()",
                 Thread.State.WAITING, t.getState());
    q.put(new Integer(3));
    Thread.sleep(DELAY);
    assertEquals("Thread must be continued and completed",
                 Thread.State.TERMINATED, t.getState());
    t.join();
}
The test makes sure that the consumer thread is in the WAITING state
before the element is inserted into the buffer. Some time is left for
the consumer thread to continue the operation, and then a check is made
whether it has terminated, as it should if everything goes as
planned. Initially we get the assertion failure, as usual in TDD:
Red: `There was 1 failure:
1) parContinueSuspendedConsumer(BoundedBufferTest)
java.lang.AssertionError: Thread must be continued and completed
expected:<TERMINATED> but was:<WAITING>'
Hmm... It looks like the suspended thread is never notified. Now, let us
make the put() method call notify() as a minimal implementation that
satisfies the test.
public synchronized void put(E e) {buffer.add(e); notify();}
Green: Tests pass
Refactor: n.a.
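For readers who want to run the Step 2 scenario outside JUnit, here is a minimal plain-Java sketch. It copies the buffer as it stands after this step under the made-up name `Step2Buffer`, and `Step2Scenario` is likewise hypothetical. The received value is handed back through an AtomicReference so that it can be checked in the calling thread, and a bounded join() takes the place of the second sleep; both are choices of this sketch, not of the test above.

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;

// The buffer after Step 2, copied under a new name so the block stands alone.
class Step2Buffer<E> {
    private final Queue<E> buffer = new LinkedList<E>();

    public synchronized void put(E e) { buffer.add(e); notify(); }

    public synchronized E get() throws InterruptedException {
        while (buffer.isEmpty()) wait();
        return buffer.poll();
    }
}

class Step2Scenario {
    // Returns what the consumer received, or null if it never finished.
    static Integer run() {
        final Step2Buffer<Integer> q = new Step2Buffer<Integer>();
        final AtomicReference<Integer> received = new AtomicReference<Integer>();
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    received.set(q.get()); // suspended until put() notifies
                } catch (InterruptedException e) {
                    // cancelled: leave received as null
                }
            }
        });
        consumer.start();
        try {
            // wait (at most about 2 s) until the consumer is suspended in get()
            for (int i = 0; i < 2000 && consumer.getState() != Thread.State.WAITING; i++) {
                Thread.sleep(1);
            }
            q.put(3);
            consumer.join(5000); // bounded wait for the consumer to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println("consumer received: " + run());
    }
}
```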
Step 3: A thread should be suspended on a full buffer
We specify a limit for the buffer in the setup fixture. A small number
will do. I could use 1, but in earlier tests I used two successive
puts, so I choose 2 in order not to break the other tests:
@Before
public void createBuffer() {
    q = new BoundedBuffer<Integer>(2);
}
Of course, the compilation fails, which can be fixed by the minimal
implementation:
public BoundedBuffer(final int limit) {}
Now, the tests pass but the step is not completed yet. Here is the
effective test for this step:
@Test
public void parSuspendOnFullBuffer() throws InterruptedException {
    Thread t = new Thread() {
        public void run() {
            q.put(new Integer(1));
            q.put(new Integer(2));
            q.put(new Integer(3));
        }};
    t.start();
    Thread.sleep(DELAY);
    assertEquals("Buffer is full and thread is waiting on put(new Integer(3))",
                 Thread.State.WAITING, t.getState());
    t.interrupt();
    t.join();
}
Red: `There was 1 failure:
1) parSuspendOnFullBuffer(BoundedBufferTest)
java.lang.AssertionError: Buffer is full and thread is waiting on put(new Integer(3))
expected:<WAITING> but was:<TERMINATED>'
Now, to satisfy the test, we must take the value for the limit, keep
track of counting the elements, and suspend the producer thread when
the number of elements would exceed the limit.
private int limit;
private int elements = 0;
public BoundedBuffer(final int limit) {this.limit = limit;}
public synchronized void put(E e) throws InterruptedException {
    while (elements >= limit) wait();
    buffer.add(e);
    ++elements;
    notify();
}
Unfortunately, this was a bigger implementation step than TDD normally
encourages, which is not very nice.
Green: Tests pass
Refactor: n.a.
Step 4: A suspended producer thread should continue when a slot becomes
available
@Test
public void parContinueSuspendedProducer() throws InterruptedException {
    Thread t = new Thread() {
        public void run() {
            try {
                q.put(new Integer(1));
                q.put(new Integer(2));
                q.put(new Integer(3));
            } catch (InterruptedException e) {}
        }};
    t.start();
    Thread.sleep(DELAY);
    assertEquals("Buffer is full and thread is waiting on put(new Integer(3))",
                 Thread.State.WAITING, t.getState());
    assertEquals(new Integer(1), q.get());
    Thread.sleep(DELAY);
    assertEquals("Thread must be continued and completed",
                 Thread.State.TERMINATED, t.getState());
    t.join();
}
Red: `There was 1 failure:
1) parContinueSuspendedProducer(BoundedBufferTest)
java.lang.AssertionError: Thread must be continued and completed
expected:<TERMINATED> but was:<WAITING>'
A quick look at the buffer shows that the counter is never
decremented and the producer is never notified either.
In get(), replace
    return buffer.poll();
with
    E e = buffer.poll();
    --elements;
    notify();
    return e;
Green: Tests pass
Refactor: Once we have the private counter, we can replace the
condition `buffer.isEmpty()' with `elements <= 0'.
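Putting the fragments of Steps 1 to 4 and this refactoring together gives roughly the class below. It is assembled from the snippets in this post; the name `BoundedBufferSketch`, the `final' modifiers and the small demo are additions of this sketch. The demo pushes five numbers from one producer thread through a buffer of size 2 and collects them in the calling thread.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

class BoundedBufferSketch<E> {
    private final Queue<E> buffer = new LinkedList<E>();
    private final int limit;
    private int elements = 0;

    BoundedBufferSketch(final int limit) { this.limit = limit; }

    public synchronized void put(E e) throws InterruptedException {
        while (elements >= limit) wait(); // suspend the producer on a full buffer
        buffer.add(e);
        ++elements;
        notify();                         // a consumer may be waiting for this element
    }

    public synchronized E get() throws InterruptedException {
        while (elements <= 0) wait();     // suspend the consumer on an empty buffer
        E e = buffer.poll();
        --elements;
        notify();                         // a producer may be waiting for this slot
        return e;
    }
}

class BoundedBufferDemo {
    // One producer pushes 1..5 through a buffer of size 2; returns what the consumer saw.
    static List<Integer> run() {
        final BoundedBufferSketch<Integer> q = new BoundedBufferSketch<Integer>(2);
        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 1; i <= 5; i++) q.put(i); // waits whenever the buffer is full
                } catch (InterruptedException e) {
                    // cancelled
                }
            }
        });
        producer.start();
        List<Integer> seen = new ArrayList<Integer>();
        try {
            for (int i = 0; i < 5; i++) seen.add(q.get()); // waits whenever the buffer is empty
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The demo uses exactly one producer and one consumer. With several threads of each kind sharing the one monitor, notify() may wake a thread of the same kind as the caller, so notifyAll() is the usual conservative choice for that case.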
Steps 5-8: I skip them now.
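To give an idea of what the skipped steps could look like, here is a plain-Java sketch of the scenario behind points 5 and 6: two suspended consumers, and one of them continues per item put into the buffer. It is only one possible reading of those points, not the test that would have been written in this series. `Step5Buffer` is a copy of the buffer after Step 4, `Step5Scenario` is a made-up name, and the LinkedBlockingQueue is used merely as a thread-safe mailbox for handing results to the caller.

```java
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// The buffer after Step 4, copied under a new name so the block stands alone.
class Step5Buffer<E> {
    private final Queue<E> buffer = new LinkedList<E>();
    private final int limit;
    private int elements = 0;

    Step5Buffer(int limit) { this.limit = limit; }

    public synchronized void put(E e) throws InterruptedException {
        while (elements >= limit) wait();
        buffer.add(e);
        ++elements;
        notify();
    }

    public synchronized E get() throws InterruptedException {
        while (elements <= 0) wait();
        E e = buffer.poll();
        --elements;
        notify();
        return e;
    }
}

class Step5Scenario {
    // Returns [result after first put, result while the buffer is empty again,
    //          result after second put].
    static List<Integer> run() {
        final Step5Buffer<Integer> q = new Step5Buffer<Integer>(2);
        final BlockingQueue<Integer> results = new LinkedBlockingQueue<Integer>();
        Runnable consumer = new Runnable() {
            public void run() {
                try {
                    results.add(q.get());
                } catch (InterruptedException e) {
                    // cancelled
                }
            }
        };
        Thread a = new Thread(consumer);
        Thread b = new Thread(consumer);
        a.start();
        b.start();
        try {
            // wait (at most about 2 s) until both consumers are suspended in get()
            for (int i = 0; i < 2000 && (a.getState() != Thread.State.WAITING
                    || b.getState() != Thread.State.WAITING); i++) Thread.sleep(1);
            q.put(1);
            Integer first = results.poll(5, TimeUnit.SECONDS);       // one consumer continues
            Integer none = results.poll(100, TimeUnit.MILLISECONDS); // the other must still wait
            q.put(2);
            Integer second = results.poll(5, TimeUnit.SECONDS);      // now the other continues
            a.join();
            b.join();
            return Arrays.asList(first, none, second);
        } catch (InterruptedException e) {
            throw new IllegalStateException("scenario interrupted", e);
        }
    }
}
```

The middle entry of the returned list is expected to be null: after the first item has been consumed the buffer is empty again, so the second consumer has nothing to return until the second put().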
Final notes:
1. If an assertion fails in a tester thread rather than in the test
method itself, the message lines are inserted among the progress
indicators, and JUnit does not report it as a failure of the test.
2. There is a JUnit plug-in in preparation that makes multi-threaded
unit testing even simpler. However, this technique can be used in
other frameworks for other languages as well.
3. I don't like to see `Thread.sleep(DELAY);' in production code.
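Regarding note 1, one way to bring a failure from a tester thread back into the test method is to capture it with an uncaught-exception handler and inspect it after join(). The sketch below is an assumption of mine, not part of JUnit and not the plug-in mentioned in note 2; `TesterThreads` and `runAndCapture` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicReference;

class TesterThreads {
    // Runs body on a new thread, waits for it to end, and returns the Throwable
    // (for example an AssertionError) that escaped from it, or null if none did.
    static Throwable runAndCapture(Runnable body) {
        final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
        Thread t = new Thread(body);
        t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            public void uncaughtException(Thread thread, Throwable e) {
                failure.set(e); // remember the failure instead of printing it
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
        return failure.get();
    }

    public static void main(String[] args) {
        Throwable f = runAndCapture(new Runnable() {
            public void run() {
                throw new AssertionError("expected:<3> but was:<4>");
            }
        });
        System.out.println("captured in the calling thread: " + f);
    }
}
```

A test method could rethrow the returned Throwable, so that the failure is raised in the thread the test framework is watching.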
Best Regards,
Szabolcs