Concurrency: New Library Components (1)
Java SE5中加入的java.util.concurrent
包含了很多用于解决并发问题的新类。
1. CountDownLatch
创建CountDownLatch
时需要指定一个初始的计数,在任务中调用CountDownLatch
对象的await()
会阻塞任务,直到CountDownLatch
的计数为零。其他任务可以(在自己的工作完成后)调用CountDownLatch
对象的countDown()
来缩减计数。由此可以对一个或多个任务进行同步,使它们能够等待其他任务完成某项工作后,再继续执行。CountDownLatch
是一次性的,不能重置计数。
CountDownLatch
的一个典型用法是把一个问题分成n个可以单独解决的子任务,初始化CountDownLatch
的计数为n
,子任务完成后调用countDown()
,需要在当前问题解决后再执行的任务通过await()
等待所有子任务完成后再执行。
import java.util.concurrent.*; import java.util.*; // Performs some portion of a task: class TaskPortion implements Runnable { private static int counter = 0; private final int id = counter++; private static Random rand = new Random(47); private final CountDownLatch latch; TaskPortion(CountDownLatch latch) { this.latch = latch; } public void run() { try { doWork(); latch.countDown(); } catch(InterruptedException ex) { // Acceptable way to exit } } public void doWork() throws InterruptedException { TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000)); System.out.println(this + "completed"); } public String toString() { return String.format("%1$-3d ", id); } } // Waits on the CountDownLatch: class WaitingTask implements Runnable { private static int counter = 0; private final int id = counter++; private final CountDownLatch latch; WaitingTask(CountDownLatch latch) { this.latch = latch; } public void run() { try { latch.await(); System.out.println("Latch barrier passed for " + this); } catch(InterruptedException ex) { System.out.println(this + " interrupted"); } } public String toString() { return String.format("WaitingTask %1$-3d ", id); } } public class CountDownLatchDemo { static final int SIZE = 100; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); // All must share a single CountDownLatch object: CountDownLatch latch = new CountDownLatch(SIZE); for(int i = 0; i < 10; i++) exec.execute(new WaitingTask(latch)); for(int i = 0; i < SIZE; i++) exec.execute(new TaskPortion(latch)); System.out.println("Launched all tasks"); exec.shutdown(); // Quit when all tasks complete } }
这里把当前需要解决的问题分成SIZE
份,由SIZE
个TaskPortion
执行,当所有TaskPortion
执行完后,等待中的WaitingTask
再开始执行。所有TaskPortion
和WaitingTask
都使用同一个CountDownLatch latch
。
Library thread safety
注意在上面的例子中,所有TaskPortion
共享了一个static Random rand
,由于Random.nextInt()
是线程安全的,所以这样的用法没有问题。需要注意类似场合下所使用的共享资源是否是线程安全的,如果不是,可以把static
去掉,使每个任务都有自己的实例。
2. CyclicBarrier
CyclicBarrier
可以让任务在完成自己工作后,等待其他任务完成,当所有任务都完成后,再唤醒所有等待中的任务继续运行。CyclicBarrier
类似于CountDownLatch
,二者的一个区别在于,CountDownLatch
是一次性的,而CyclicBarrier
可以反复使用。
import java.util.concurrent.*; import java.util.*; class Horse implements Runnable { private static int counter = 0; private final int id = counter++; private int strides = 0; private static Random rand = new Random(47); private static CyclicBarrier barrier; public Horse(CyclicBarrier b) { barrier = b; } public synchronized int getStrides() { return strides; } public void run() { try { while(!Thread.interrupted()) { synchronized(this) { strides += rand.nextInt(3); // Produces 0, 1 or 2 } barrier.await(); } } catch(InterruptedException e) { // A legitimate way to exit } catch(BrokenBarrierException e) { // This one we want to know about throw new RuntimeException(e); } } public String toString() { return "Horse " + id + " "; } public String tracks() { StringBuilder s = new StringBuilder(); for(int i = 0; i < getStrides(); i++) s.append("*"); s.append(id); return s.toString(); } } public class HorseRace { static final int FINISH_LINE = 75; private List<Horse> horses = new ArrayList<Horse>(); private ExecutorService exec = Executors.newCachedThreadPool(); private CyclicBarrier barrier; public HorseRace(int nHorses, final int pause) { barrier = new CyclicBarrier(nHorses, new Runnable() { public void run() { StringBuilder s = new StringBuilder(); for(int i = 0; i < FINISH_LINE; i++) s.append("="); // The fence on the racetrack System.out.println(s); for(Horse horse : horses) System.out.println(horse.tracks()); for(Horse horse : horses) if(horse.getStrides() >= FINISH_LINE) { System.out.println(horse + "won!"); exec.shutdownNow(); return; } try { TimeUnit.MILLISECONDS.sleep(pause); } catch(InterruptedException e) { System.out.println("barrier-action sleep interrupted"); } } }); for(int i = 0; i < nHorses; i++) { Horse horse = new Horse(barrier); horses.add(horse); exec.execute(horse); } } public static void main(String[] args) { int nHorses = 7; int pause = 200; if(args.length > 0) { // Optional argument int n = new Integer(args[0]); nHorses = n > 0 ? n : nHorses; } if(args.length > 1) { // Optional argument int p = new Integer(args[1]); pause = p > -1 ? p : pause; } new HorseRace(nHorses, pause); } }
上面是一个赛马的模拟程序,Horse
每次随机地前进0~2步,若干个Horse
同时运行,先达到终点(指定步数)的获胜。CyclicBarrier
用于协调各个Horse
的运行,在Horse
前进过后(strides += rand.nextInt(3)
)后,调用barrier.await()
使任务进入等待,当所有Horse
都前进过后,再唤醒任务,开始下一轮循环。CyclicBarrier
在HorseRace()
中通过barrier = new CyclicBarrier(nHorses, new Runnable(){...})
创建,第一个参数nHorses
指定了初始计数,第二个参数指定了一个Runnable
,该Runnable
会在计数为零后运行,用于打印和判断是否出现赢家。
3. DelayQueue
DelayQueue
是一个容纳实现了Delayed
接口的元素的阻塞队列,队列中的元素只能在其延时到期后取出,DelayQueue
按照延时到期时间对其中的元素进行排序,队首的元素的延时最先到期。如果没有延时到期的元素,只能从队列中取出null
(所以不能把null
放到DelayQueue
中)。
Delayed
接口只有一个getDelay()
方法,指示延时的时间,调用者通过TimeUnit
参数指示自己想要的单位。Delayed
接口继承了Comparable
,所以还需要实现compareTo()
方法,用于按照延时对任务进行排序。
import java.util.concurrent.*; import java.util.*; import static java.util.concurrent.TimeUnit.*; class DelayedTask implements Runnable, Delayed { private static int counter = 0; private final int id = counter++; private final int delta; private final long trigger; protected static List<DelayedTask> sequence = new ArrayList<DelayedTask>(); public DelayedTask(int delayInMilliseconds) { delta = delayInMilliseconds; trigger = System.nanoTime() + NANOSECONDS.convert(delta, MILLISECONDS); sequence.add(this); } public long getDelay(TimeUnit unit) { return unit.convert(trigger - System.nanoTime(), NANOSECONDS); } public int compareTo(Delayed arg) { DelayedTask that = (DelayedTask)arg; if(trigger < that.trigger) return -1; if(trigger > that.trigger) return 1; return 0; } public void run() { System.out.print(this + " "); } public String toString() { return String.format("[%1$-4d]", delta) + " Task " + id; } public String summary() { return "(" + id + ":" + delta + ")"; } public static class EndSentinel extends DelayedTask { private ExecutorService exec; public EndSentinel(int delay, ExecutorService e) { super(delay); exec = e; } public void run() { for(DelayedTask pt : sequence) { System.out.print(pt.summary() + " "); } System.out.println(); System.out.println(this + " Calling shutdownNow()"); exec.shutdownNow(); } } } class DelayedTaskConsumer implements Runnable { private DelayQueue<DelayedTask> q; public DelayedTaskConsumer(DelayQueue<DelayedTask> q) { this.q = q; } public void run() { try { while(!Thread.interrupted()) q.take().run(); // Run task with the current thread } catch(InterruptedException e) { // Acceptable way to exit } System.out.println("Finished DelayedTaskConsumer"); } } public class DelayQueueDemo { public static void main(String[] args) { Random rand = new Random(47); ExecutorService exec = Executors.newCachedThreadPool(); DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>(); // Fill with tasks that have random delays: for(int i = 0; i < 20; i++) queue.put(new DelayedTask(rand.nextInt(5000))); // Set the stopping point queue.add(new DelayedTask.EndSentinel(5000, exec)); exec.execute(new DelayedTaskConsumer(queue)); } } /* Output [128 ] Task 11 [200 ] Task 7 [429 ] Task 5 [520 ] Task 18 [555 ] Task 1 [961 ] Task 4 [998 ] Task 16 [1207] Task 9 [1693] Task 2 [1809] Task 14 [1861] Task 3 [2278] Task 15 [3288] Task 10 [3551] Task 12 [4258] Task 0 [4258] Task 19 [4522] Task 8 [4589] Task 13 [4861] Task 17 [4868] Task 6 (0:4258) (1:555) (2:1693) (3:1861) (4:961) (5:429) (6:4868) (7:200) (8:4522) (9:1207) (10:3288) (11:128) (12:3551) (13:4589) (14:1809) (15:2278) (16:998) (17:4861) (18:520) (19:4258) (20:5000) [5000] Task 20 Calling shutdownNow() Finished DelayedTaskConsumer */
这里为20个任务随机配置了延时,由打印可见,延时先到期的任务会先被执行。EndSentinel
具有最大延时,位于队列末尾,用于结束所有任务。
4. PriorityBlockingQueue
PriorityBlockingQueue
顾名思义,就是一个带阻塞功能的优先队列。
import java.util.concurrent.*; import java.util.*; class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> { private Random rand = new Random(47); private static int counter = 0; private final int id = counter++; private final int priority; protected static List<PrioritizedTask> sequence = new ArrayList<PrioritizedTask>(); public PrioritizedTask(int priority) { this.priority = priority; sequence.add(this); } public int compareTo(PrioritizedTask arg) { return priority < arg.priority ? 1 : (priority > arg.priority ? -1 : 0); } public void run() { try { TimeUnit.MILLISECONDS.sleep(rand.nextInt(250)); } catch(InterruptedException e) { // Acceptable way to exit } System.out.println(this); } public String toString() { return String.format("[%1$-3d]", priority) + " Task " + id; } public String summary() { return "(" + id + ":" + priority + ")"; } public static class EndSentinel extends PrioritizedTask { private ExecutorService exec; public EndSentinel(ExecutorService e) { super(-1); // Lowest priority in this program exec = e; } public void run() { int count = 0; for(PrioritizedTask pt : sequence) { System.out.print(pt.summary()); if(++count % 5 == 0) System.out.println(); } System.out.println(); System.out.println(this + " Calling shutdownNow()"); exec.shutdownNow(); } } } class PrioritizedTaskProducer implements Runnable { private Random rand = new Random(47); private Queue<Runnable> queue; private ExecutorService exec; public PrioritizedTaskProducer(Queue<Runnable> q, ExecutorService e) { queue = q; exec = e; // Used for EndSentinel } public void run() { // Unbounded queue; never blocks. // Fill it up fast with random priorities: for(int i = 0; i < 20; i++) { queue.add(new PrioritizedTask(rand.nextInt(10))); Thread.yield(); } // Trickle in highest-priority jobs: try { for(int i = 0; i < 10; i++) { TimeUnit.MILLISECONDS.sleep(250); queue.add(new PrioritizedTask(10)); } // Add jobs, lowest priority first: for(int i = 0; i < 10; i++) queue.add(new PrioritizedTask(i)); // A sentinel to stop all the tasks: queue.add(new PrioritizedTask.EndSentinel(exec)); } catch(InterruptedException e) { // Acceptable way to exit } System.out.println("Finished PrioritizedTaskProducer"); } } class PrioritizedTaskConsumer implements Runnable { private PriorityBlockingQueue<Runnable> q; public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> q) { this.q = q; } public void run() { try { while(!Thread.interrupted()) // Use current thread to run the task: q.take().run(); } catch(InterruptedException e) { // Acceptable way to exit } System.out.println("Finished PrioritizedTaskConsumer"); } } public class PriorityBlockingQueueDemo { public static void main(String[] args) throws Exception { Random rand = new Random(47); ExecutorService exec = Executors.newCachedThreadPool(); PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>(); exec.execute(new PrioritizedTaskProducer(queue, exec)); exec.execute(new PrioritizedTaskConsumer(queue)); } }
上面例子中的PrioritizedTask
实现Comparable
接口,提供比较任务优先级的方法。PrioritizedTaskProducer
和PriorityBlockingQueue
使用同一个PriorityBlockingQueue
,分别通过add()
和take()
向队列中添加和获取任务。这里没有使用同步,PriorityBlockingQueue
会处理同步的问题。当队列为空时,take()
会产生阻塞。