Concurrency: New Library Components (1)

Java SE5中加入的java.util.concurrent包含了很多用于解决并发问题的新类。

1. CountDownLatch

  创建CountDownLatch时需要指定一个初始的计数,在任务中调用CountDownLatch对象的await()会阻塞任务,直到CountDownLatch的计数为零。其他任务可以(在自己的工作完成后)调用CountDownLatch对象的countDown()来缩减计数。由此可以对一个或多个任务进行同步,使它们能够等待其他任务完成某项工作后,再继续执行。CountDownLatch是一次性的,不能重置计数。

  CountDownLatch的一个典型用法是把一个问题分成n个可以单独解决的子任务,初始化CountDownLatch的计数为n,子任务完成后调用countDown(),需要在当前问题解决后再执行的任务通过await()等待所有子任务完成后再执行。

这里把当前需要解决的问题分成SIZE份,由SIZETaskPortion执行,当所有TaskPortion执行完后,等待中的WaitingTask再开始执行。所有TaskPortionWaitingTask都使用同一个CountDownLatch latch

Library thread safety

  注意在上面的例子中,所有TaskPortion共享了一个static Random rand,由于Random.nextInt()是线程安全的,所以这样的用法没有问题。需要注意类似场合下所使用的共享资源是否是线程安全的,如果不是,可以把static去掉,使每个任务都有自己的实例。

2. CyclicBarrier

  CyclicBarrier可以让任务在完成自己工作后,等待其他任务完成,当所有任务都完成后,再唤醒所有等待中的任务继续运行。CyclicBarrier类似于CountDownLatch,二者的一个区别在于,CountDownLatch是一次性的,而CyclicBarrier可以反复使用。

上面是一个赛马的模拟程序,Horse每次随机地前进0~2步,若干个Horse同时运行,先达到终点(指定步数)的获胜。CyclicBarrier用于协调各个Horse的运行,在Horse前进过后(strides += rand.nextInt(3))后,调用barrier.await()使任务进入等待,当所有Horse都前进过后,再唤醒任务,开始下一轮循环。CyclicBarrierHorseRace()中通过barrier = new CyclicBarrier(nHorses, new Runnable(){...})创建,第一个参数nHorses指定了初始计数,第二个参数指定了一个Runnable,该Runnable会在计数为零后运行,用于打印和判断是否出现赢家。

3. DelayQueue

  DelayQueue是一个容纳实现了Delayed接口的元素的阻塞队列,队列中的元素只能在其延时到期后取出,DelayQueue按照延时到期时间对其中的元素进行排序,队首的元素的延时最先到期。如果没有延时到期的元素,只能从队列中取出null(所以不能把null放到DelayQueue中)。

  Delayed接口只有一个getDelay()方法,指示延时的时间,调用者通过TimeUnit参数指示自己想要的单位。Delayed接口继承了Comparable,所以还需要实现compareTo()方法,用于按照延时对任务进行排序。

这里为20个任务随机配置了延时,由打印可见,延时先到期的任务会先被执行。EndSentinel具有最大延时,位于队列末尾,用于结束所有任务。

4. PriorityBlockingQueue

  PriorityBlockingQueue顾名思义,就是一个带阻塞功能的优先队列。

上面例子中的PrioritizedTask实现Comparable接口,提供比较任务优先级的方法。PrioritizedTaskProducerPriorityBlockingQueue使用同一个PriorityBlockingQueue,分别通过add()take()向队列中添加和获取任务。这里没有使用同步,PriorityBlockingQueue会处理同步的问题。当队列为空时,take()会产生阻塞。