The paradigm shift is here

Multithreading, concurrency, parallelism, Core 2 Duo, call it what you will, the revolution is happening. Today.

Chips aren’t getting any faster. Intel have quietly dropped references to clock speeds and chip generations in favour of marketing slogans indicating the number of cores per CPU.

Moore’s law has had a head-on collision with the laws of physics, with a predictable outcome.

The future is about making things smaller and having more of them. We’re moving from a world where clock speed doubled every 18 months to one where cores will double every 18 months. Dual-core CPUs are commonplace. Quad core is just around the corner. In 5 years’ time, 16- or 32-core chips will be in your desktop.

So here’s the opportunity for software platform vendors: be the best at managing parallelism, make it easy for application developers to leverage all those cores, and win tomorrow’s market. Threads (i.e. Java and C# threads) are old technology, and a dreadful way of expressing concurrency. Language constructs such as Communicating Sequential Processes, which allow developers to describe concurrency explicitly, are far easier to understand and reason about. See also: the book and Wikipedia’s entries on CSP and process calculus.
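For a flavour of the CSP style in Java terms, a BlockingQueue can stand in for a channel: two processes communicate only by passing values over it, never by sharing mutable state. A rough sketch, not a real CSP implementation (the class name and the -1 end-of-stream sentinel are mine):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class CspSketch {
    public static void main(String[] args) throws InterruptedException {
        // A bounded channel of capacity 1: put() blocks until the consumer
        // takes, so the two processes rendezvous much as CSP channels do.
        final BlockingQueue<Integer> channel = new ArrayBlockingQueue<Integer>(1);

        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < 5; i++) {
                        channel.put(i); // send
                    }
                    channel.put(-1); // sentinel: end of stream
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();

        // The consumer's only coupling to the producer is the channel.
        int value;
        while ((value = channel.take()) != -1) { // receive
            System.out.println("received " + value);
        }
        producer.join();
    }
}
```

Nothing here needs a lock or a shared counter; all coordination is in the channel, which is what makes this style easier to reason about than raw threads.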

Myself, I’m hoping the Smalltalk vendors pay attention and adapt. They have the advantage that Smalltalk is mostly written in itself. Paradigm shifts don’t come around that often. Fortune favours the prepared mind.

Tuning thread count

Once more into the breach of microbenchmarking:

import java.math.BigInteger;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

public class SillyThroughputTest {
    private static final int MAX_TASKS = 100;
    private static final int MAX_LOOP = 100000;
    private static final AtomicLong foundPrimes = new AtomicLong();

    public static void main(String[] args) {
        List<Runnable> taskList = new LinkedList<Runnable>();
        for (int i = 0; i < MAX_TASKS; i++) {
            Runnable task = buildTask();
            taskList.add(task);
        }

//        runEachTaskInNewThread(taskList);

        runEachTaskInFixedSizeExecutor(taskList);
    }

    private static void runEachTaskInFixedSizeExecutor(List<Runnable> taskList) {
        long started = System.currentTimeMillis();

        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<Future<?>> results = new LinkedList<Future<?>>();
        while (!taskList.isEmpty()) {
            Runnable runnable = taskList.remove(0);
            Future<?> future = executor.submit(runnable);
            results.add(future);
        }

        for (Future<?> future : results) {
            try {
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        long stopped = System.currentTimeMillis();
        long durationMillis = stopped - started;

        System.out.println("Took " + (double) durationMillis / 1000d + " s in total, counting " + foundPrimes.get() + " primes");

        executor.shutdown();
    }

    private static void runEachTaskInNewThread(List<Runnable> taskList) {
        long started = System.currentTimeMillis();
        runEachTaskInANewThreadAndWaitForCompletion(taskList);
        long stopped = System.currentTimeMillis();
        long durationMillis = stopped - started;

        System.out.println("Took " + (double) durationMillis / 1000d + " s in total, counting " + foundPrimes.get() + " primes");
    }

    private static void runEachTaskInANewThreadAndWaitForCompletion(List<Runnable> taskList) {
        List<Thread> workers = new LinkedList<Thread>();
        while (!taskList.isEmpty()) {
            Runnable runnable = taskList.remove(0);
            Thread thread = new Thread(runnable);
            workers.add(thread);
        }

        for (Thread thread : workers) {
            thread.start();
        }

        for (Thread thread : workers) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                // go on to the next one
            }
        }
    }

    private static Runnable buildTask() {
        return new Runnable() {
            public void run() {
                for (int i = 0; i < MAX_LOOP; i++) {
                    BigInteger bigInteger = new BigInteger("" + i);
                    if (bigInteger.isProbablePrime(10)) {
                        foundPrimes.incrementAndGet();
                    }
                }
            }
        };
    }
}

Using a fixed pool of 2 threads:
Took 43.594 s in total, counting 959201 primes (99% cpu)

Using a thread per task (100 in this case):
Took 92.5 s in total, counting 959200 primes (65% cpu)

Why you need to worry about parallelism

Try this simple test:

import java.util.concurrent.atomic.AtomicInteger;

public class SillyThreadTest {
    public static void main(String[] args) throws Throwable {
        final AtomicInteger count = new AtomicInteger(0);
        long startMillis = System.currentTimeMillis();
        try {
            while (true) {
                count.getAndIncrement();
                Thread aThread = new Thread(new Runnable() {
                    public void run() {
                        while (true) {
                            try {
                                Thread.sleep(Integer.MAX_VALUE);
                            } catch (InterruptedException e) {
                                // doh
                            }
                        }
                    }
                });

                aThread.start();
            }
        } catch (Throwable e) {
            long stopMillis = System.currentTimeMillis();
            long durationMillis = stopMillis - startMillis;
            double seconds = (double) durationMillis / 1000d;
            System.out.println("Thread Count = " + count.get() + ", after: " + seconds + " s, exception: " + e);
            throw e;
        } finally {
            System.exit(0);
        }
    }
}

On my fairly new dual-core AMD Athlon with 2 gigs of memory, I get a top figure of around 7200 threads and an OutOfMemoryError in 2.1 seconds with no extra flags on Java 1.5. Yes, I know the default heap is tiny; increasing it only makes the thread count worse, because thread stacks are allocated outside the Java heap. Try it if you don’t believe me.

Thread Count = 7214, after: 2.125 s, exception: java.lang.OutOfMemoryError: unable to create new native thread

Hmm. Let’s see how many Runnables I can add to a list:

import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;

public class SillyRunnableTest {
    public static void main(String[] args) throws Throwable {
        final AtomicInteger count = new AtomicInteger(0);
        long startMillis = System.currentTimeMillis();
        LinkedList<Runnable> list = new LinkedList<Runnable>();
        try {
            while (true) {
                count.getAndIncrement();
                list.add(new Runnable() {
                    public void run() {
                        while (true) {
                            try {
                                Thread.sleep(Integer.MAX_VALUE);
                            } catch (InterruptedException e) {
                                // doh
                            }
                        }
                    }
                });
            }
        } catch (Throwable e) {
            list.clear(); // Clear the list to free some memory
            long stopMillis = System.currentTimeMillis();
            long durationMillis = stopMillis - startMillis;
            double seconds = (double) durationMillis / 1000d;
            System.out.println("Runnable Count = " + count.get() + ", after: " + seconds + " s, exception: " + e);
            throw e;
        } finally {
            System.exit(0);
        }
    }
}

The output from this one:

Runnable Count = 2078005, after: 3.531 s, exception: java.lang.OutOfMemoryError: Java heap space

If I bump the heap to 512m:

Runnable Count = 16643378, after: 28.969 s, exception: java.lang.OutOfMemoryError: Java heap space

In both tests my CPU maxed out at 50% load (dual core, remember). Thread.sleep() is hardly a representative activity, but the point is that once the CPU hits 100%, it makes no difference how many more threads you add; performance will only get worse due to context switching. The same applies to forking multiple processes. There is only so much CPU to go around. For a CPU-bound workload on my dual-core machine, the optimum number of threads is 2.
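Rather than hard-coding 2, you can size the pool to the machine at runtime. A minimal sketch, assuming a purely CPU-bound workload (the class and method names are mine):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolSizing {
    public static ExecutorService newCpuBoundPool() {
        // availableProcessors() reports the number of logical CPUs the JVM
        // can see; for CPU-bound work, more threads than that only adds
        // context-switching overhead.
        int cpus = Runtime.getRuntime().availableProcessors();
        return Executors.newFixedThreadPool(cpus);
    }
}
```

For workloads that block on I/O the right number is higher, since blocked threads leave the CPU idle; this sizing only holds when the tasks actually keep the cores busy.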

Pointless microbenchmarking? Almost certainly. Scale by spawning threads? No.

Packet Hackery

Been up to my elbows in TCP/IP mechanics today. It’s all about the SYNs, FINs and ACKs. Oh, and the PSHes and RSTs. It’s sometimes useful to remember that when you have a TCP/IP ‘connection’ to another computer, you really don’t. There is no tiny portion of the internet that becomes your personal hotline to the other machine. A TCP/IP connection is really just an agreement between two machines that they will exchange packets in a certain way.

Let’s say you open a browser and point it at Google. Before any of the HTTP fun happens, the following occurs:

You send a SYN.
Google sends a SYN & ACK together.
You send an ACK.
You are now ‘connected’.
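From Java, that whole exchange happens inside a single blocking call. A small sketch; a local ServerSocket stands in for Google so it runs without a network:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class HandshakeSketch {
    public static void main(String[] args) throws IOException {
        // A local listener; port 0 asks the OS for any free port.
        ServerSocket listener = new ServerSocket(0);

        Socket socket = new Socket();
        // connect() returns once the SYN, SYN+ACK, ACK exchange completes;
        // the kernel does the packet-level work, we just see a connected socket.
        socket.connect(new InetSocketAddress("127.0.0.1", listener.getLocalPort()), 1000);
        System.out.println("connected: " + socket.isConnected());

        // close() sends the FIN that begins the teardown.
        socket.close();
        listener.close();
    }
}
```

Note the connect succeeds before the server ever calls accept(): the handshake is completed by the kernel against the listen backlog, which is another reminder that the ‘connection’ is just agreed packet bookkeeping.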

Various flags and sequence numbers contained in the packets enable both parties to agree on how they will number their packets, and thus how they will spot any that don’t arrive, or arrive out of sequence. Packet exchange then occurs, with every packet sent being ACKed. An ACK can also piggyback on the next data packet if there happens to be one going out soon enough. When it’s time to end the conversation, the first party to hang up sends a FIN, then waits for an ACK and a corresponding FIN from the other party.

If one side crashes without sending a FIN, you get a ‘half open’ connection whereby one party thinks all is well, while the other has no knowledge of a connection. As soon as either side attempts to send data (or a new SYN), the other party will know something is up and send an RST (reset) packet. This tells the recipient to abort and try it all again, so it’s back to the SYN, SYN+ACK, ACK shenanigans, also known as the three-way handshake. So there you go: a 30-second introduction to the magic of TCP/IP.