User Tools

Site Tools


parallelprefix

Parallel Prefix

A Java solution to the parallel prefix-sum problem, using worker threads synchronized by a cyclic barrier.

ParallelPrefix.java

package ch.ethz.pp;
 
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
 
public class ParallelPrefix {
 
    static int perThread = 10;
    static int threads = 16;
 
    public static void main(String[] args) {
        ParallelPrefix pp = new ParallelPrefix();
        pp.start2();
    }
 
    int length = threads * perThread;
 
    int[] values;
 
    public ParallelPrefix() {
        values = new int[length];
        Random random = new Random();
        for (int i = 0; i < length; i++) {
            values[i] = random.nextInt() % 100;
        }
    }
 
    public void start() {
        int[] prefix = new int[length];
        int chunk = length / threads;
        Thread[] thread = new Thread[threads];
        SumWorker[] workers = new SumWorker[threads];
        CyclicBarrier barrier = new CyclicBarrier(threads + 1);
        for (int i = 0; i < threads; i++) {
            workers[i] = new SumWorker(values, i * chunk, (i + 1) * chunk,
                    prefix, barrier);
            thread[i] = new Thread(workers[i]);
            thread[i].start();
        }
        try {
            barrier.await();
            int sum = 0;
            for (int i = 1; i < threads; i++) {
                sum = workers[i - 1].getSum() + sum;
                workers[i].setMax(sum);
            }
            System.out.println(Arrays.toString(prefix));
            barrier.await();
            for (Thread t : thread) {
                t.join();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println(Arrays.toString(prefix));
        System.out.println(prefix[length - 1]);
    }
}

SumWorker.java

package ch.ethz.pp;
 
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
 
/**
 * Worker that processes the half-open range {@code [start, end)} of a shared
 * input array in two barrier-separated phases: first it sums its chunk, then
 * it writes the chunk's prefix sums, offset by the value supplied through
 * {@link #setMax(int)}, into a shared output array.
 */
public class SumWorker implements Runnable {

    /** Barrier awaited between the phases of {@link #run()}. */
    private final CyclicBarrier barrier;

    /** Shared output array; this worker writes only indices {@code [start, end)}. */
    private final int[] pref;
    /** Exclusive end index of this worker's chunk. */
    protected final int end;

    /** Offset added to every prefix value of this chunk; 0 unless set. */
    protected int max;
    /** Inclusive start index of this worker's chunk. */
    protected final int start;

    /** Sum of this worker's chunk, valid once {@link #computeSum()} has run. */
    protected int sum = 0;

    /** Shared input array; only read by this worker. */
    protected final int[] values;

    /**
     * Creates a worker for one chunk of the input.
     *
     * @param values  input array
     * @param start   inclusive start index of the chunk
     * @param end     exclusive end index of the chunk
     * @param prefix  output array receiving the prefix sums
     * @param barrier barrier awaited twice by {@link #run()}
     */
    public SumWorker(int[] values, int start, int end, int[] prefix,
            CyclicBarrier barrier) {
        this.values = values;
        this.start = start;
        this.end = end;
        pref = prefix;
        this.barrier = barrier;
    }

    /**
     * Sums the values of this worker's chunk and stores the result.
     *
     * @return the chunk sum
     * @throws InterruptedException declared for subclasses; never thrown here
     */
    public int computeSum() throws InterruptedException {
        int s = 0;
        for (int i = start; i < end; i++) {
            s = s + values[i];
        }
        sum = s;
        return sum;
    }

    /**
     * Returns the chunk sum stored by the last {@link #computeSum()} call.
     *
     * @return the stored chunk sum, or 0 if none was computed yet
     */
    public int getSum() {
        return sum;
    }

    /**
     * Writes the running sums of this chunk into the output array, starting
     * from the given offset.
     *
     * @param parent sum of all input elements preceding this chunk
     * @throws InterruptedException declared for subclasses; never thrown here
     */
    public void prefixSum(int parent) throws InterruptedException {
        // Fixed: the original ignored this parameter and read the max field
        // instead; run() passes max, so its behaviour is unchanged.
        int pre = parent;
        for (int i = start; i < end; i++) {
            pre = pre + values[i];
            pref[i] = pre;
        }
    }

    /**
     * Computes the chunk sum, waits at the barrier, waits a second time while
     * the offset is expected to be supplied via {@link #setMax(int)}, then
     * writes the chunk's prefix sums.
     */
    @Override
    public void run() {
        try {
            computeSum();
            barrier.await();
            // Between these two waits the offset (max) is set from outside.
            barrier.await();
            prefixSum(max);
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of silently dropping it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sets the offset added to every prefix value of this chunk.
     *
     * @param max sum of all input elements preceding this chunk
     */
    public void setMax(int max) {
        this.max = max;
    }
}
parallelprefix.txt · Last modified: 2009/02/26 17:27 by moritz