Dalke Scientific Software
/home/writings/diary/archive/2012/01/17/I_parallelize_an_algorithm

I parallelize an algorithm

With help from Kim Walisch, I have been adding OpenMP support to chemfp. This is the first time I've used OpenMP, and I'm pleased to say there are places where it works really well. However, OpenMP, like every other parallelization strategy, is no panacea. It can take a lot of effort to get good scaling, and there are cases where it doesn't feel any easier to use OpenMP than to use pthreads (POSIX threads).

In this essay I'll walk through how I converted a single-threaded algorithm to OpenMP, and compare the results to a version built on a Python thread-pool library on top of pthreads.

The single-threaded algorithm

Suppose you have a list of N objects - let's call them "fingerprints" - and a function which compares two fingerprints - call it "tanimoto" - which returns a similarity score from 0.0 to 1.0. A score of 0.0 means "not similar" and 1.0 means "very similar." The similarity of a fingerprint compared with itself is always 1.0, and the tanimoto function is symmetric, so that tanimoto(x, y) == tanimoto(y, x).
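To make the math concrete: for fingerprints stored as bit vectors, the Tanimoto score is the number of bits set in both divided by the number of bits set in either. Here's a minimal sketch, using Python integers as toy bit vectors; chemfp's real implementation works on packed byte arrays with fast popcounts, so this is only for illustration:

```python
def tanimoto(fp1, fp2):
    # Tanimoto similarity: popcount(A AND B) / popcount(A OR B).
    # Fingerprints here are Python ints treated as bit vectors.
    common = bin(fp1 & fp2).count("1")   # bits set in both
    either = bin(fp1 | fp2).count("1")   # bits set in at least one
    return common / float(either) if either else 1.0
```

Note that tanimoto(x, x) is always 1.0 and the function is symmetric, which is what the symmetry trick later in this essay depends on.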

One question you can ask is "which fingerprints are within the threshold similarity of the tenth fingerprint?" I'll use the term "neighbor" for any fingerprint which is at least threshold-similar to a given fingerprint, so I can restate the above as "what are the threshold neighbors of the tenth fingerprint?"

The code for this is not hard:

  fingerprint_t fingerprints[] = {array of fingerprint objects};
  int query_index = 9;   // The tenth fingerprint
  int target_index;
  assert(0.0 <= threshold && threshold <= 1.0);
  
  for (target_index=0; target_index<N; target_index++) {
    if (tanimoto(fingerprints[query_index], fingerprints[target_index]) >= threshold) {
      if (query_index != target_index) {
          printf("%d is a neighbor\n", target_index);
      }
    }
  }
The main subtlety is the check that a fingerprint isn't reported as a neighbor of itself. There are a few ways to handle that case; here I chose one which is optimized for performance, assuming relatively few targets are similar enough.

Fingerprint searches are in a high-dimensional space, so optimizations like k-d trees, which work in lower-dimensional spaces, suffer from the curse of dimensionality. For exact answers, the best you can expect is linear performance. There are clever ways to get sublinear performance, but the worst case is still linear. Still, computers are fast, and can search 100,000 fingerprints in a blink.

Another question you can ask is, what are the neighbor counts for all of the fingerprints in the data set? Here's code which computes that:

  fingerprint_t fingerprints[] = {array of fingerprint objects};
  int count, query_index, target_index;
  int counts[N] = {}; // initialize to 0
  assert(0.0 <= threshold && threshold <= 1.0);
  
  for (query_index=0; query_index<N; query_index++) {
    count = 0;
    for (target_index=0; target_index<N; target_index++) {
      if (tanimoto(fingerprints[query_index], fingerprints[target_index]) >= threshold) {
        count++;
      }
    }
    /* The counts are too high by one since it includes the diagonal term */
    /* and tanimoto(fingerprint[i], fingerprint[i]) == 1.0 */
    /* Decrement by one to get the correct answer */
    counts[query_index] += count - 1;
  }
What this does is go row-by-row through the NxN comparison matrix, compute the similarities, and add up the number of times where the similarity is high enough. Since I include the diagonal term in the counts, and since the similarity along the diagonal is always 1.0, I have to subtract off one after computing the total row count.

Some might consider it inelegant that I count the self-similarity in the main loop and subtract one at the end, but it makes the code short and understandable. While there are N extra calculations, the double loop has a total of N*N calculations, so it's only a small amount of extra work.
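For a concrete check of that bookkeeping, here's a small sketch of the count-then-subtract-one approach. It uses a toy tanimoto() on integer bit vectors, not chemfp's real implementation:

```python
def tanimoto(fp1, fp2):
    # Toy Tanimoto on integer bit vectors, for illustration only.
    common = bin(fp1 & fp2).count("1")
    either = bin(fp1 | fp2).count("1")
    return common / float(either) if either else 1.0

def neighbor_counts(fingerprints, threshold):
    # Count every cell in the row, including the diagonal self-match,
    # then subtract one because tanimoto(x, x) == 1.0 >= threshold.
    n = len(fingerprints)
    counts = []
    for i in range(n):
        row_count = 0
        for j in range(n):
            if tanimoto(fingerprints[i], fingerprints[j]) >= threshold:
                row_count += 1
        counts.append(row_count - 1)
    return counts
```

With the four toy fingerprints 0b1111, 0b1110, 0b0001, and 0b1010 at a threshold of 0.5, this gives the counts [2, 2, 0, 2], the same as a version which skips the diagonal explicitly.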

Parallelizing the NxN algorithm

Parallelizing this with OpenMP is dead simple: I ask the compiler to evaluate the row loop in parallel.

  fingerprint_t fingerprints[] = {array of fingerprint objects};
  int count, query_index, target_index;
  int counts[N] = {}; // initialize to 0
  assert(0.0 <= threshold && threshold <= 1.0);
  
  #pragma omp parallel for private(count, target_index)
  for (query_index=0; query_index<N; query_index++) {
    count = 0;
    for (target_index=0; target_index<N; target_index++) {
      if (tanimoto(fingerprints[query_index], fingerprints[target_index]) >= threshold) {
        count++;
      }
    }
    /* The counts are too high by one since it includes the diagonal term */
    /* and tanimoto(fingerprint[i], fingerprint[i]) == 1.0 */
    counts[query_index] += count - 1;
  }
That's it! OpenMP is a great fit to this case. With one new line of code, I have very good scaleup across many cores.

It's not perfect scaleup. For one, the tanimoto() calculation is fast; fast enough that memory bandwidth and cache performance become issues. It might be faster to use Z-ordering or another cache-oblivious ordering, but that's outside the scope of this essay. For that matter, I haven't tested this hypothesis because I use another technique which usually gives good cache behavior for the situations I'm most concerned about.

What about symmetry?

That easy parallelization is great, right? Well, I'm missing out on a simple factor of two speedup. The tanimoto function is symmetric, so I only need to compute the upper triangle terms. Here's the single-threaded implementation:

  for (query_index=0; query_index<N; query_index++) {
    for (target_index=query_index+1; target_index<N; target_index++) {
      if (tanimoto(fingerprints[query_index], fingerprints[target_index]) >= threshold) {
        counts[query_index]++;
        counts[target_index]++;
      }
    }
  }
It looks simple. Too bad it doesn't parallelize well. Increment is not an atomic operation: if multiple threads execute "counts[i]++" at the same time, for the same value of i, then it might be that thread 1 reads a value of 4, thread 2 reads a value of 4, thread 1 writes the incremented value 5, and then thread 2 writes its own incremented value of 5. One of the increments is lost.
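The lost update can be spelled out as a deterministic interleaving. This sketch simulates the two threads' reads and writes by hand rather than racing real threads:

```python
counts = [4]

# counts[i]++ is really read-modify-write; interleave two "threads" by hand:
thread1_value = counts[0]        # thread 1 reads 4
thread2_value = counts[0]        # thread 2 reads 4, before thread 1 writes
counts[0] = thread1_value + 1    # thread 1 writes 5
counts[0] = thread2_value + 1    # thread 2 also writes 5

# Two increments happened, but the final value is 5, not the correct 6.
print(counts[0])
```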

One solution is to tell OpenMP that the increment code is a "critical" section, which means only one thread can execute it at a time. The resulting code is:

  #pragma omp parallel for private(target_index)
  for (query_index=0; query_index<N; query_index++) {
    for (target_index=query_index+1; target_index<N; target_index++) {
      if (tanimoto(fingerprints[query_index], fingerprints[target_index]) >= threshold) {
        #pragma omp critical (add_count)
        counts[query_index]++;
        #pragma omp critical (add_count)
        counts[target_index]++;
      }
    }
  }
Here, 'add_count' is the symbolic name for a global lock to a critical section.
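The named critical section behaves like a single mutex shared by every thread. In Python terms the same structure looks like this sketch, where one threading.Lock plays the role of the 'add_count' critical section (the toy tanimoto() on integer bit vectors is for illustration, not chemfp's implementation):

```python
import threading

def tanimoto(fp1, fp2):
    # Toy Tanimoto on integer bit vectors, for illustration only.
    common = bin(fp1 & fp2).count("1")
    either = bin(fp1 | fp2).count("1")
    return common / float(either) if either else 1.0

def symmetric_counts(fingerprints, threshold, num_threads=4):
    n = len(fingerprints)
    counts = [0] * n
    add_count = threading.Lock()   # plays the role of "omp critical (add_count)"

    def process_rows(start, step):
        # Each thread handles every step-th row of the upper triangle.
        for i in range(start, n, step):
            for j in range(i + 1, n):
                if tanimoto(fingerprints[i], fingerprints[j]) >= threshold:
                    with add_count:  # every increment contends for this one lock
                        counts[i] += 1
                        counts[j] += 1

    threads = [threading.Thread(target=process_rows, args=(t, num_threads))
               for t in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counts
```

The final counts are deterministic because every update is serialized through the one lock, which is exactly why this approach stops scaling once matches are frequent.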

I wrote something like this with a very high threshold, and found an almost perfect two-fold speedup. Go OpenMP!

Amdahl's Law strikes again!

The problem is that critical sections are single-threaded. When I lower the threshold, I find more matches, and more threads try to run the single-threaded code. This runs directly into Amdahl's Law: the critical section becomes the limiting factor as all the threads contend for the same lock.

I can reduce the contention a bit by keeping track of the row counts in a thread-local variable:

  #pragma omp parallel for private(count, target_index)
  for (query_index=0; query_index<N; query_index++) {
    count = 0;
    for (target_index=query_index+1; target_index<N; target_index++) {
      if (tanimoto(fingerprints[query_index], fingerprints[target_index]) >= threshold) {
        count++;
        #pragma omp critical (add_count)
        counts[target_index]++;
      }
    }
    /* Correction on 2012-01-12; Commenter scott_s on Hacker News pointed out */
    /* that only one thread will ever get here with a given query_index. */
    /* There is no chance that multiple threads try to change the same value */
    /* This critical section can be removed without affecting correctness. */
    /* I leave this code here because it affects the timings. */
    /* It does not affect the conclusion that lock contention can make things slow */
    #pragma omp critical (add_count)
    counts[query_index] += count;
  }
This is the simplest seemingly-reasonable parallelization of the upper-triangle algorithm.

How well does this work? My desktop has four cores. I can compare the performance of the original non-symmetric code with the symmetric one:

  Algorithm       Tanimoto threshold
                   0.8    0.6    0.5
  symmetric        40s   151s   660s
  non-symmetric    82s   170s   207s
When the threshold is high (0.8), I get the expected factor-of-two performance boost. This means there is very little contention. The factor-of-two advantage mostly disappears for the medium-high threshold of 0.6, and at the threshold of 0.5 the overall run-time is much slower than the original NxN algorithm. Indeed, I eventually gave up trying to determine the run-time for even lower thresholds.

That's terrible. I have one algorithm which is best when there are few similarities, and another which is best when there are many similarities, and because the number of similarities is highly data-dependent, I don't have an easy way to figure out which algorithm to use.

Use many critical sections instead of one

The problem is that my four cores all want to use a single critical section. When one core has the lock, the other threads have to wait. What I can do is increase the number of critical sections. For example, I can have one lock to get access to the even-numbered rows, and another lock to get access to the odd-numbered rows. Here's the corresponding code:

  #pragma omp parallel for private(count, target_index)
  for (query_index=0; query_index<N; query_index++) {
    count = 0;
    for (target_index=query_index+1; target_index<N; target_index++) {
      if (tanimoto(fingerprints[query_index], fingerprints[target_index]) >= threshold) {
        count++;
        switch (target_index % 2) {
          case 0:
            #pragma omp critical (add_count0)
            counts[target_index]++;
            break;
          case 1:
            #pragma omp critical (add_count1)
            counts[target_index]++;
            break;
        }
      }
    }
    /* Correction on 2012-01-12; Commenter scott_s on Hacker News pointed out */
    /* that only one thread will ever get here with a given query_index. */
    /* There is no chance that multiple threads try to change the same value. */
    /* The following could be replaced with a simple */
    /*     counts[query_index] += count;   */
    /* I leave this code here because it affects the timings. */
    /* Since this is only called O(N) (instead of O(N*N/2)) times, it */
    /* should have minimal effect on the overall time. */
    switch (query_index % 2) {
      case 0:
        #pragma omp critical (add_count0)
        counts[query_index] += count;
        break;
      case 1:
        #pragma omp critical (add_count1)
        counts[query_index] += count;
        break;
    }
  }
That's clumsy, but the performance is a bit better. With two critical sections the times are:

  Algorithm       Tanimoto threshold
                   0.8    0.6    0.5
  symmetric        39s   114s   375s
  non-symmetric    82s   170s   207s
so symmetric code is faster, but there are cases where the non-symmetric code is faster still.
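The two-lock trick generalizes to any number of locks, a pattern known as lock striping. In Python terms the sketch below indexes an array of locks by target_index % num_locks, so updates only contend when they land on the same stripe (again using a toy tanimoto(), not chemfp's implementation):

```python
import threading

def tanimoto(fp1, fp2):
    # Toy Tanimoto on integer bit vectors, for illustration only.
    common = bin(fp1 & fp2).count("1")
    either = bin(fp1 | fp2).count("1")
    return common / float(either) if either else 1.0

def striped_symmetric_counts(fingerprints, threshold, num_threads=4, num_locks=128):
    n = len(fingerprints)
    counts = [0] * n
    # One lock per stripe; counts[j] is guarded by locks[j % num_locks].
    locks = [threading.Lock() for _ in range(num_locks)]

    def process_rows(start, step):
        for i in range(start, n, step):
            row_count = 0
            for j in range(i + 1, n):
                if tanimoto(fingerprints[i], fingerprints[j]) >= threshold:
                    row_count += 1
                    with locks[j % num_locks]:   # contend only within a stripe
                        counts[j] += 1
            with locks[i % num_locks]:
                counts[i] += row_count

    threads = [threading.Thread(target=process_rows, args=(t, num_threads))
               for t in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counts
```

With num_locks=1 this degenerates to the single critical section; raising num_locks spreads the contention across independent locks, which is exactly what the table below measures.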

What about even more critical sections? I tried a range of values. Here's the table:

  number of           Tanimoto threshold (times in seconds)
  critical sections    0.8   0.6   0.5   0.4   0.2   0.01
  1                     40   151   660
  2                     39   114   375   over 37 minutes
  16                    40    86   133   299
  64                    41    84   105   137   271   307
  128                   40    82   102   131   244   278
  non-symmetric         82   170   207   240   272   280
The value of 0.01 makes the search algorithm calculate nearly all of the possible comparisons, so it's an effective worst-case for my search space. (I can't use 0.0 because my implementation has special support for that case; it knows that all fingerprints have N-1 neighbors.)

As you can see, with 128 critical sections and in the worst case, my code which takes advantage of symmetry is the same speed as the one which doesn't. This likely means that the code to acquire and release a critical section lock has about the same amount of overhead as the Tanimoto similarity calculation.

I probably should have tried 256 different locks, but I think this code is ugly enough as it is, and very few people use thresholds below 0.6, much less down to 0.2. I do wonder how the times compare if there are, say, 16 cores, but it's time to try a different solution.

What about per-thread count arrays?

There is an alternate solution. I could sum up the counts in individual, per-thread arrays and merge them into the final counts at the end. Here's what the code looks like:

  /* Correction 2012-01-08: Originally I had omp_get_num_threads() here but */
  /* as acq points out on Hacker News, this only returns the number of active */
  /* threads while in a parallel section. Otherwise it returns 1. I fixed my */
  /* actual code during testing, but that fix didn't make its way over here. */
  /* I also tried using omp_get_max_threads() but that wasn't supported on my Mac. */
  int *parallel_counts = (int *) calloc(omp_get_max_threads() * N, sizeof(int));
  int *per_thread_counts;
  int thread;

  #pragma omp parallel for private(count, target_index, per_thread_counts)
  for (query_index=0; query_index<N; query_index++) {
    per_thread_counts = parallel_counts + (N * omp_get_thread_num());
    count = 0;
    for (target_index=query_index+1; target_index<N; target_index++) {
      if (tanimoto(fingerprints[query_index], fingerprints[target_index]) >= threshold) {
        count++;
        per_thread_counts[target_index]++;
      }
    }
    per_thread_counts[query_index] += count;
  }
  /* Merge the per-thread counts. This runs outside the parallel region, */
  /* so use omp_get_max_threads(); omp_get_num_threads() would return 1 here. */
  for (query_index=0; query_index<N; query_index++) {
    count = 0;
    for (thread=0; thread<omp_get_max_threads(); thread++) {
      count += parallel_counts[query_index + N*thread];
    }
    counts[query_index] += count;
  }
  free(parallel_counts);
This requires no locking, and only a very small bit of sequential code which is linear in the number of fingerprints. There's more code, but this algorithm should scale better than the previous algorithm.

Here are the timings:

  method                  Tanimoto threshold (times in seconds)
                           0.8   0.6   0.5   0.4   0.2   0.01
  128 critical sections     40    82   102   131   244   278
  non-symmetric             82   170   207   240   272   280
  per-thread counts         40    83   100   116   135   137
There's the factor of two I was looking for!

And now using pthreads from Python

You might conclude that this still shows a win for OpenMP. The problem is that the above is essentially identical to how I would implement this algorithm using pthreads. I'm rather fond of Python's new concurrent.futures module, so I tested a pthread-only driver for a single-threaded count function implemented in C.

import ctypes
import time
import itertools
from collections import defaultdict
import threading

import chemfp
from chemfp import futures
import _chemfp


def count_tanimoto_hits(all_counts, arena, threshold, row):
    thread_id = threading.current_thread().ident
    # This implements essentially:
    #   for (j=row+1; j<N; j++) {
    #     if (tanimoto(fingerprints[row], fingerprints[j]) >= threshold) {
    #       counts[row]++;
    #       counts[j]++;
    #     }
    #   }
    _chemfp.count_tanimoto_hits_arena_symmetric(
        threshold, arena.num_bits,
        arena.start_padding, arena.end_padding, arena.storage_size, arena.arena,
        row, row+1, row+1, len(arena),
        arena.popcount_indices, all_counts[thread_id])

def find_counts(arena, threshold, num_threads):
    # Allocate per-thread storage (based on the thread-id)
    def make_empty_counts():
        return (ctypes.c_int*len(arena))()
    all_counts = defaultdict(make_empty_counts)
    
    # Use a thread-pool with num_threads worker threads
    with futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        for row in xrange(len(arena)):
            executor.submit(count_tanimoto_hits, all_counts, arena, threshold, row)

    # Merge the private counts back into one total list of counts
    return [sum(cols) for cols in itertools.izip(*all_counts.values())]


arena = chemfp.load_fingerprints("zinc_drug_like.fps")

chemfp.set_num_threads(1) # Don't use multiple OpenMP threads

for threshold in (0.8, 0.6, 0.5, 0.4, 0.2, 0.01):
    t1 = time.time()
    x = find_counts(arena, threshold, 4)
    t2 = time.time()
    print threshold, t2-t1, "   ", sum(x)
The "chemfp.set_num_threads(1)" call bypasses the OpenMP-based code and tells "count_tanimoto_hits_arena_symmetric" to use the simple upper-right triangle implementation. The results:
  method                  Tanimoto threshold (times in seconds)
                           0.8   0.6   0.5   0.4   0.2   0.01
  128 critical sections     40    82   102   131   244   278
  non-symmetric             82   170   207   240   272   280
  per-thread counts         40    83   100   116   135   137
  Python/pthreads           48    92   112   128   145   149
The pthread timings look similar to those for OpenMP, except with a roughly 8 second (and nearly constant) overhead. This is likely the cost of starting N=110885 different jobs for the worker threads in Python. I confirmed this by using a threshold of 0.95: the per-thread OpenMP algorithm takes 9.2 seconds while the pthread version takes 16.5 seconds, a difference of about 7 seconds, as predicted.

While I did not test it, I expect that a corresponding C/C++ implementation would have much less overhead. I just don't have the experience with the C pthread API, an async library for C/C++ like Grand Central Dispatch, or C++11's new promises and futures to implement that code directly in C. It really is much easier to use OpenMP than to figure out those alternate solutions for C.

Possible bad benchmark comparisons

BTW, what I ended up doing in my Python code was to define a "band" of 100 rows, and let each thread process 100 rows at a time. This should cut the overhead down from 8 seconds to 0.08 seconds, making the pthread code about comparable to the OpenMP code. I didn't test it though, because my actual code uses a more sophisticated algorithm which also has the effect of improving cache coherency, and there's evidence that banding makes the coherency worse and causes slowdowns while waiting for memory fetches.
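Banding is straightforward with the executor: submit a range of rows per task instead of one row per task, so the per-job overhead is paid once per band. Here's a sketch using the stdlib concurrent.futures (the post used chemfp's bundled copy), a toy tanimoto(), and per-band local counts merged under a single lock; none of the names here are chemfp's:

```python
import threading
from concurrent import futures

def tanimoto(fp1, fp2):
    # Toy Tanimoto on integer bit vectors, for illustration only.
    common = bin(fp1 & fp2).count("1")
    either = bin(fp1 | fp2).count("1")
    return common / float(either) if either else 1.0

def banded_counts(fingerprints, threshold, band_size=100, num_threads=4):
    n = len(fingerprints)
    counts = [0] * n
    merge_lock = threading.Lock()

    def process_band(start):
        # Accumulate counts for the whole band locally, then merge once,
        # so locking overhead is per-band rather than per-hit.
        local = [0] * n
        for i in range(start, min(start + band_size, n)):
            for j in range(i + 1, n):
                if tanimoto(fingerprints[i], fingerprints[j]) >= threshold:
                    local[i] += 1
                    local[j] += 1
        with merge_lock:
            for k in range(n):
                counts[k] += local[k]

    with futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        for start in range(0, n, band_size):
            executor.submit(process_band, start)
    return counts
```

With N rows and a band size of 100 there are N/100 submitted jobs instead of N, which is where the estimated drop from 8 seconds to 0.08 seconds of overhead comes from.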

Unfortunately, it also looks like the analysis I did the other day had a flaw which caused the pthread benchmark to have bad memory access behavior. In short, the pthreads were randomly assigned bands to process; the OpenMP version also gets randomly assigned bands, but all of its cores work on tasks in the same band, hence better coherency. (It looks like the pthread performance for one test case goes from 48 seconds with randomly assigned bands to 40 seconds with sequentially assigned bands.)

I consider this a win for OpenMP. I did random assignments so I could display a progress bar: asymmetries in the data mean that the first few bands and the last few bands are much easier to process than the ones in the middle, so with random band assignment I get a much better estimate of the time to completion. Using OpenMP gives me that estimate without a noticeable performance hit. With pthreads, it's much harder to get both performance and an estimate.

Conclusion

Effective parallelization with good scaleup is hard, no matter which technique you use. There are a lot of subtle issues. You need to understand how the technique works and measure your results to make sure you really do understand the problem.

My experience is that OpenMP is an effective technique for this kind of task. In a few cases, the trivially simple addition of a line of code gives an instant speedup. It's more likely, though, that you've got some work ahead of you to make that happen.

If you're already using pthreads, Grand Central Dispatch, or some other multithreading or asynchronous I/O system, then I don't think that OpenMP adds much new. Instead, I think its biggest advantage is that you can make changes to existing code without introducing a new library API, without having to set up your own event loop/reactor, and with compiler-based thread control and synchronization primitives which make a large number of the potential bugs of hand-built systems disappear.


Andrew Dalke is an independent consultant focusing on software development for computational chemistry and biology. Need contract programming, help, or training? Contact me



Copyright © 2001-2013 Andrew Dalke Scientific AB