Concurrency: New Library Components (1)

Java SE5中加入的java.util.concurrent包含了很多用于解决并发问题的新类。

1. CountDownLatch

  创建CountDownLatch时需要指定一个初始的计数,在任务中调用CountDownLatch对象的await()会阻塞任务,直到CountDownLatch的计数为零。其他任务可以(在自己的工作完成后)调用CountDownLatch对象的countDown()来缩减计数。由此可以对一个或多个任务进行同步,使它们能够等待其他任务完成某项工作后,再继续执行。CountDownLatch是一次性的,不能重置计数。

  CountDownLatch的一个典型用法是把一个问题分成n个可以单独解决的子任务,初始化CountDownLatch的计数为n,子任务完成后调用countDown(),需要在当前问题解决后再执行的任务通过await()等待所有子任务完成后再执行。

import java.util.concurrent.*;
import java.util.*;
// Performs some portion of a task:
class TaskPortion implements Runnable {
private static int counter = 0;
private final int id = counter++;
private static Random rand = new Random(47);
private final CountDownLatch latch;
TaskPortion(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
doWork();
latch.countDown();
} catch(InterruptedException ex) {
// Acceptable way to exit
}
}
public void doWork() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));
System.out.println(this + "completed");
}
public String toString() {
return String.format("%1$-3d ", id);
}
}
// Waits on the CountDownLatch:
class WaitingTask implements Runnable {
private static int counter = 0;
private final int id = counter++;
private final CountDownLatch latch;
WaitingTask(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
latch.await();
System.out.println("Latch barrier passed for " + this);
} catch(InterruptedException ex) {
System.out.println(this + " interrupted");
}
}
public String toString() {
return String.format("WaitingTask %1$-3d ", id);
}
}
public class CountDownLatchDemo {
static final int SIZE = 100;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
// All must share a single CountDownLatch object:
CountDownLatch latch = new CountDownLatch(SIZE);
for(int i = 0; i < 10; i++)
exec.execute(new WaitingTask(latch));
for(int i = 0; i < SIZE; i++)
exec.execute(new TaskPortion(latch));
System.out.println("Launched all tasks");
exec.shutdown(); // Quit when all tasks complete
}
}
import java.util.concurrent.*; import java.util.*; // Performs some portion of a task: class TaskPortion implements Runnable { private static int counter = 0; private final int id = counter++; private static Random rand = new Random(47); private final CountDownLatch latch; TaskPortion(CountDownLatch latch) { this.latch = latch; } public void run() { try { doWork(); latch.countDown(); } catch(InterruptedException ex) { // Acceptable way to exit } } public void doWork() throws InterruptedException { TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000)); System.out.println(this + "completed"); } public String toString() { return String.format("%1$-3d ", id); } } // Waits on the CountDownLatch: class WaitingTask implements Runnable { private static int counter = 0; private final int id = counter++; private final CountDownLatch latch; WaitingTask(CountDownLatch latch) { this.latch = latch; } public void run() { try { latch.await(); System.out.println("Latch barrier passed for " + this); } catch(InterruptedException ex) { System.out.println(this + " interrupted"); } } public String toString() { return String.format("WaitingTask %1$-3d ", id); } } public class CountDownLatchDemo { static final int SIZE = 100; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); // All must share a single CountDownLatch object: CountDownLatch latch = new CountDownLatch(SIZE); for(int i = 0; i < 10; i++) exec.execute(new WaitingTask(latch)); for(int i = 0; i < SIZE; i++) exec.execute(new TaskPortion(latch)); System.out.println("Launched all tasks"); exec.shutdown(); // Quit when all tasks complete } }
import java.util.concurrent.*;
import java.util.*;

// Performs some portion of a task:
/**
 * One slice of a larger job. Each instance sleeps for a pseudo-random time to
 * simulate work and then counts down the latch it was constructed with.
 *
 * Ids come from an unsynchronized static counter, so instances are expected to
 * be created from a single thread (as the demo's main() does).
 */
class TaskPortion implements Runnable {
    private static int counter = 0;
    // One generator shared by all portions; Random itself is thread-safe.
    private static final Random rand = new Random(47);
    private final int id = counter++;
    private final CountDownLatch latch;

    /**
     * @param latch the latch to count down once this portion's work is done
     */
    TaskPortion(CountDownLatch latch) {
        this.latch = latch;
    }

    /**
     * Does the work and then counts the latch down by one. If the work is
     * interrupted the latch is NOT counted down, and the thread's interrupt
     * status is restored so the code that owns the thread can still see it.
     */
    @Override
    public void run() {
        try {
            doWork();
            latch.countDown();
        } catch (InterruptedException ex) {
            // Acceptable way to exit, but do not hide the interruption:
            // sleep() cleared the flag when it threw.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Simulates work by sleeping for less than 2000 ms, then reports completion.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public void doWork() throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));
        System.out.println(this + "completed");
    }

    /** Returns the id left-justified in three columns followed by a space. */
    @Override
    public String toString() {
        return String.format("%1$-3d ", id);
    }
}
// Waits on the CountDownLatch:
/**
 * A task that blocks on a CountDownLatch and prints a message once the latch
 * lets it through.
 *
 * Ids come from an unsynchronized static counter, so instances are expected to
 * be created from a single thread.
 */
class WaitingTask implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private final CountDownLatch latch;

    /**
     * @param latch the latch this task waits on
     */
    WaitingTask(CountDownLatch latch) {
        this.latch = latch;
    }

    /**
     * Waits for the latch, then prints a "barrier passed" line. If the wait is
     * interrupted, prints an "interrupted" line instead and restores the
     * thread's interrupt status.
     */
    @Override
    public void run() {
        try {
            latch.await();
            System.out.println("Latch barrier passed for " + this);
        } catch (InterruptedException ex) {
            System.out.println(this + " interrupted");
            // await() cleared the flag when it threw; put it back for the
            // code that owns this thread.
            Thread.currentThread().interrupt();
        }
    }

    /** Returns "WaitingTask " plus the id left-justified in three columns and a space. */
    @Override
    public String toString() {
        return String.format("WaitingTask %1$-3d ", id);
    }
}
/**
 * Driver for the latch example: submits ten WaitingTask instances and SIZE
 * TaskPortion instances to a cached thread pool, all sharing one
 * CountDownLatch whose initial count is SIZE.
 */
public class CountDownLatchDemo {
    static final int SIZE = 100;

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        // Waiters and workers must all be handed this one latch instance.
        CountDownLatch sharedLatch = new CountDownLatch(SIZE);

        for (int waiters = 10; waiters > 0; waiters--) {
            pool.execute(new WaitingTask(sharedLatch));
        }
        for (int portions = SIZE; portions > 0; portions--) {
            pool.execute(new TaskPortion(sharedLatch));
        }

        System.out.println("Launched all tasks");
        // No new submissions; tasks already submitted still run to completion.
        pool.shutdown();
    }
}

这里把当前需要解决的问题分成SIZE份,由SIZE个TaskPortion执行,当所有TaskPortion执行完后,等待中的WaitingTask再开始执行。所有TaskPortion和WaitingTask都使用同一个CountDownLatch对象latch。

Library thread safety

  注意在上面的例子中,所有TaskPortion共享了一个static Random rand,由于Random.nextInt()是线程安全的,所以这样的用法没有问题。需要注意类似场合下所使用的共享资源是否是线程安全的,如果不是,可以把static去掉,使每个任务都有自己的实例。

2. CyclicBarrier

  CyclicBarrier可以让任务在完成自己工作后,等待其他任务完成,当所有任务都完成后,再唤醒所有等待中的任务继续运行。CyclicBarrier类似于CountDownLatch,二者的一个区别在于,CountDownLatch是一次性的,而CyclicBarrier可以反复使用。

import java.util.concurrent.*;
import java.util.*;
class Horse implements Runnable {
private static int counter = 0;
private final int id = counter++;
private int strides = 0;
private static Random rand = new Random(47);
private static CyclicBarrier barrier;
public Horse(CyclicBarrier b) { barrier = b; }
public synchronized int getStrides() { return strides; }
public void run() {
try {
while(!Thread.interrupted()) {
synchronized(this) {
strides += rand.nextInt(3); // Produces 0, 1 or 2
}
barrier.await();
}
} catch(InterruptedException e) {
// A legitimate way to exit
} catch(BrokenBarrierException e) {
// This one we want to know about
throw new RuntimeException(e);
}
}
public String toString() { return "Horse " + id + " "; }
public String tracks() {
StringBuilder s = new StringBuilder();
for(int i = 0; i < getStrides(); i++)
s.append("*");
s.append(id);
return s.toString();
}
}
public class HorseRace {
static final int FINISH_LINE = 75;
private List<Horse> horses = new ArrayList<Horse>();
private ExecutorService exec =
Executors.newCachedThreadPool();
private CyclicBarrier barrier;
public HorseRace(int nHorses, final int pause) {
barrier = new CyclicBarrier(nHorses, new Runnable() {
public void run() {
StringBuilder s = new StringBuilder();
for(int i = 0; i < FINISH_LINE; i++)
s.append("="); // The fence on the racetrack
System.out.println(s);
for(Horse horse : horses)
System.out.println(horse.tracks());
for(Horse horse : horses)
if(horse.getStrides() >= FINISH_LINE) {
System.out.println(horse + "won!");
exec.shutdownNow();
return;
}
try {
TimeUnit.MILLISECONDS.sleep(pause);
} catch(InterruptedException e) {
System.out.println("barrier-action sleep interrupted");
}
}
});
for(int i = 0; i < nHorses; i++) {
Horse horse = new Horse(barrier);
horses.add(horse);
exec.execute(horse);
}
}
public static void main(String[] args) {
int nHorses = 7;
int pause = 200;
if(args.length > 0) { // Optional argument
int n = new Integer(args[0]);
nHorses = n > 0 ? n : nHorses;
}
if(args.length > 1) { // Optional argument
int p = new Integer(args[1]);
pause = p > -1 ? p : pause;
}
new HorseRace(nHorses, pause);
}
}
import java.util.concurrent.*; import java.util.*; class Horse implements Runnable { private static int counter = 0; private final int id = counter++; private int strides = 0; private static Random rand = new Random(47); private static CyclicBarrier barrier; public Horse(CyclicBarrier b) { barrier = b; } public synchronized int getStrides() { return strides; } public void run() { try { while(!Thread.interrupted()) { synchronized(this) { strides += rand.nextInt(3); // Produces 0, 1 or 2 } barrier.await(); } } catch(InterruptedException e) { // A legitimate way to exit } catch(BrokenBarrierException e) { // This one we want to know about throw new RuntimeException(e); } } public String toString() { return "Horse " + id + " "; } public String tracks() { StringBuilder s = new StringBuilder(); for(int i = 0; i < getStrides(); i++) s.append("*"); s.append(id); return s.toString(); } } public class HorseRace { static final int FINISH_LINE = 75; private List<Horse> horses = new ArrayList<Horse>(); private ExecutorService exec = Executors.newCachedThreadPool(); private CyclicBarrier barrier; public HorseRace(int nHorses, final int pause) { barrier = new CyclicBarrier(nHorses, new Runnable() { public void run() { StringBuilder s = new StringBuilder(); for(int i = 0; i < FINISH_LINE; i++) s.append("="); // The fence on the racetrack System.out.println(s); for(Horse horse : horses) System.out.println(horse.tracks()); for(Horse horse : horses) if(horse.getStrides() >= FINISH_LINE) { System.out.println(horse + "won!"); exec.shutdownNow(); return; } try { TimeUnit.MILLISECONDS.sleep(pause); } catch(InterruptedException e) { System.out.println("barrier-action sleep interrupted"); } } }); for(int i = 0; i < nHorses; i++) { Horse horse = new Horse(barrier); horses.add(horse); exec.execute(horse); } } public static void main(String[] args) { int nHorses = 7; int pause = 200; if(args.length > 0) { // Optional argument int n = new Integer(args[0]); nHorses = n > 0 ? 
n : nHorses; } if(args.length > 1) { // Optional argument int p = new Integer(args[1]); pause = p > -1 ? p : pause; } new HorseRace(nHorses, pause); } }
import java.util.concurrent.*;
import java.util.*;

class Horse implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private int strides = 0;
    private static Random rand = new Random(47);
    private static CyclicBarrier barrier;
    public Horse(CyclicBarrier b) { barrier = b; }
    public synchronized int getStrides() { return strides; }
    public void run() {
        try {
            while(!Thread.interrupted()) {
                synchronized(this) {
                    strides += rand.nextInt(3); // Produces 0, 1 or 2
                }
                barrier.await();
            }
        } catch(InterruptedException e) {
            // A legitimate way to exit
        } catch(BrokenBarrierException e) {
            // This one we want to know about
            throw new RuntimeException(e);
        }
    }
    public String toString() { return "Horse " + id + " "; }
    public String tracks() {
        StringBuilder s = new StringBuilder();
        for(int i = 0; i < getStrides(); i++)
            s.append("*");
        s.append(id);
        return s.toString();
    }
}
/**
 * Runs a horse race: creates the horses, starts each one on a cached thread
 * pool, and uses a CyclicBarrier action to print the track and look for a
 * winner each time every horse has reached the barrier.
 */
public class HorseRace {
    static final int FINISH_LINE = 75;
    private final List<Horse> horses = new ArrayList<Horse>();
    private final ExecutorService exec =
            Executors.newCachedThreadPool();
    private final CyclicBarrier barrier;

    /**
     * Creates the barrier and the horses, and starts the race immediately.
     *
     * @param nHorses number of horses, which is also the barrier's party count
     * @param pause   milliseconds the barrier action sleeps after a round that
     *                produced no winner
     */
    public HorseRace(int nHorses, final int pause) {
        barrier = new CyclicBarrier(nHorses, new Runnable() {
            public void run() {
                StringBuilder s = new StringBuilder();
                for (int i = 0; i < FINISH_LINE; i++) {
                    s.append("="); // The fence on the racetrack
                }
                System.out.println(s);
                for (Horse horse : horses) {
                    System.out.println(horse.tracks());
                }
                for (Horse horse : horses) {
                    if (horse.getStrides() >= FINISH_LINE) {
                        System.out.println(horse + "won!");
                        // Interrupts every horse, which ends the race.
                        exec.shutdownNow();
                        return;
                    }
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(pause);
                } catch (InterruptedException e) {
                    System.out.println("barrier-action sleep interrupted");
                }
            }
        });
        for (int i = 0; i < nHorses; i++) {
            Horse horse = new Horse(barrier);
            horses.add(horse);
            exec.execute(horse);
        }
    }

    /**
     * Optional arguments: args[0] is the number of horses (used only if
     * positive, default 7); args[1] is the pause in milliseconds (used only if
     * non-negative, default 200).
     *
     * @throws NumberFormatException if a supplied argument is not an integer
     */
    public static void main(String[] args) {
        int nHorses = 7;
        int pause = 200;
        if (args.length > 0) { // Optional argument
            // parseInt replaces the deprecated Integer(String) constructor.
            int n = Integer.parseInt(args[0]);
            nHorses = n > 0 ? n : nHorses;
        }
        if (args.length > 1) { // Optional argument
            int p = Integer.parseInt(args[1]);
            pause = p > -1 ? p : pause;
        }
        new HorseRace(nHorses, pause);
    }
}

上面是一个赛马的模拟程序,Horse每次随机地前进0~2步,若干个Horse同时运行,先达到终点(指定步数)的获胜。CyclicBarrier用于协调各个Horse的运行:Horse前进(strides += rand.nextInt(3))之后,调用barrier.await()使任务进入等待,当所有Horse都前进过后,再唤醒所有任务,开始下一轮循环。CyclicBarrier在HorseRace()中通过barrier = new CyclicBarrier(nHorses, new Runnable(){...})创建,第一个参数nHorses指定了初始计数(参与等待的任务数),第二个参数指定了一个Runnable,该Runnable会在每一轮所有任务都到达屏障(计数减为零)时运行,用于打印赛况并判断是否出现赢家。

3. DelayQueue

  DelayQueue是一个容纳实现了Delayed接口的元素的阻塞队列,队列中的元素只能在其延时到期后取出,DelayQueue按照延时到期时间对其中的元素进行排序,队首的元素的延时最先到期。如果没有延时到期的元素,poll()会返回null,而take()会一直阻塞到有元素到期为止(因此不能把null放到DelayQueue中)。

  Delayed接口只有一个getDelay()方法,返回距离延时到期还剩余的时间,调用者通过TimeUnit参数指示自己想要的单位。Delayed接口继承了Comparable,所以还需要实现compareTo()方法,用于按照延时对任务进行排序。

import java.util.concurrent.*;
import java.util.*;
import static java.util.concurrent.TimeUnit.*;
class DelayedTask implements Runnable, Delayed {
private static int counter = 0;
private final int id = counter++;
private final int delta;
private final long trigger;
protected static List<DelayedTask> sequence =
new ArrayList<DelayedTask>();
public DelayedTask(int delayInMilliseconds) {
delta = delayInMilliseconds;
trigger = System.nanoTime() + NANOSECONDS.convert(delta, MILLISECONDS);
sequence.add(this);
}
public long getDelay(TimeUnit unit) {
return unit.convert(trigger - System.nanoTime(), NANOSECONDS);
}
public int compareTo(Delayed arg) {
DelayedTask that = (DelayedTask)arg;
if(trigger < that.trigger) return -1;
if(trigger > that.trigger) return 1;
return 0;
}
public void run() { System.out.print(this + " "); }
public String toString() {
return String.format("[%1$-4d]", delta) + " Task " + id;
}
public String summary() {
return "(" + id + ":" + delta + ")";
}
public static class EndSentinel extends DelayedTask {
private ExecutorService exec;
public EndSentinel(int delay, ExecutorService e) {
super(delay);
exec = e;
}
public void run() {
for(DelayedTask pt : sequence) {
System.out.print(pt.summary() + " ");
}
System.out.println();
System.out.println(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
class DelayedTaskConsumer implements Runnable {
private DelayQueue<DelayedTask> q;
public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
this.q = q;
}
public void run() {
try {
while(!Thread.interrupted())
q.take().run(); // Run task with the current thread
} catch(InterruptedException e) {
// Acceptable way to exit
}
System.out.println("Finished DelayedTaskConsumer");
}
}
public class DelayQueueDemo {
public static void main(String[] args) {
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
// Fill with tasks that have random delays:
for(int i = 0; i < 20; i++)
queue.put(new DelayedTask(rand.nextInt(5000)));
// Set the stopping point
queue.add(new DelayedTask.EndSentinel(5000, exec));
exec.execute(new DelayedTaskConsumer(queue));
}
}
/* Output
[128 ] Task 11 [200 ] Task 7 [429 ] Task 5 [520 ] Task 18 [555 ] Task 1 [961 ] Task 4
[998 ] Task 16 [1207] Task 9 [1693] Task 2 [1809] Task 14 [1861] Task 3 [2278] Task 15
[3288] Task 10 [3551] Task 12 [4258] Task 0 [4258] Task 19 [4522] Task 8 [4589] Task 13
[4861] Task 17 [4868] Task 6 (0:4258) (1:555) (2:1693) (3:1861) (4:961) (5:429) (6:4868)
(7:200) (8:4522) (9:1207) (10:3288) (11:128) (12:3551) (13:4589) (14:1809) (15:2278)
(16:998) (17:4861) (18:520) (19:4258) (20:5000)
[5000] Task 20 Calling shutdownNow()
Finished DelayedTaskConsumer
*/
import java.util.concurrent.*; import java.util.*; import static java.util.concurrent.TimeUnit.*; class DelayedTask implements Runnable, Delayed { private static int counter = 0; private final int id = counter++; private final int delta; private final long trigger; protected static List<DelayedTask> sequence = new ArrayList<DelayedTask>(); public DelayedTask(int delayInMilliseconds) { delta = delayInMilliseconds; trigger = System.nanoTime() + NANOSECONDS.convert(delta, MILLISECONDS); sequence.add(this); } public long getDelay(TimeUnit unit) { return unit.convert(trigger - System.nanoTime(), NANOSECONDS); } public int compareTo(Delayed arg) { DelayedTask that = (DelayedTask)arg; if(trigger < that.trigger) return -1; if(trigger > that.trigger) return 1; return 0; } public void run() { System.out.print(this + " "); } public String toString() { return String.format("[%1$-4d]", delta) + " Task " + id; } public String summary() { return "(" + id + ":" + delta + ")"; } public static class EndSentinel extends DelayedTask { private ExecutorService exec; public EndSentinel(int delay, ExecutorService e) { super(delay); exec = e; } public void run() { for(DelayedTask pt : sequence) { System.out.print(pt.summary() + " "); } System.out.println(); System.out.println(this + " Calling shutdownNow()"); exec.shutdownNow(); } } } class DelayedTaskConsumer implements Runnable { private DelayQueue<DelayedTask> q; public DelayedTaskConsumer(DelayQueue<DelayedTask> q) { this.q = q; } public void run() { try { while(!Thread.interrupted()) q.take().run(); // Run task with the current thread } catch(InterruptedException e) { // Acceptable way to exit } System.out.println("Finished DelayedTaskConsumer"); } } public class DelayQueueDemo { public static void main(String[] args) { Random rand = new Random(47); ExecutorService exec = Executors.newCachedThreadPool(); DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>(); // Fill with tasks that have random delays: for(int i = 0; i < 20; 
i++) queue.put(new DelayedTask(rand.nextInt(5000))); // Set the stopping point queue.add(new DelayedTask.EndSentinel(5000, exec)); exec.execute(new DelayedTaskConsumer(queue)); } } /* Output [128 ] Task 11 [200 ] Task 7 [429 ] Task 5 [520 ] Task 18 [555 ] Task 1 [961 ] Task 4 [998 ] Task 16 [1207] Task 9 [1693] Task 2 [1809] Task 14 [1861] Task 3 [2278] Task 15 [3288] Task 10 [3551] Task 12 [4258] Task 0 [4258] Task 19 [4522] Task 8 [4589] Task 13 [4861] Task 17 [4868] Task 6 (0:4258) (1:555) (2:1693) (3:1861) (4:961) (5:429) (6:4868) (7:200) (8:4522) (9:1207) (10:3288) (11:128) (12:3551) (13:4589) (14:1809) (15:2278) (16:998) (17:4861) (18:520) (19:4258) (20:5000) [5000] Task 20 Calling shutdownNow() Finished DelayedTaskConsumer */
import java.util.concurrent.*;
import java.util.*;
import static java.util.concurrent.TimeUnit.*;

/**
 * A task that becomes available from a DelayQueue once a fixed delay, measured
 * from its construction, has elapsed. Every instance is also recorded in the
 * static creation-order list {@code sequence}.
 *
 * Ids and {@code sequence} are updated without synchronization, so instances
 * are expected to be created from a single thread.
 */
class DelayedTask implements Runnable, Delayed {
    private static int counter = 0;
    private final int id = counter++;
    private final int delta;
    private final long trigger;
    protected static List<DelayedTask> sequence =
            new ArrayList<DelayedTask>();

    /**
     * @param delayInMilliseconds delay, counted from now, after which the task
     *                            is due
     */
    public DelayedTask(int delayInMilliseconds) {
        delta = delayInMilliseconds;
        trigger = System.nanoTime() + NANOSECONDS.convert(delta, MILLISECONDS);
        sequence.add(this);
    }

    /**
     * Returns the time remaining until the trigger point, converted to
     * {@code unit}; zero or negative once the task is due.
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(trigger - System.nanoTime(), NANOSECONDS);
    }

    /**
     * Orders tasks by when they become due, earliest first. Another
     * DelayedTask is compared by trigger time; any other Delayed is compared
     * by remaining delay instead of failing with a ClassCastException.
     */
    @Override
    public int compareTo(Delayed arg) {
        if (arg == this) {
            return 0;
        }
        // nanoTime-based values are compared through their difference, as the
        // System.nanoTime documentation recommends, not with < or > directly.
        long diff;
        if (arg instanceof DelayedTask) {
            diff = trigger - ((DelayedTask) arg).trigger;
        } else {
            diff = getDelay(NANOSECONDS) - arg.getDelay(NANOSECONDS);
        }
        return diff < 0 ? -1 : (diff > 0 ? 1 : 0);
    }

    /** Prints this task followed by a space, without a newline. */
    @Override
    public void run() { System.out.print(this + " "); }

    /** Returns "[delay] Task id" with the delay left-justified in four columns. */
    @Override
    public String toString() {
        return String.format("[%1$-4d]", delta) + " Task " + id;
    }

    /** Returns the compact form "(id:delay)". */
    public String summary() {
        return "(" + id + ":" + delta + ")";
    }

    /**
     * Final task of the demo: prints a summary of every task in creation
     * order and then shuts the executor down.
     */
    public static class EndSentinel extends DelayedTask {
        private final ExecutorService exec;

        /**
         * @param delay delay in milliseconds before the sentinel is due
         * @param e     executor to shut down when the sentinel runs
         */
        public EndSentinel(int delay, ExecutorService e) {
            super(delay);
            exec = e;
        }

        @Override
        public void run() {
            for (DelayedTask pt : sequence) {
                System.out.print(pt.summary() + " ");
            }
            System.out.println();
            System.out.println(this + " Calling shutdownNow()");
            exec.shutdownNow();
        }
    }
}
/**
 * Drains a DelayQueue of DelayedTask objects, running each task on the
 * consumer's own thread as soon as it can be taken from the queue.
 */
class DelayedTaskConsumer implements Runnable {
    private DelayQueue<DelayedTask> q;

    /**
     * @param q the queue to take tasks from
     */
    public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
        this.q = q;
    }

    /**
     * Takes and runs tasks until the thread is interrupted, then prints a
     * closing message.
     */
    @Override
    public void run() {
        try {
            for (;;) {
                if (Thread.interrupted()) {
                    break;
                }
                DelayedTask due = q.take();
                due.run(); // Executed directly on this thread
            }
        } catch (InterruptedException e) {
            // Interrupted while blocked in take(): an accepted way to stop
        }
        System.out.println("Finished DelayedTaskConsumer");
    }
}
/**
 * Driver for the DelayQueue example: queues twenty tasks with pseudo-random
 * delays below 5000 ms plus an EndSentinel at 5000 ms, then starts a single
 * consumer on a cached thread pool.
 */
public class DelayQueueDemo {
    public static void main(String[] args) {
        Random rand = new Random(47);
        ExecutorService exec = Executors.newCachedThreadPool();
        DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();

        // Twenty ordinary tasks, each with its own random delay
        int remaining = 20;
        while (remaining-- > 0) {
            int delayMs = rand.nextInt(5000);
            queue.put(new DelayedTask(delayMs));
        }

        // The sentinel marks the stopping point
        DelayedTask stopper = new DelayedTask.EndSentinel(5000, exec);
        queue.add(stopper);

        Runnable consumer = new DelayedTaskConsumer(queue);
        exec.execute(consumer);
    }
}
/* Output
[128 ] Task 11 [200 ] Task 7 [429 ] Task 5 [520 ] Task 18 [555 ] Task 1 [961 ] Task 4 
[998 ] Task 16 [1207] Task 9 [1693] Task 2 [1809] Task 14 [1861] Task 3 [2278] Task 15 
[3288] Task 10 [3551] Task 12 [4258] Task 0 [4258] Task 19 [4522] Task 8 [4589] Task 13 
[4861] Task 17 [4868] Task 6 (0:4258) (1:555) (2:1693) (3:1861) (4:961) (5:429) (6:4868) 
(7:200) (8:4522) (9:1207) (10:3288) (11:128) (12:3551) (13:4589) (14:1809) (15:2278) 
(16:998) (17:4861) (18:520) (19:4258) (20:5000) 
[5000] Task 20 Calling shutdownNow()
Finished DelayedTaskConsumer
*/

这里为20个任务随机配置了延时,由打印可见,延时先到期的任务会先被执行。EndSentinel具有最大延时,位于队列末尾,用于结束所有任务。

4. PriorityBlockingQueue

  PriorityBlockingQueue顾名思义,就是一个带阻塞功能的优先队列。

import java.util.concurrent.*;
import java.util.*;
class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
private Random rand = new Random(47);
private static int counter = 0;
private final int id = counter++;
private final int priority;
protected static List<PrioritizedTask> sequence = new ArrayList<PrioritizedTask>();
public PrioritizedTask(int priority) {
this.priority = priority;
sequence.add(this);
}
public int compareTo(PrioritizedTask arg) {
return priority < arg.priority ? 1 :
(priority > arg.priority ? -1 : 0);
}
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(250));
} catch(InterruptedException e) {
// Acceptable way to exit
}
System.out.println(this);
}
public String toString() {
return String.format("[%1$-3d]", priority) +
" Task " + id;
}
public String summary() {
return "(" + id + ":" + priority + ")";
}
public static class EndSentinel extends PrioritizedTask {
private ExecutorService exec;
public EndSentinel(ExecutorService e) {
super(-1); // Lowest priority in this program
exec = e;
}
public void run() {
int count = 0;
for(PrioritizedTask pt : sequence) {
System.out.print(pt.summary());
if(++count % 5 == 0)
System.out.println();
}
System.out.println();
System.out.println(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
class PrioritizedTaskProducer implements Runnable {
private Random rand = new Random(47);
private Queue<Runnable> queue;
private ExecutorService exec;
public PrioritizedTaskProducer(Queue<Runnable> q, ExecutorService e) {
queue = q;
exec = e; // Used for EndSentinel
}
public void run() {
// Unbounded queue; never blocks.
// Fill it up fast with random priorities:
for(int i = 0; i < 20; i++) {
queue.add(new PrioritizedTask(rand.nextInt(10)));
Thread.yield();
}
// Trickle in highest-priority jobs:
try {
for(int i = 0; i < 10; i++) {
TimeUnit.MILLISECONDS.sleep(250);
queue.add(new PrioritizedTask(10));
}
// Add jobs, lowest priority first:
for(int i = 0; i < 10; i++)
queue.add(new PrioritizedTask(i));
// A sentinel to stop all the tasks:
queue.add(new PrioritizedTask.EndSentinel(exec));
} catch(InterruptedException e) {
// Acceptable way to exit
}
System.out.println("Finished PrioritizedTaskProducer");
}
}
class PrioritizedTaskConsumer implements Runnable {
private PriorityBlockingQueue<Runnable> q;
public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> q) {
this.q = q;
}
public void run() {
try {
while(!Thread.interrupted())
// Use current thread to run the task:
q.take().run();
} catch(InterruptedException e) {
// Acceptable way to exit
}
System.out.println("Finished PrioritizedTaskConsumer");
}
}
public class PriorityBlockingQueueDemo {
public static void main(String[] args) throws Exception {
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>();
exec.execute(new PrioritizedTaskProducer(queue, exec));
exec.execute(new PrioritizedTaskConsumer(queue));
}
}
import java.util.concurrent.*; import java.util.*; class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> { private Random rand = new Random(47); private static int counter = 0; private final int id = counter++; private final int priority; protected static List<PrioritizedTask> sequence = new ArrayList<PrioritizedTask>(); public PrioritizedTask(int priority) { this.priority = priority; sequence.add(this); } public int compareTo(PrioritizedTask arg) { return priority < arg.priority ? 1 : (priority > arg.priority ? -1 : 0); } public void run() { try { TimeUnit.MILLISECONDS.sleep(rand.nextInt(250)); } catch(InterruptedException e) { // Acceptable way to exit } System.out.println(this); } public String toString() { return String.format("[%1$-3d]", priority) + " Task " + id; } public String summary() { return "(" + id + ":" + priority + ")"; } public static class EndSentinel extends PrioritizedTask { private ExecutorService exec; public EndSentinel(ExecutorService e) { super(-1); // Lowest priority in this program exec = e; } public void run() { int count = 0; for(PrioritizedTask pt : sequence) { System.out.print(pt.summary()); if(++count % 5 == 0) System.out.println(); } System.out.println(); System.out.println(this + " Calling shutdownNow()"); exec.shutdownNow(); } } } class PrioritizedTaskProducer implements Runnable { private Random rand = new Random(47); private Queue<Runnable> queue; private ExecutorService exec; public PrioritizedTaskProducer(Queue<Runnable> q, ExecutorService e) { queue = q; exec = e; // Used for EndSentinel } public void run() { // Unbounded queue; never blocks. 
// Fill it up fast with random priorities: for(int i = 0; i < 20; i++) { queue.add(new PrioritizedTask(rand.nextInt(10))); Thread.yield(); } // Trickle in highest-priority jobs: try { for(int i = 0; i < 10; i++) { TimeUnit.MILLISECONDS.sleep(250); queue.add(new PrioritizedTask(10)); } // Add jobs, lowest priority first: for(int i = 0; i < 10; i++) queue.add(new PrioritizedTask(i)); // A sentinel to stop all the tasks: queue.add(new PrioritizedTask.EndSentinel(exec)); } catch(InterruptedException e) { // Acceptable way to exit } System.out.println("Finished PrioritizedTaskProducer"); } } class PrioritizedTaskConsumer implements Runnable { private PriorityBlockingQueue<Runnable> q; public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> q) { this.q = q; } public void run() { try { while(!Thread.interrupted()) // Use current thread to run the task: q.take().run(); } catch(InterruptedException e) { // Acceptable way to exit } System.out.println("Finished PrioritizedTaskConsumer"); } } public class PriorityBlockingQueueDemo { public static void main(String[] args) throws Exception { Random rand = new Random(47); ExecutorService exec = Executors.newCachedThreadPool(); PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>(); exec.execute(new PrioritizedTaskProducer(queue, exec)); exec.execute(new PrioritizedTaskConsumer(queue)); } }
import java.util.concurrent.*;
import java.util.*;

/**
 * A runnable with an integer priority. Its natural ordering puts larger
 * priority values first, so the head of a PriorityBlockingQueue holds the
 * most urgent task. Every instance is also recorded in the static
 * creation-order list {@code sequence}.
 */
class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
    // Shared by all tasks. A per-instance "new Random(47)" would hand every
    // task the same first value, and therefore the same sleep time.
    private static final Random rand = new Random(47);
    private static int counter = 0;
    private final int id = counter++;
    private final int priority;
    protected static List<PrioritizedTask> sequence = new ArrayList<PrioritizedTask>();

    /**
     * @param priority larger values are taken from a priority queue first
     */
    public PrioritizedTask(int priority) {
        this.priority = priority;
        sequence.add(this);
    }

    /**
     * Reverse numeric order: negative when this task has the larger priority
     * value, positive when it has the smaller one, zero when they are equal.
     */
    @Override
    public int compareTo(PrioritizedTask arg) {
        if (priority > arg.priority) {
            return -1;
        }
        if (priority < arg.priority) {
            return 1;
        }
        return 0;
    }

    /**
     * Sleeps for less than 250 ms and then prints this task. If the sleep is
     * interrupted the task is still printed, and the thread's interrupt
     * status is restored so the calling loop can see it.
     */
    @Override
    public void run() {
        try {
            TimeUnit.MILLISECONDS.sleep(rand.nextInt(250));
        } catch (InterruptedException e) {
            // Acceptable way to exit, but restore the flag sleep() cleared.
            Thread.currentThread().interrupt();
        }
        System.out.println(this);
    }

    /** Returns "[priority] Task id" with the priority left-justified in three columns. */
    @Override
    public String toString() {
        return String.format("[%1$-3d]", priority) +
                " Task " + id;
    }

    /** Returns the compact form "(id:priority)". */
    public String summary() {
        return "(" + id + ":" + priority + ")";
    }

    /**
     * Lowest-priority task of the demo: prints a summary of every task in
     * creation order, five per line, and then shuts the executor down.
     */
    public static class EndSentinel extends PrioritizedTask {
        private final ExecutorService exec;

        /**
         * @param e executor to shut down when the sentinel runs
         */
        public EndSentinel(ExecutorService e) {
            super(-1); // Lowest priority in this program
            exec = e;
        }

        @Override
        public void run() {
            int count = 0;
            for (PrioritizedTask pt : sequence) {
                System.out.print(pt.summary());
                if (++count % 5 == 0) {
                    System.out.println();
                }
            }
            System.out.println();
            System.out.println(this + " Calling shutdownNow()");
            exec.shutdownNow();
        }
    }
}

/**
 * Feeds a queue with PrioritizedTask objects in three phases: a quick burst
 * of random priorities, a slow trickle of priority-10 tasks, and a final
 * ascending run followed by the EndSentinel.
 */
class PrioritizedTaskProducer implements Runnable {
    private Random rand = new Random(47);
    private Queue<Runnable> queue;
    private ExecutorService exec;

    /**
     * @param q the queue to fill
     * @param e executor handed to the EndSentinel
     */
    public PrioritizedTaskProducer(Queue<Runnable> q, ExecutorService e) {
        queue = q;
        exec = e; // Used for EndSentinel
    }

    @Override
    public void run() {
        // Phase 1: twenty tasks with priorities 0..9, yielding after each add.
        int filled = 0;
        while (filled < 20) {
            int priority = rand.nextInt(10);
            queue.add(new PrioritizedTask(priority));
            Thread.yield();
            filled++;
        }
        try {
            // Phase 2: ten priority-10 tasks, one every 250 ms.
            for (int left = 10; left > 0; left--) {
                TimeUnit.MILLISECONDS.sleep(250);
                queue.add(new PrioritizedTask(10));
            }
            // Phase 3: priorities 0..9 in ascending order.
            int priority = 0;
            do {
                queue.add(new PrioritizedTask(priority));
            } while (++priority < 10);
            // The sentinel that stops all the tasks goes in last.
            queue.add(new PrioritizedTask.EndSentinel(exec));
        } catch (InterruptedException e) {
            // Interrupted during the trickle phase: an accepted way to stop
        }
        System.out.println("Finished PrioritizedTaskProducer");
    }
}

/**
 * Takes runnables from a PriorityBlockingQueue and executes each one on the
 * consumer's own thread until that thread is interrupted.
 */
class PrioritizedTaskConsumer implements Runnable {
    private PriorityBlockingQueue<Runnable> q;

    /**
     * @param q the queue to take tasks from
     */
    public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> q) {
        this.q = q;
    }

    /**
     * Runs queued tasks one at a time, blocking in take() while the queue is
     * empty, then prints a closing message once interrupted.
     */
    @Override
    public void run() {
        try {
            for (;;) {
                if (Thread.interrupted()) {
                    break;
                }
                Runnable next = q.take();
                next.run(); // Executed directly on this thread
            }
        } catch (InterruptedException e) {
            // Interrupted while blocked in take(): an accepted way to stop
        }
        System.out.println("Finished PrioritizedTaskConsumer");
    }
}

/**
 * Driver for the PriorityBlockingQueue example: starts one producer and one
 * consumer on a cached thread pool, both sharing a single queue.
 */
public class PriorityBlockingQueueDemo {
    public static void main(String[] args) throws Exception {
        // The unused local Random was removed; the producer owns its own.
        ExecutorService exec = Executors.newCachedThreadPool();
        PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>();
        // The executor is passed along so the EndSentinel can shut it down.
        exec.execute(new PrioritizedTaskProducer(queue, exec));
        exec.execute(new PrioritizedTaskConsumer(queue));
    }
}

上面例子中的PrioritizedTask实现Comparable接口,提供比较任务优先级的方法。PrioritizedTaskProducer和PrioritizedTaskConsumer使用同一个PriorityBlockingQueue,分别通过add()和take()向队列中添加和获取任务。这里没有使用显式的同步,PriorityBlockingQueue会处理同步的问题。当队列为空时,take()会产生阻塞。