====== Parallel Prefix ======

An implementation of the parallel prefix sum problem using Java, Threads and Barriers.

ParallelPrefix.java

package ch.ethz.pp;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * Computes the inclusive prefix sums of a randomly filled {@code int} array.
 *
 * <p>The array is cut into {@link #threads} equally sized chunks. Each chunk is handled by one
 * {@link SumWorker}; the calling thread and the workers are coordinated by a single
 * {@link CyclicBarrier} that is awaited twice:
 * <ol>
 *   <li>after every worker has summed its own chunk,</li>
 *   <li>after the calling thread has handed each worker the sum of all chunks to its left.</li>
 * </ol>
 */
public class ParallelPrefix {
    /** Number of array elements handled by each worker thread. */
    static int perThread = 10;
    /** Number of worker threads (and therefore chunks). */
    static int threads = 16;

    public static void main(String[] args) {
        ParallelPrefix pp = new ParallelPrefix();
        // The listing only defines start(); the previous call to start2() did not compile.
        pp.start();
    }

    /** Total number of input values; always a multiple of {@link #threads}. */
    int length = threads * perThread;
    /** Input values whose prefix sums are computed. */
    int[] values;

    /** Fills {@link #values} with random integers in the range -99..99. */
    public ParallelPrefix() {
        values = new int[length];
        Random random = new Random();
        for (int i = 0; i < length; i++) {
            // nextInt() may be negative, so the remainder lies in -99..99.
            values[i] = random.nextInt() % 100;
        }
    }

    /**
     * Runs the two-phase parallel prefix sum and prints the result.
     *
     * <p>Prints the prefix array once before the workers have written to it, once after they have
     * finished, and finally the last prefix value (the sum of all inputs).
     */
    public void start() {
        int[] prefix = new int[length];
        int chunk = length / threads;
        Thread[] thread = new Thread[threads];
        SumWorker[] workers = new SumWorker[threads];
        // One extra party for the calling thread, which takes part in both rendezvous.
        CyclicBarrier barrier = new CyclicBarrier(threads + 1);

        for (int i = 0; i < threads; i++) {
            workers[i] = new SumWorker(values, i * chunk, (i + 1) * chunk, prefix, barrier);
            thread[i] = new Thread(workers[i]);
            thread[i].start();
        }

        try {
            // Phase 1 complete: every worker has published the sum of its chunk.
            barrier.await();

            // Worker i starts from the sum of chunks 0..i-1; worker 0 keeps its default offset 0.
            int sum = 0;
            for (int i = 1; i < threads; i++) {
                sum = workers[i - 1].getSum() + sum;
                workers[i].setMax(sum);
            }

            // The workers are still parked at the second barrier, so this prints the untouched array.
            System.out.println(Arrays.toString(prefix));

            // Release the workers into phase 2 and wait until all of them are done.
            barrier.await();
            for (Thread t : thread) {
                t.join();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }

        System.out.println(Arrays.toString(prefix));
        System.out.println(prefix[length - 1]);
    }
}

SumWorker.java

package ch.ethz.pp;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * Worker responsible for the half-open index range {@code [start, end)} of the input array.
 *
 * <p>When run, it first sums its range, waits on the barrier, waits on the barrier a second time
 * (during which the coordinator is expected to call {@link #setMax(int)}), and then writes the
 * prefix sums of its range into the shared output array.
 */
public class SumWorker implements Runnable {
    private final CyclicBarrier barrier;
    /** Shared output array; this worker writes only the indices of its own range. */
    private final int[] pref;
    /** Exclusive end index of this worker's range. */
    protected final int end;
    /** Offset added to this worker's prefix sums; 0 unless {@link #setMax(int)} is called. */
    protected int max;
    /** Inclusive start index of this worker's range. */
    protected final int start;
    /** Sum of this worker's range, valid after {@link #computeSum()}. */
    protected int sum = 0;
    /** Shared input array. */
    protected final int[] values;

    /**
     * @param values  input array
     * @param start   first index (inclusive) handled by this worker
     * @param end     last index (exclusive) handled by this worker
     * @param prefix  output array receiving the prefix sums
     * @param barrier barrier awaited twice by {@link #run()}
     */
    public SumWorker(int[] values, int start, int end, int[] prefix, CyclicBarrier barrier) {
        this.values = values;
        this.start = start;
        this.end = end;
        pref = prefix;
        this.barrier = barrier;
    }

    /**
     * Sums {@code values[start..end)} and stores the result for {@link #getSum()}.
     *
     * @return the sum of this worker's range
     */
    public int computeSum() throws InterruptedException {
        int s = 0;
        for (int i = start; i < end; i++) {
            s = s + values[i];
        }
        sum = s;
        return sum;
    }

    /** @return the value stored by the last {@link #computeSum()} call, or 0 if none was made */
    public int getSum() {
        return sum;
    }

    /**
     * Writes the inclusive prefix sums of this worker's range into the output array, starting
     * from the offset set via {@link #setMax(int)}.
     *
     * @param parent currently ignored; the offset is always taken from the {@code max} field
     */
    public void prefixSum(int parent) throws InterruptedException {
        int pre = max;
        for (int i = start; i < end; i++) {
            pre = pre + values[i];
            pref[i] = pre;
        }
    }

    @Override
    public void run() {
        try {
            computeSum();
            barrier.await();
            // Between the two barriers the coordinator distributes the max (offset) values.
            barrier.await();
            prefixSum(max);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently dropping the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

    /** Sets the offset (sum of all elements left of this worker's range) used by the prefix pass. */
    public void setMax(int max) {
        this.max = max;
    }
}