Concurrency: Basic Threading (2)

5. Sleep

  Sleep()可以在指定时间内阻塞当前任务的执行。

import java.util.concurrent.*;

public class SleepingTask extends LiftOff {
    public void run() {
        try {
            while(countDown-- > 0) {
                System.out.print(status());
                // Old-style:
                // Thread.sleep(100);
                // Java SE5/6-style:
                TimeUnit.MILLISECONDS.sleep(100);
            }
        } catch(InterruptedException e) {
            System.err.println("Interrupted");
        }
    }
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        for(int i = 0; i < 5; i++)
            exec.execute(new SleepingTask());
        exec.shutdown();
    }
}
/* Output
#0(9), #1(9), #4(9), #3(9), #2(9), #4(8), #0(8), #2(8), #1(8), #3(8), #1(7), #4(7),
#3(7), #2(7), #0(7), #4(6), #0(6), #2(6), #3(6), #1(6), #3(5), #4(5), #2(5), #0(5),
#1(5), #2(4), #1(4), #4(4), #3(4), #0(4), #4(3), #1(3), #0(3), #2(3), #3(3), #1(2),
#2(2), #4(2), #3(2), #0(2), #4(1), #0(1), #2(1), #1(1), #3(1), #0(Liftoff!),
#2(Liftoff!), #3(Liftoff!), #1(Liftoff!), #4(Liftoff!), 
*/

这里使用了Java SE 5中引入的TimeUnit,显式地指明了时间单位MILLISECONDS,具有更好的可读性。这里在run()中捕获sleep()抛出的InterruptedException,异常不会跨线程传递,需要在抛出异常的任务中就地处理。

  从打印可以看到,上面例子中各个任务的运行比之前没有sleep()的版本更加规律,每个线程都进行过一次倒数,才会开始下一轮,因为打印输出后的sleep()会阻塞当前任务,使得任务调度器切换到其他线程的任务去执行。但不同平台的线程调度机制会有不同,不能依靠这种方式来控制任务的运行顺序。

6. Priority

  线程的优先级代表了线程对于调度器的重要程度,调度器会倾向于优先运行具有到优先级的线程,相比之下,低优先级的线程会运行地较少,但不代表低优先级的线程完全不会被运行。绝大多数情况下,所有线程都应该以默认优先级运行,尽量不要修改线程的优先级。

  可以通过getPriority()setPriority()获取和修改线程的优先级。

import java.util.concurrent.*; 
 
public class SimplePriorities implements Runnable { 
  private int countDown = 5; 
  private volatile double d; // No optimization 
  private int priority; 
  public SimplePriorities(int priority) { 
    this.priority = priority; 
  } 
  public String toString() { 
    return Thread.currentThread() + ": " + countDown; 
  } 
  public void run() { 
    Thread.currentThread().setPriority(priority); 
    while(true) { 
      // An expensive, interruptable operation: 
      for(int i = 1; i < 100000; i++) { 
        d += (Math.PI + Math.E) / (double)i; 
        if(i % 1000 == 0) 
          Thread.yield(); 
      } 
      System.out.println(this); 
      if(--countDown == 0) return; 
    } 
  } 
  public static void main(String[] args) { 
    ExecutorService exec = Executors.newCachedThreadPool(); 
    for(int i = 0; i < 5; i++) 
      exec.execute( 
        new SimplePriorities(Thread.MIN_PRIORITY)); 
    exec.execute( 
        new SimplePriorities(Thread.MAX_PRIORITY)); 
    exec.shutdown(); 
  } 
}
/* Output
Thread[pool-1-thread-6,10,main]: 5
Thread[pool-1-thread-3,1,main]: 5
Thread[pool-1-thread-5,1,main]: 5
Thread[pool-1-thread-1,1,main]: 5
Thread[pool-1-thread-2,1,main]: 5
Thread[pool-1-thread-4,1,main]: 5
Thread[pool-1-thread-6,10,main]: 4
Thread[pool-1-thread-3,1,main]: 4
Thread[pool-1-thread-1,1,main]: 4
Thread[pool-1-thread-5,1,main]: 4
Thread[pool-1-thread-2,1,main]: 4
Thread[pool-1-thread-4,1,main]: 4
...
*/

这里通过Thread.currentThread()获取当前线程的引用,在run()的开头设置线程的优先级,而不要在构造器中设置优先级(因为构造器执行的时候,ExecutorService还没有执行任务)。从打印可见,最晚执行的线程具有最高的优先级,会获得更多的执行。

  为了更明显地体现任务优先级,run()中包含了一段耗时的运算,这段运算会被调度器打断来进行任务切换,具有高优先级线程上的任务会更容易得到执行。

  JDK具有10个优先级,但不同操作系统也有各自的定义,Windows有7个不固定的优先级,Solaris有231个优先级。为了在不同系统上获得相同的效果,可以使用MAX_PRIORITYNORM_PRIORITYMIN_PRIORITY

7. Yielding

  在当前任务完成了阶段性的工作后,可以使用yield()可以向调度器建议切换到其他线程。注意yield()只是建议,不能对线程的切换进行可靠地控制。

  前面一直使用的LiftOff在打印后调用了Thread.yield(),来让各个任务获得平均的运行机会。如果注释掉LiftOff里的Thread.yield(),再运行MoreBasicThreads,就会发现各个任务的执行顺序变得更加随机:

public class LiftOff implements Runnable { 
    protected int countDown = 10; // Default 
    private static int taskCount = 0; 
    private final int id = taskCount++; 
    public LiftOff() {} 
    public LiftOff(int countDown) { 
        this.countDown = countDown; 
    } 
    public String status() { 
        return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "), "; 
    } 
    public void run() { 
        while(countDown-- > 0) { 
            System.out.print(status()); 
            // Thread.yield(); 
        } 
    } 
}

public class MoreBasicThreads { 
    public static void main(String[] args) { 
        for(int i = 0; i < 5; i++) 
            new Thread(new LiftOff()).start(); 
        System.out.println("Waiting for LiftOff"); 
    } 
} 
/* Output
#0(9), #1(9), #3(9), #4(9), #2(9), Waiting for LiftOff
#2(8), #4(8), #3(8), #1(8), #1(7), #1(6), #1(5), #1(4), #1(3), #1(2), #1(1), #0(8),
#1(Liftoff!), #3(7), #4(7), #2(7), #4(6), #3(6), #0(7), #3(5), #4(5), #2(6), #4(4),
#3(4), #0(6), #3(3), #4(3), #2(5), #4(2), #3(2), #0(5), #3(1), #4(1), #2(4),
#4(Liftoff!), #3(Liftoff!), #0(4), #2(3), #0(3), #2(2), #0(2), #2(1), #0(1),
#0(Liftoff!), #2(Liftoff!), 
*/

8. Daemon threads

  Daemon线程(守护线程)用于在后台为程序提供长期的服务,只要程序还在运行,Daemon线程就会一直存在。当所有的非Daemon线程完成时,程序结束,进程中的所有Daemon线程就会被杀掉。

8.1. 设置Daemon线程

8.1.1. 使用Thread

  要设置一个线程为Daemon线程,必须在线程开始(start())前调用setDaemon(true)

import java.util.concurrent.*;

public class SimpleDaemons implements Runnable {
    public void run() {
        try {
            while(true) {
                TimeUnit.MILLISECONDS.sleep(100);
                System.out.println(Thread.currentThread() + " " + this);
            }
        } catch(InterruptedException e) {
            System.out.println("sleep() interrupted");
        }
    }
    public static void main(String[] args) throws Exception {
        for(int i = 0; i < 10; i++) {
            Thread daemon = new Thread(new SimpleDaemons());
            daemon.setDaemon(true); // Must call before start()
            daemon.start();
        }
        System.out.println("All daemons started");
        TimeUnit.MILLISECONDS.sleep(175);
    }
}
/* Output
All daemons started
Thread[Thread-9,5,main] com.company.SimpleDaemons@116671b4
Thread[Thread-7,5,main] com.company.SimpleDaemons@79d62897
Thread[Thread-1,5,main] com.company.SimpleDaemons@3e31799e
Thread[Thread-0,5,main] com.company.SimpleDaemons@d8488cb
Thread[Thread-5,5,main] com.company.SimpleDaemons@47353e9f
Thread[Thread-3,5,main] com.company.SimpleDaemons@14a1c9ea
Thread[Thread-4,5,main] com.company.SimpleDaemons@517f906c
Thread[Thread-6,5,main] com.company.SimpleDaemons@517136d2
Thread[Thread-2,5,main] com.company.SimpleDaemons@37c3a03e
Thread[Thread-8,5,main] com.company.SimpleDaemons@5f96cf4a
*/

上面的代码在main()中进行了一段sleep(),期间可以看到Daemon线程的打印。之后main()结束,程序结束,所有Daemon线程都被杀掉。

8.1.2. 使用ThreadFactory

  上面的例子显式地创建了Thread对象,并通过Thread对象的引用来设置它为Daemon线程。使用Executors的时候,无法直接接触Thread对象,这时可以通过ThreadFactory来为Executors创建并配置线程。

import java.util.concurrent.*;

public class DaemonThreadFactory implements ThreadFactory {
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    }
}

  然后就可以使用DaemonThreadFactory作为Executors.newCachedThreadPool()的参数,来为Executors创建特定配置的Thread

import java.util.concurrent.*;

public class DaemonFromFactory implements Runnable {
    public void run() {
        try {
            while(true) {
                TimeUnit.MILLISECONDS.sleep(100);
                System.out.println(Thread.currentThread() + " " + this);
            }
        } catch(InterruptedException e) {
            System.out.println("Interrupted");
        }
    }
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool(
                new DaemonThreadFactory());
        for(int i = 0; i < 10; i++)
            exec.execute(new DaemonFromFactory());
        System.out.println("All daemons started");
        TimeUnit.MILLISECONDS.sleep(500); // Run for a while
    }
}

除了这里使用的newCachedThreadPool()Executors创建ExecutorService的所有静态方法都有接受ThreadFactory的重载。

8.1.3. 使用ThreadPoolExecutor

  还可以更进一步地继承ThreadPoolExecutor,实现专门创建特定线程的ThreadPoolExecutor

import java.util.concurrent.*;

public class DaemonThreadPoolExecutor extends ThreadPoolExecutor {
    public DaemonThreadPoolExecutor() {
        super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new DaemonThreadFactory());
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor exec = new DaemonThreadPoolExecutor();
        for(int i = 0; i < 5; i++)
            exec.execute(new LiftOff());
        TimeUnit.MILLISECONDS.sleep(500);
        exec.shutdown();
    }
}

8.2. 判断Daemon线程

  使用isDaemon()可以检查一个线程是否为Daemon线程。在Daemon线程中创建的线程都会自动成为Daemon线程。

import java.util.concurrent.*;

class Daemon implements Runnable {
    private Thread[] t = new Thread[10];
    public void run() {
        for(int i = 0; i < t.length; i++) {
            t[i] = new Thread(new DaemonSpawn());
            t[i].start();
            System.out.print("DaemonSpawn " + i + " started, ");
        }
        System.out.println();
        for(int i = 0; i < t.length; i++)
            System.out.print("t[" + i + "].isDaemon() = " + t[i].isDaemon() + ", ");
        while(true)
            Thread.yield();
    }
}

class DaemonSpawn implements Runnable {
    public void run() {
        while(true)
            Thread.yield();
    }
}

public class Daemons {
    public static void main(String[] args) throws Exception {
        Thread d = new Thread(new Daemon());
        d.setDaemon(true);
        d.start();
        System.out.println("d.isDaemon() = " + d.isDaemon() + ", ");
        // Allow the daemon threads to
        // finish their startup processes:
        TimeUnit.SECONDS.sleep(1);
    }
}
/* Output
d.isDaemon() = true, 
DaemonSpawn 0 started, DaemonSpawn 1 started, DaemonSpawn 2 started, 
DaemonSpawn 3 started, DaemonSpawn 4 started, DaemonSpawn 5 started, 
DaemonSpawn 6 started, DaemonSpawn 7 started, DaemonSpawn 8 started, 
DaemonSpawn 9 started, 
t[0].isDaemon() = true, t[1].isDaemon() = true, t[2].isDaemon() = true, 
t[3].isDaemon() = true, t[4].isDaemon() = true, t[5].isDaemon() = true, 
t[6].isDaemon() = true, t[7].isDaemon() = true, t[8].isDaemon() = true, 
t[9].isDaemon() = true, 
*/

8.3. Daemon线程的结束

  需要注意的是,Daemon线程的run()结束时,不会执行finally下的内容。

// Daemon threads don’t run the finally clause 
import java.util.concurrent.*;

class ADaemon implements Runnable {
    public void run() {
        try {
            System.out.println("Starting ADaemon");
            TimeUnit.SECONDS.sleep(1);
        } catch(InterruptedException e) {
            System.out.println("Exiting via InterruptedException");
        } finally {
            System.out.println("This should always run?");
        }
    }
}

public class DaemonsDontRunFinally {
    public static void main(String[] args) throws Exception {
        Thread t = new Thread(new ADaemon());
        t.setDaemon(true);
        t.start();
    }
}
/* Output
Starting ADaemon
*/

由上面的例子可见,finally下的内容没有运行。注释掉t.setDaemon(true),就可以看到finally下打印了。

  当main()结束时,JVM会立即结束所有的Daemon线程,由于Daemon线程无法以常规的方式结束,非Daemon线程通常会是更好的选择。