====== Parallel Prefix ======
An implementation of the parallel prefix sum problem using Java, Threads and Barriers.
ParallelPrefix.java
package ch.ethz.pp;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
 * Demo driver that computes the inclusive prefix sums of a random {@code int} array
 * using a fixed number of worker threads synchronised by a {@link CyclicBarrier}.
 *
 * <p>The array is cut into {@code threads} contiguous chunks of equal size. The work is
 * done in three steps:
 * <ol>
 *   <li>every worker sums its own chunk;</li>
 *   <li>the calling thread turns the chunk sums into a starting offset per worker;</li>
 *   <li>every worker writes the prefix sums of its chunk, starting at its offset.</li>
 * </ol>
 */
public class ParallelPrefix {
    /** Number of array elements handled by each worker thread. */
    static int perThread = 10;
    /** Number of worker threads. */
    static int threads = 16;

    /**
     * Builds a random input array and runs the parallel prefix computation on it.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ParallelPrefix pp = new ParallelPrefix();
        // This class only defines start(); the previous call to start2() did not compile.
        pp.start();
    }

    /** Total number of input elements ({@code threads * perThread}). */
    int length = threads * perThread;
    /** Input values whose prefix sums are computed. */
    int[] values;

    /** Fills {@link #values} with {@code length} pseudo-random integers. */
    public ParallelPrefix() {
        values = new int[length];
        Random random = new Random();
        for (int i = 0; i < length; i++) {
            // nextInt() can be negative, so the remainder lies in the range -99..99.
            values[i] = random.nextInt() % 100;
        }
    }

    /**
     * Runs the computation and prints the result to standard output.
     *
     * <p>The result array is printed once before the workers fill it and once after all
     * workers have finished, followed by the last prefix value.
     */
    public void start() {
        int[] prefix = new int[length];
        int chunk = length / threads;
        Thread[] thread = new Thread[threads];
        SumWorker[] workers = new SumWorker[threads];
        // One party per worker plus one for the calling thread, which coordinates the phases.
        CyclicBarrier barrier = new CyclicBarrier(threads + 1);

        for (int i = 0; i < threads; i++) {
            workers[i] = new SumWorker(values, i * chunk, (i + 1) * chunk, prefix, barrier);
            thread[i] = new Thread(workers[i]);
            thread[i].start();
        }

        try {
            // First rendezvous: every worker has finished summing its chunk.
            barrier.await();

            // Give worker i the sum of all chunks before it; worker 0 keeps its default offset.
            int sum = 0;
            for (int i = 1; i < threads; i++) {
                sum = workers[i - 1].getSum() + sum;
                workers[i].setMax(sum);
            }

            // Snapshot of the result array before the workers start writing to it.
            System.out.println(Arrays.toString(prefix));

            // Second rendezvous: release the workers into the prefix-writing phase.
            barrier.await();

            for (Thread t : thread) {
                t.join();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }

        System.out.println(Arrays.toString(prefix));
        System.out.println(prefix[length - 1]);
    }
}
SumWorker.java
package ch.ethz.pp;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
 * Worker that handles one contiguous chunk {@code [start, end)} of a shared input array
 * in a two-phase prefix-sum computation.
 *
 * <p>{@link #run()} first sums the chunk, then waits twice on the shared barrier, and
 * finally writes the chunk's prefix sums into the shared result array, starting from the
 * offset set through {@link #setMax(int)}.
 */
public class SumWorker implements Runnable {
    /** Barrier shared with the coordinating thread and the other workers. */
    private final CyclicBarrier barrier;
    /** Shared result array; this worker writes only indices {@code start..end-1}. */
    private final int[] pref;
    /** Exclusive end index of this worker's chunk. */
    protected final int end;
    /** Offset added to this chunk's prefix sums; 0 unless changed via {@link #setMax(int)}. */
    protected int max;
    /** Inclusive start index of this worker's chunk. */
    protected final int start;
    /** Sum of the chunk, valid after {@link #computeSum()} has run. */
    protected int sum = 0;
    /** Shared input array. */
    protected final int[] values;

    /**
     * @param values  input array
     * @param start   inclusive start index of the chunk
     * @param end     exclusive end index of the chunk
     * @param prefix  result array the prefix sums are written to
     * @param barrier barrier awaited twice by {@link #run()}
     */
    public SumWorker(int[] values, int start, int end, int[] prefix,
            CyclicBarrier barrier) {
        this.values = values;
        this.start = start;
        this.end = end;
        pref = prefix;
        this.barrier = barrier;
    }

    /**
     * Sums {@code values[start..end-1]} and stores the result for {@link #getSum()}.
     *
     * @return the sum of the chunk
     * @throws InterruptedException declared for interface compatibility; not thrown here
     */
    public int computeSum() throws InterruptedException {
        int s = 0;
        for (int i = start; i < end; i++) {
            s = s + values[i];
        }
        sum = s;
        return sum;
    }

    /** @return the chunk sum stored by the last {@link #computeSum()} call (0 before that) */
    public int getSum() {
        return sum;
    }

    /**
     * Writes the inclusive prefix sums of the chunk into the result array.
     *
     * @param parent offset added to every prefix value of this chunk, i.e. the sum of all
     *               elements that precede {@code start}
     * @throws InterruptedException declared for interface compatibility; not thrown here
     */
    public void prefixSum(int parent) throws InterruptedException {
        // Start from the argument rather than the max field, so the parameter is honoured.
        int pre = parent;
        for (int i = start; i < end; i++) {
            pre = pre + values[i];
            pref[i] = pre;
        }
    }

    /**
     * Sums the chunk, waits at the barrier twice, then writes the chunk's prefix sums.
     * The offset is expected to be set through {@link #setMax(int)} between the two waits.
     */
    @Override
    public void run() {
        try {
            computeSum();
            // First rendezvous: this chunk's sum is now available through getSum().
            barrier.await();
            // Second rendezvous: the offset for this chunk has been distributed.
            barrier.await();
            prefixSum(max);
        } catch (InterruptedException e) {
            // Restore the interrupt status that await() cleared when it threw.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sets the offset that {@link #run()} passes to {@link #prefixSum(int)}.
     *
     * @param max sum of all elements that precede this worker's chunk
     */
    public void setMax(int max) {
        this.max = max;
    }
}