Concurrency: Basic Threading (1)
本系列是Thinking in Java (Fourth Edition)中Concurrency一章的内容总结。主要关注实例、常见用法和陷阱,便于日后查用。
1. Defining Tasks
任务(Task)用于描述希望并发运行的活动。定义一个任务,只需实现Runnable
接口,并在run()
方法中给出任务的具体行为:
public class LiftOff implements Runnable { protected int countDown = 10; // Default private static int taskCount = 0; private final int id = taskCount++; public LiftOff() {} public LiftOff(int countDown) { this.countDown = countDown; } public String status() { return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "), "; } public void run() { while(countDown-- > 0) { System.out.print(status()); Thread.yield(); } } }
为了使任务能够一直运行,run()
一般会包含循环。当不再需要任务继续运行时,再通过适当的判断条件终止循环,结束run()
方法。在run()
中调用静态方法Thread.yield()
,可以向线程调度器(Thread Scheduler)建议切换到其他任务。
2. The Thread Class
任务本身并不具备并发的能力。为了让任务能够并行的运行,需要把任务附着到用于执行任务的线程上。
把任务附着到线程上的传统方法是直接把实现了Runnable
接口的对象传递给Thread
的构造器:
public class BasicThreads { public static void main(String[] args) { Thread t = new Thread(new LiftOff()); t.start(); System.out.println("Waiting for LiftOff"); } } /* Output Waiting for LiftOff #0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!), */
Thread
对象的start()
方法会对线程进行必要的初始化,然后Runnable
的run()
会被调用,在新的线程开始任务。start()
会很快返回。
下面是一个创建多个任务的例子:
public class MoreBasicThreads { public static void main(String[] args) { for(int i = 0; i < 5; i++) new Thread(new LiftOff()).start(); System.out.println("Waiting for LiftOff"); } } /* Output Waiting for LiftOff #3(9), #4(9), #2(9), #1(9), #0(9), #1(8), #2(8), #4(8), #3(8), #4(7), #2(7), #2(6), #1(7), #0(8), #1(6), #1(5), #2(5), #4(6), #4(5), #4(4), #4(3), #4(2), #4(1), #3(7), #4(Liftoff!), #2(4), #1(4), #0(7), #1(3), #2(3), #2(2), #2(1), #2(Liftoff!), #3(6), #1(2), #0(6), #1(1), #1(Liftoff!), #3(5), #0(5), #3(4), #0(4), #3(3), #0(3), #0(2), #0(1), #0(Liftoff!), #3(2), #3(1), #3(Liftoff!), */
从打印可以看出,多个任务在它们各自的线程上同时运行,由于线程的调度的不确定性,上面程序的输出可能会有所不同。
线程间的切换由Thread Scheduler控制。在早期的JDK上,时间片的轮转较少,可能会看到一个线程上的倒数完毕后,才开始下一个线程。现代的JDK具有较好的时间片轮转方式,使得各个线程的切换更加频繁。不能期望线程调度的行为能一直保持一致。
上面的几个例子中,在main()
里调用Thread
的start()
时,main()
并没有持有线程的引用,但线程并不会被垃圾回收。每个线程都会对自己的引用进行注册,只有当run()
退出,任务完成后,线程才会被垃圾回收。
3. Using Executors
Java SE5新加入了java.util.concurrent包,其中的Executors
提供了对Thread
的管理,简化了并发编程,不需要直接操作Thread
就可以控制任务的执行。
import java.util.concurrent.*; public class CachedThreadPool { public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < 5; i++) exec.execute(new LiftOff()); exec.shutdown(); } } /* Output #1(9), #3(9), #0(9), #2(9), #0(8), #3(8), #1(8), #4(9), #1(7), #3(7), #0(7), #2(8), #0(6), #3(6), #1(6), #4(8), #1(5), #3(5), #0(5), #2(7), #0(4), #3(4), #1(4), #4(7), #1(3), #3(3), #0(3), #2(6), #0(2), #3(2), #1(2), #4(6), #1(1), #3(1), #0(1), #2(5), #0(Liftoff!), #3(Liftoff!), #1(Liftoff!), #4(5), #2(4), #4(4), #2(3), #4(3), #2(2), #4(2), #2(1), #4(1), #2(Liftoff!), #4(Liftoff!), */
这里通过Executors
的静态方法newCachedThreadPool()
获取ExecutorService
对象,再由其execute()
来执行LiftOff()
,不再需要直接操作Thread
对象。一般来说,一个Executor就可以用来创建和管理所有的任务。shutdown()
会阻止新任务再被添加到Executor,在调用shutdown()
之前添加的任务则会继续执行直到结束。
Executors
提供了获取不同类型Executor的方法,如可以把前面的CachedThreadPool
替换为FixedThreadPool
:
import java.util.concurrent.*; public class FixedThreadPool { public static void main(String[] args) { // Constructor argument is number of threads: ExecutorService exec = Executors.newFixedThreadPool(5); for(int i = 0; i < 5; i++) exec.execute(new LiftOff()); exec.shutdown(); } }
FixedThreadPool
会一次性地初始化固定数量个线程,之后就使用这些线程来执行任务,不再为任务创建新的线程,节省了时间。同时由于限制了所能使用的线程的数量,不必担心出现过量的资源消耗。
CachedThreadPool
则会根据实际需要创建新线程,并回收利用旧的线程。各种类型的线程池都会自动重复利用已存在的线程。
SingleThreadExecutor
相当于只有一个线程的FixedThreadPool
,适用于希望在单独线程中持续运行的任务,如监听Socket连接;也适用于诸如更新本地log之类的短小的任务。如果向SingleThreadExecutor
提交了多个任务,则这些任务会被队列,只有当一个任务运行完成,才会开始下个任务,且所有任务都会使用同一个线程。这一特性可以避免共享资源的同步问题。
import java.util.concurrent.*; public class SingleThreadExecutor { public static void main(String[] args) { ExecutorService exec = Executors.newSingleThreadExecutor(); for(int i = 0; i < 5; i++) exec.execute(new LiftOff()); exec.shutdown(); } } /* Output #0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!), #1(9), #1(8), #1(7), #1(6), #1(5), #1(4), #1(3), #1(2), #1(1), #1(Liftoff!), #2(9), #2(8), #2(7), #2(6), #2(5), #2(4), #2(3), #2(2), #2(1), #2(Liftoff!), #3(9), #3(8), #3(7), #3(6), #3(5), #3(4), #3(3), #3(2), #3(1), #3(Liftoff!), #4(9), #4(8), #4(7), #4(6), #4(5), #4(4), #4(3), #4(2), #4(1), #4(Liftoff!), */
4. Producing Return Values From Tasks
Runnable
本身并没有返回值,如果希望任务完成后能够返回值,可以使用Java SE 5中引入的Callable
。Callable
是一个具有类型参数的泛型,其类型参数就是返回值的类型,该返回值由call()返回(不是Runnable
的run()
)。要获得返回值,必须使用ExecutorService
的submit()
方法。
import java.util.concurrent.*; import java.util.*; class TaskWithResult implements Callable<String> { private int id; public TaskWithResult(int id) { this.id = id; } public String call() { return "result of TaskWithResult " + id; } } public class CallableDemo { public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); ArrayList<Future<String>> results = new ArrayList<Future<String>>(); for(int i = 0; i < 10; i++) results.add(exec.submit(new TaskWithResult(i))); for(Future<String> fs : results) try { // get() blocks until completion: System.out.println(fs.get()); } catch(InterruptedException e) { System.out.println(e); return; } catch(ExecutionException e) { System.out.println(e); } finally { exec.shutdown(); } } } /*Output result of TaskWithResult 0 result of TaskWithResult 1 result of TaskWithResult 2 result of TaskWithResult 3 result of TaskWithResult 4 result of TaskWithResult 5 result of TaskWithResult 6 result of TaskWithResult 7 result of TaskWithResult 8 result of TaskWithResult 9 */
这里的submit()
会返回一个与Callable
具有相同参数化类型(String
)的Future
对象。此时可以:
- 先通过的
Future
的isDone()
来查询Callable
是否已经完成并有了返回值,如果返回值已经就绪,则使用get()
来获取返回值。 - 不通过
isDone()
查询而直接使用get()
,get()
会导致阻塞,直到Callable
有了返回值才会继续。 - 使用
get(long timeout, TimeUnit unit)
,为get()
加上超时限制。