Concurrency: Basic Threading (1)

本系列是Thinking in Java (Fourth Edition)中Concurrency一章的内容总结。主要关注实例、常见用法和陷阱,便于日后查用。

1. Defining Tasks

  任务(Task)用于描述希望并发运行的活动。定义一个任务,只需实现Runnable接口,并在run()方法中给出任务的具体行为:

public class LiftOff implements Runnable { 
    protected int countDown = 10; // Default 
    private static int taskCount = 0; 
    private final int id = taskCount++; 
    public LiftOff() {} 
    public LiftOff(int countDown) { 
        this.countDown = countDown; 
    } 
    public String status() { 
        return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "), "; 
    } 
    public void run() { 
        while(countDown-- > 0) { 
            System.out.print(status()); 
            Thread.yield(); 
        } 
    } 
}

  为了使任务能够一直运行,run()一般会包含循环。当不再需要任务继续运行时,再通过适当的判断条件终止循环,结束run()方法。在run()中调用静态方法Thread.yield(),可以向线程调度器(Thread Scheduler)建议切换到其他任务。

2. The Thread Class

  任务本身并不具备并发的能力。为了让任务能够并行的运行,需要把任务附着到用于执行任务的线程上。

  把任务附着到线程上的传统方法是直接把实现了Runnable接口的对象传递给Thread的构造器:

public class BasicThreads { 
    public static void main(String[] args) { 
        Thread t = new Thread(new LiftOff()); 
        t.start(); 
        System.out.println("Waiting for LiftOff"); 
    } 
}

/* Output
Waiting for LiftOff
#0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!), 
*/

  Thread对象的start()方法会对线程进行必要的初始化,然后Runnablerun()会被调用,在新的线程开始任务。start()会很快返回。

  下面是一个创建多个任务的例子:

public class MoreBasicThreads { 
    public static void main(String[] args) { 
        for(int i = 0; i < 5; i++) 
            new Thread(new LiftOff()).start(); 
        System.out.println("Waiting for LiftOff"); 
    } 
} 

/* Output
Waiting for LiftOff
#3(9), #4(9), #2(9), #1(9), #0(9), #1(8), #2(8), #4(8), #3(8), #4(7), #2(7), #2(6),
#1(7), #0(8), #1(6), #1(5), #2(5), #4(6), #4(5), #4(4), #4(3), #4(2), #4(1), #3(7),
#4(Liftoff!), #2(4), #1(4), #0(7), #1(3), #2(3), #2(2), #2(1), #2(Liftoff!), #3(6),
#1(2), #0(6), #1(1), #1(Liftoff!), #3(5), #0(5), #3(4), #0(4), #3(3), #0(3), #0(2),
#0(1), #0(Liftoff!), #3(2), #3(1), #3(Liftoff!), 
*/

从打印可以看出,多个任务在它们各自的线程上同时运行,由于线程的调度的不确定性,上面程序的输出可能会有所不同。

  线程间的切换由Thread Scheduler控制。在早期的JDK上,时间片的轮转较少,可能会看到一个线程上的倒数完毕后,才开始下一个线程。现代的JDK具有较好的时间片轮转方式,使得各个线程的切换更加频繁。不能期望线程调度的行为能一直保持一致。

  上面的几个例子中,在main()里调用Threadstart()时,main()并没有持有线程的引用,但线程并不会被垃圾回收。每个线程都会对自己的引用进行注册,只有当run()退出,任务完成后,线程才会被垃圾回收。

3. Using Executors

  Java SE5新加入了java.util.concurrent包,其中的Executors提供了对Thread的管理,简化了并发编程,不需要直接操作Thread就可以控制任务的执行。

import java.util.concurrent.*; 
 
public class CachedThreadPool { 
    public static void main(String[] args) { 
        ExecutorService exec = Executors.newCachedThreadPool(); 
        for(int i = 0; i < 5; i++) 
            exec.execute(new LiftOff()); 
        exec.shutdown(); 
    } 
} 

/* Output
#1(9), #3(9), #0(9), #2(9), #0(8), #3(8), #1(8), #4(9), #1(7), #3(7), #0(7), #2(8),
#0(6), #3(6), #1(6), #4(8), #1(5), #3(5), #0(5), #2(7), #0(4), #3(4), #1(4), #4(7),
#1(3), #3(3), #0(3), #2(6), #0(2), #3(2), #1(2), #4(6), #1(1), #3(1), #0(1), #2(5),
#0(Liftoff!), #3(Liftoff!), #1(Liftoff!), #4(5), #2(4), #4(4), #2(3), #4(3), #2(2),
#4(2), #2(1), #4(1), #2(Liftoff!), #4(Liftoff!), 
*/

这里通过Executors的静态方法newCachedThreadPool()获取ExecutorService对象,再由其execute()来执行LiftOff(),不再需要直接操作Thread对象。一般来说,一个Executor就可以用来创建和管理所有的任务。shutdown()会阻止新任务再被添加到Executor,在调用shutdown()之前添加的任务则会继续执行直到结束。

  Executors提供了获取不同类型Executor的方法,如可以把前面的CachedThreadPool替换为FixedThreadPool

import java.util.concurrent.*; 
 
public class FixedThreadPool { 
    public static void main(String[] args) { 
        // Constructor argument is number of threads: 
        ExecutorService exec = Executors.newFixedThreadPool(5); 
        for(int i = 0; i < 5; i++) 
            exec.execute(new LiftOff()); 
        exec.shutdown(); 
    } 
}

  FixedThreadPool会一次性地初始化固定数量个线程,之后就使用这些线程来执行任务,不再为任务创建新的线程,节省了时间。同时由于限制了所能使用的线程的数量,不必担心出现过量的资源消耗。

  CachedThreadPool则会根据实际需要创建新线程,并回收利用旧的线程。各种类型的线程池都会自动重复利用已存在的线程。

  SingleThreadExecutor相当于只有一个线程的FixedThreadPool,适用于希望在单独线程中持续运行的任务,如监听Socket连接;也适用于诸如更新本地log之类的短小的任务。如果向SingleThreadExecutor提交了多个任务,则这些任务会被队列,只有当一个任务运行完成,才会开始下个任务,且所有任务都会使用同一个线程。这一特性可以避免共享资源的同步问题。

import java.util.concurrent.*;

public class SingleThreadExecutor {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        for(int i = 0; i < 5; i++)
            exec.execute(new LiftOff());
        exec.shutdown();
    }
}
/* Output
#0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!), #1(9),
#1(8), #1(7), #1(6), #1(5), #1(4), #1(3), #1(2), #1(1), #1(Liftoff!), #2(9), #2(8),
#2(7), #2(6), #2(5), #2(4), #2(3), #2(2), #2(1), #2(Liftoff!), #3(9), #3(8), #3(7),
#3(6), #3(5), #3(4), #3(3), #3(2), #3(1), #3(Liftoff!), #4(9), #4(8), #4(7), #4(6),
#4(5), #4(4), #4(3), #4(2), #4(1), #4(Liftoff!), 
*/

4. Producing Return Values From Tasks

  Runnable本身并没有返回值,如果希望任务完成后能够返回值,可以使用Java SE 5中引入的CallableCallable是一个具有类型参数的泛型,其类型参数就是返回值的类型,该返回值由call()返回(不是Runnablerun())。要获得返回值,必须使用ExecutorServicesubmit()方法。

import java.util.concurrent.*;
import java.util.*;

class TaskWithResult implements Callable<String> {
    private int id;
    public TaskWithResult(int id) {
        this.id = id;
    }
    public String call() {
        return "result of TaskWithResult " + id;
    }
}

public class CallableDemo {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        ArrayList<Future<String>> results = new ArrayList<Future<String>>();
        
        for(int i = 0; i < 10; i++)
            results.add(exec.submit(new TaskWithResult(i)));
        
        for(Future<String> fs : results)
            try {
                // get() blocks until completion:
                System.out.println(fs.get());
            } catch(InterruptedException e) {
                System.out.println(e);
                return;
            } catch(ExecutionException e) {
                System.out.println(e);
            } finally {
                exec.shutdown();
            }
    }
}

/*Output
result of TaskWithResult 0
result of TaskWithResult 1
result of TaskWithResult 2
result of TaskWithResult 3
result of TaskWithResult 4
result of TaskWithResult 5
result of TaskWithResult 6
result of TaskWithResult 7
result of TaskWithResult 8
result of TaskWithResult 9
*/

这里的submit()会返回一个与Callable具有相同参数化类型(String)的Future对象。此时可以:

  • 先通过的FutureisDone()来查询Callable是否已经完成并有了返回值,如果返回值已经就绪,则使用get()来获取返回值。
  • 不通过isDone()查询而直接使用get()get()会导致阻塞,直到Callable有了返回值才会继续。
  • 使用get(long timeout, TimeUnit unit),为get()加上超时限制。