Concurrency: Basic Threading (1)
Author: nex3z
2016-06-28
本系列是Thinking in Java (Fourth Edition)中Concurrency一章的内容总结。主要关注实例、常见用法和陷阱,便于日后查用。
1. Defining Tasks
任务(Task)用于描述希望并发运行的活动。定义一个任务,只需实现Runnable
接口,并在run()
方法中给出任务的具体行为:
public class LiftOff implements Runnable {
protected int countDown = 10; // Default
private static int taskCount = 0;
private final int id = taskCount++;
public LiftOff(int countDown) {
this.countDown = countDown;
return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "), ";
System.out.print(status());
public class LiftOff implements Runnable {
protected int countDown = 10; // Default
private static int taskCount = 0;
private final int id = taskCount++;
public LiftOff() {}
public LiftOff(int countDown) {
this.countDown = countDown;
}
public String status() {
return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "), ";
}
public void run() {
while(countDown-- > 0) {
System.out.print(status());
Thread.yield();
}
}
}
public class LiftOff implements Runnable {
protected int countDown = 10; // Default
private static int taskCount = 0;
private final int id = taskCount++;
public LiftOff() {}
public LiftOff(int countDown) {
this.countDown = countDown;
}
public String status() {
return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "), ";
}
public void run() {
while(countDown-- > 0) {
System.out.print(status());
Thread.yield();
}
}
}
为了使任务能够一直运行,run()
一般会包含循环。当不再需要任务继续运行时,再通过适当的判断条件终止循环,结束run()
方法。在run()
中调用静态方法Thread.yield()
,可以向线程调度器(Thread Scheduler)建议切换到其他任务。
2. The Thread Class
任务本身并不具备并发的能力。为了让任务能够并行的运行,需要把任务附着到用于执行任务的线程上。
把任务附着到线程上的传统方法是直接把实现了Runnable
接口的对象传递给Thread
的构造器:
public class BasicThreads {
public static void main(String[] args) {
Thread t = new Thread(new LiftOff());
System.out.println("Waiting for LiftOff");
#0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!),
public class BasicThreads {
public static void main(String[] args) {
Thread t = new Thread(new LiftOff());
t.start();
System.out.println("Waiting for LiftOff");
}
}
/* Output
Waiting for LiftOff
#0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!),
*/
public class BasicThreads {
public static void main(String[] args) {
Thread t = new Thread(new LiftOff());
t.start();
System.out.println("Waiting for LiftOff");
}
}
/* Output
Waiting for LiftOff
#0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!),
*/
Thread
对象的start()
方法会对线程进行必要的初始化,然后Runnable
的run()
会被调用,在新的线程开始任务。start()
会很快返回。
下面是一个创建多个任务的例子:
public class MoreBasicThreads {
public static void main(String[] args) {
for(int i = 0; i < 5; i++)
new Thread(new LiftOff()).start();
System.out.println("Waiting for LiftOff");
#3(9), #4(9), #2(9), #1(9), #0(9), #1(8), #2(8), #4(8), #3(8), #4(7), #2(7), #2(6),
#1(7), #0(8), #1(6), #1(5), #2(5), #4(6), #4(5), #4(4), #4(3), #4(2), #4(1), #3(7),
#4(Liftoff!), #2(4), #1(4), #0(7), #1(3), #2(3), #2(2), #2(1), #2(Liftoff!), #3(6),
#1(2), #0(6), #1(1), #1(Liftoff!), #3(5), #0(5), #3(4), #0(4), #3(3), #0(3), #0(2),
#0(1), #0(Liftoff!), #3(2), #3(1), #3(Liftoff!),
public class MoreBasicThreads {
public static void main(String[] args) {
for(int i = 0; i < 5; i++)
new Thread(new LiftOff()).start();
System.out.println("Waiting for LiftOff");
}
}
/* Output
Waiting for LiftOff
#3(9), #4(9), #2(9), #1(9), #0(9), #1(8), #2(8), #4(8), #3(8), #4(7), #2(7), #2(6),
#1(7), #0(8), #1(6), #1(5), #2(5), #4(6), #4(5), #4(4), #4(3), #4(2), #4(1), #3(7),
#4(Liftoff!), #2(4), #1(4), #0(7), #1(3), #2(3), #2(2), #2(1), #2(Liftoff!), #3(6),
#1(2), #0(6), #1(1), #1(Liftoff!), #3(5), #0(5), #3(4), #0(4), #3(3), #0(3), #0(2),
#0(1), #0(Liftoff!), #3(2), #3(1), #3(Liftoff!),
*/
public class MoreBasicThreads {
public static void main(String[] args) {
for(int i = 0; i < 5; i++)
new Thread(new LiftOff()).start();
System.out.println("Waiting for LiftOff");
}
}
/* Output
Waiting for LiftOff
#3(9), #4(9), #2(9), #1(9), #0(9), #1(8), #2(8), #4(8), #3(8), #4(7), #2(7), #2(6),
#1(7), #0(8), #1(6), #1(5), #2(5), #4(6), #4(5), #4(4), #4(3), #4(2), #4(1), #3(7),
#4(Liftoff!), #2(4), #1(4), #0(7), #1(3), #2(3), #2(2), #2(1), #2(Liftoff!), #3(6),
#1(2), #0(6), #1(1), #1(Liftoff!), #3(5), #0(5), #3(4), #0(4), #3(3), #0(3), #0(2),
#0(1), #0(Liftoff!), #3(2), #3(1), #3(Liftoff!),
*/
从打印可以看出,多个任务在它们各自的线程上同时运行,由于线程的调度的不确定性,上面程序的输出可能会有所不同。
线程间的切换由Thread Scheduler控制。在早期的JDK上,时间片的轮转较少,可能会看到一个线程上的倒数完毕后,才开始下一个线程。现代的JDK具有较好的时间片轮转方式,使得各个线程的切换更加频繁。不能期望线程调度的行为能一直保持一致。
上面的几个例子中,在main()
里调用Thread
的start()
时,main()
并没有持有线程的引用,但线程并不会被垃圾回收。每个线程都会对自己的引用进行注册,只有当run()
退出,任务完成后,线程才会被垃圾回收。
3. Using Executors
Java SE5新加入了java.util.concurrent包,其中的Executors
提供了对Thread
的管理,简化了并发编程,不需要直接操作Thread
就可以控制任务的执行。
import java.util.concurrent.*;
public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i++)
exec.execute(new LiftOff());
#1(9), #3(9), #0(9), #2(9), #0(8), #3(8), #1(8), #4(9), #1(7), #3(7), #0(7), #2(8),
#0(6), #3(6), #1(6), #4(8), #1(5), #3(5), #0(5), #2(7), #0(4), #3(4), #1(4), #4(7),
#1(3), #3(3), #0(3), #2(6), #0(2), #3(2), #1(2), #4(6), #1(1), #3(1), #0(1), #2(5),
#0(Liftoff!), #3(Liftoff!), #1(Liftoff!), #4(5), #2(4), #4(4), #2(3), #4(3), #2(2),
#4(2), #2(1), #4(1), #2(Liftoff!), #4(Liftoff!),
import java.util.concurrent.*;
public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i++)
exec.execute(new LiftOff());
exec.shutdown();
}
}
/* Output
#1(9), #3(9), #0(9), #2(9), #0(8), #3(8), #1(8), #4(9), #1(7), #3(7), #0(7), #2(8),
#0(6), #3(6), #1(6), #4(8), #1(5), #3(5), #0(5), #2(7), #0(4), #3(4), #1(4), #4(7),
#1(3), #3(3), #0(3), #2(6), #0(2), #3(2), #1(2), #4(6), #1(1), #3(1), #0(1), #2(5),
#0(Liftoff!), #3(Liftoff!), #1(Liftoff!), #4(5), #2(4), #4(4), #2(3), #4(3), #2(2),
#4(2), #2(1), #4(1), #2(Liftoff!), #4(Liftoff!),
*/
import java.util.concurrent.*;
public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i++)
exec.execute(new LiftOff());
exec.shutdown();
}
}
/* Output
#1(9), #3(9), #0(9), #2(9), #0(8), #3(8), #1(8), #4(9), #1(7), #3(7), #0(7), #2(8),
#0(6), #3(6), #1(6), #4(8), #1(5), #3(5), #0(5), #2(7), #0(4), #3(4), #1(4), #4(7),
#1(3), #3(3), #0(3), #2(6), #0(2), #3(2), #1(2), #4(6), #1(1), #3(1), #0(1), #2(5),
#0(Liftoff!), #3(Liftoff!), #1(Liftoff!), #4(5), #2(4), #4(4), #2(3), #4(3), #2(2),
#4(2), #2(1), #4(1), #2(Liftoff!), #4(Liftoff!),
*/
这里通过Executors
的静态方法newCachedThreadPool()
获取ExecutorService
对象,再由其execute()
来执行LiftOff()
,不再需要直接操作Thread
对象。一般来说,一个Executor就可以用来创建和管理所有的任务。shutdown()
会阻止新任务再被添加到Executor,在调用shutdown()
之前添加的任务则会继续执行直到结束。
Executors
提供了获取不同类型Executor的方法,如可以把前面的CachedThreadPool
替换为FixedThreadPool
:
import java.util.concurrent.*;
public class FixedThreadPool {
public static void main(String[] args) {
// Constructor argument is number of threads:
ExecutorService exec = Executors.newFixedThreadPool(5);
for(int i = 0; i < 5; i++)
exec.execute(new LiftOff());
import java.util.concurrent.*;
public class FixedThreadPool {
public static void main(String[] args) {
// Constructor argument is number of threads:
ExecutorService exec = Executors.newFixedThreadPool(5);
for(int i = 0; i < 5; i++)
exec.execute(new LiftOff());
exec.shutdown();
}
}
import java.util.concurrent.*;
public class FixedThreadPool {
public static void main(String[] args) {
// Constructor argument is number of threads:
ExecutorService exec = Executors.newFixedThreadPool(5);
for(int i = 0; i < 5; i++)
exec.execute(new LiftOff());
exec.shutdown();
}
}
FixedThreadPool
会一次性地初始化固定数量个线程,之后就使用这些线程来执行任务,不再为任务创建新的线程,节省了时间。同时由于限制了所能使用的线程的数量,不必担心出现过量的资源消耗。
CachedThreadPool
则会根据实际需要创建新线程,并回收利用旧的线程。各种类型的线程池都会自动重复利用已存在的线程。
SingleThreadExecutor
相当于只有一个线程的FixedThreadPool
,适用于希望在单独线程中持续运行的任务,如监听Socket连接;也适用于诸如更新本地log之类的短小的任务。如果向SingleThreadExecutor
提交了多个任务,则这些任务会被队列,只有当一个任务运行完成,才会开始下个任务,且所有任务都会使用同一个线程。这一特性可以避免共享资源的同步问题。
import java.util.concurrent.*;
public class SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService exec = Executors.newSingleThreadExecutor();
for(int i = 0; i < 5; i++)
exec.execute(new LiftOff());
#0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!), #1(9),
#1(8), #1(7), #1(6), #1(5), #1(4), #1(3), #1(2), #1(1), #1(Liftoff!), #2(9), #2(8),
#2(7), #2(6), #2(5), #2(4), #2(3), #2(2), #2(1), #2(Liftoff!), #3(9), #3(8), #3(7),
#3(6), #3(5), #3(4), #3(3), #3(2), #3(1), #3(Liftoff!), #4(9), #4(8), #4(7), #4(6),
#4(5), #4(4), #4(3), #4(2), #4(1), #4(Liftoff!),
import java.util.concurrent.*;
public class SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService exec = Executors.newSingleThreadExecutor();
for(int i = 0; i < 5; i++)
exec.execute(new LiftOff());
exec.shutdown();
}
}
/* Output
#0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!), #1(9),
#1(8), #1(7), #1(6), #1(5), #1(4), #1(3), #1(2), #1(1), #1(Liftoff!), #2(9), #2(8),
#2(7), #2(6), #2(5), #2(4), #2(3), #2(2), #2(1), #2(Liftoff!), #3(9), #3(8), #3(7),
#3(6), #3(5), #3(4), #3(3), #3(2), #3(1), #3(Liftoff!), #4(9), #4(8), #4(7), #4(6),
#4(5), #4(4), #4(3), #4(2), #4(1), #4(Liftoff!),
*/
import java.util.concurrent.*;
public class SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService exec = Executors.newSingleThreadExecutor();
for(int i = 0; i < 5; i++)
exec.execute(new LiftOff());
exec.shutdown();
}
}
/* Output
#0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!), #1(9),
#1(8), #1(7), #1(6), #1(5), #1(4), #1(3), #1(2), #1(1), #1(Liftoff!), #2(9), #2(8),
#2(7), #2(6), #2(5), #2(4), #2(3), #2(2), #2(1), #2(Liftoff!), #3(9), #3(8), #3(7),
#3(6), #3(5), #3(4), #3(3), #3(2), #3(1), #3(Liftoff!), #4(9), #4(8), #4(7), #4(6),
#4(5), #4(4), #4(3), #4(2), #4(1), #4(Liftoff!),
*/
4. Producing Return Values From Tasks
Runnable
本身并没有返回值,如果希望任务完成后能够返回值,可以使用Java SE 5中引入的Callable
。Callable
是一个具有类型参数的泛型,其类型参数就是返回值的类型,该返回值由call()返回(不是Runnable
的run()
)。要获得返回值,必须使用ExecutorService
的submit()
方法。
import java.util.concurrent.*;
class TaskWithResult implements Callable<String> {
public TaskWithResult(int id) {
return "result of TaskWithResult " + id;
public class CallableDemo {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
ArrayList<Future<String>> results = new ArrayList<Future<String>>();
for(int i = 0; i < 10; i++)
results.add(exec.submit(new TaskWithResult(i)));
for(Future<String> fs : results)
// get() blocks until completion:
System.out.println(fs.get());
} catch(InterruptedException e) {
} catch(ExecutionException e) {
result of TaskWithResult 0
result of TaskWithResult 1
result of TaskWithResult 2
result of TaskWithResult 3
result of TaskWithResult 4
result of TaskWithResult 5
result of TaskWithResult 6
result of TaskWithResult 7
result of TaskWithResult 8
result of TaskWithResult 9
import java.util.concurrent.*;
import java.util.*;
class TaskWithResult implements Callable<String> {
private int id;
public TaskWithResult(int id) {
this.id = id;
}
public String call() {
return "result of TaskWithResult " + id;
}
}
public class CallableDemo {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
ArrayList<Future<String>> results = new ArrayList<Future<String>>();
for(int i = 0; i < 10; i++)
results.add(exec.submit(new TaskWithResult(i)));
for(Future<String> fs : results)
try {
// get() blocks until completion:
System.out.println(fs.get());
} catch(InterruptedException e) {
System.out.println(e);
return;
} catch(ExecutionException e) {
System.out.println(e);
} finally {
exec.shutdown();
}
}
}
/*Output
result of TaskWithResult 0
result of TaskWithResult 1
result of TaskWithResult 2
result of TaskWithResult 3
result of TaskWithResult 4
result of TaskWithResult 5
result of TaskWithResult 6
result of TaskWithResult 7
result of TaskWithResult 8
result of TaskWithResult 9
*/
import java.util.concurrent.*;
import java.util.*;
class TaskWithResult implements Callable<String> {
private int id;
public TaskWithResult(int id) {
this.id = id;
}
public String call() {
return "result of TaskWithResult " + id;
}
}
public class CallableDemo {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
ArrayList<Future<String>> results = new ArrayList<Future<String>>();
for(int i = 0; i < 10; i++)
results.add(exec.submit(new TaskWithResult(i)));
for(Future<String> fs : results)
try {
// get() blocks until completion:
System.out.println(fs.get());
} catch(InterruptedException e) {
System.out.println(e);
return;
} catch(ExecutionException e) {
System.out.println(e);
} finally {
exec.shutdown();
}
}
}
/*Output
result of TaskWithResult 0
result of TaskWithResult 1
result of TaskWithResult 2
result of TaskWithResult 3
result of TaskWithResult 4
result of TaskWithResult 5
result of TaskWithResult 6
result of TaskWithResult 7
result of TaskWithResult 8
result of TaskWithResult 9
*/
这里的submit()
会返回一个与Callable
具有相同参数化类型(String
)的Future
对象。此时可以:
- 先通过的
Future
的isDone()
来查询Callable
是否已经完成并有了返回值,如果返回值已经就绪,则使用get()
来获取返回值。
- 不通过
isDone()
查询而直接使用get()
,get()
会导致阻塞,直到Callable
有了返回值才会继续。
- 使用
get(long timeout, TimeUnit unit)
,为get()
加上超时限制。