Concurrency: Terminating tasks

Terminating tasks

1. The Ornamental Garden

  下面的例子使用标志canceled主动停止任务:

import java.util.concurrent.*;
import java.util.*;

class Count {
    private int count = 0;
    private Random rand = new Random(47);
    // Remove the synchronized keyword to see counting fail:
    public synchronized int increment() {
        int temp = count;
        if(rand.nextBoolean()) // Yield half the time
            Thread.yield();
        return (count = ++temp);
    }
    public synchronized int value() { return count; }
}

class Entrance implements Runnable {
    private static Count count = new Count();
    private static List<Entrance> entrances =
            new ArrayList<Entrance>();
    private int number = 0;
    // Doesn’t need synchronization to read:
    private final int id;
    private static volatile boolean canceled = false;
    // Atomic operation on a volatile field:
    public static void cancel() { canceled = true; }
    public Entrance(int id) {
        this.id = id;
        // Keep this task in a list. Also prevents
        // garbage collection of dead tasks:
        entrances.add(this);
    }
    public void run() {
        while(!canceled) {
            synchronized(this) {
                ++number;
            }
            System.out.println(this + " Total: " + count.increment());
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch(InterruptedException e) {
                System.out.println("sleep interrupted");
            }
        }
        System.out.println("Stopping " + this);
    }
    public synchronized int getValue() { return number; }
    public String toString() {
        return "Entrance " + id + ": " + getValue();
    }
    public static int getTotalCount() {
        return count.value();
    }
    public static int sumEntrances() {
        int sum = 0;
        for(Entrance entrance : entrances)
            sum += entrance.getValue();
        return sum;
    }
}

public class OrnamentalGarden {
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        for(int i = 0; i < 5; i++)
            exec.execute(new Entrance(i));
        // Run for a while, then stop and collect the data:
        TimeUnit.SECONDS.sleep(3);
        Entrance.cancel();
        exec.shutdown();
        if(!exec.awaitTermination(250, TimeUnit.MILLISECONDS))
            System.out.println("Some tasks were not terminated!");
        System.out.println("Total: " + Entrance.getTotalCount());
        System.out.println("Sum of Entrances: " + Entrance.sumEntrances());
    }
}

Entrancecancel()会设置canceledtrue,之后run()中的循环(while(!canceled))就会停止,任务结束。这里对canceled的操作只有赋值和读取,都是原子操作,不会被打断,所以只需把canceled设置为volatile保证可见性,不需要使用同步。

  在OrnamentalGardenmain()中结束任务时,首先使用Entrance.cancel()让各任务停止循环,结束各自的run()。然后调用了exec.shutdown()exec.awaitTermination()exec.awaitTermination()用于在一段时间内等待各个任务完成,如果任务能够在时限内(这里是250毫秒)完成,则返回true;否则返回false,表示还有任务没有完成。

  这里所有的Entrance的实例都被添加到了一个静态的列表List<Entrance> entrances中,所以在各个Entrance任务完成之后,各Entrance实例不会被回收,还可以通过entrances来进行访问。

2. Terminating When Blocked

  上面的例子中,任务在执行的过程中检查标志位,主动结束任务。如果当前线程被sleep()等操作阻塞,就需要使用其他方法来结束任务。

2.1. Thread states

  线程有以下四种状态:

  1. New: 线程在被创建时才会短暂地处于这一状态,进行系统资源分配和初始化,之后就会进入Runnable或者Blocked状态。
  2. Runnable: 线程处于可运行的状态,线程接受线程调度的控制,可能处于正在运行的状态,也可能没有在运行。
  3. Blocked: 线程可以运行,但是运行被阻止。线程调度会跳过该状态下的线程,直到线程返回Runnable状态。
  4. Dead: 线程的任务已经运行结束,如run()方法返回,或者被打断。此时线程不再接受调度。

2.2. Becoming Blocked

  以下原因会导致线程进入Blocked状态:

  • 调用sleep(),使线程休眠指定的时间;
  • 调用wait(),使线程被挂起,直到notify()notifyAll()(或者signal()signalAll());
  • 任务在等待I/O操作结束;
  • 任务调用其他对象上的同步方法,但该对象的锁被其他任务占用,暂时无法获得。

  在旧代码中可能还会使用suspend()resume()来阻塞和继续线程,但现在已被废弃。stop()在结束线程时,不会释放线程已经获得的对象的锁定,也已经被废弃。

3. Interruption

  抛出异常会打断run()的执行。Thread类提供了interrupt()方法来打断线程上任务的执行,即便线程处于阻塞状态。interrupt()会设置线程的interrupted标志。设置了interrupted标志的线程,如果已经处于阻塞状态,或者将要执行会导致阻塞的操作时,会抛出InterruptedException异常。异常抛出后,interrupted标志会被复位。

  interrupt()Thread的方法,但Executors避免了对Thread对象的直接操作。ExecutorshutdownNow()会调用该Executor启动的所有线程的interrupt()方法。如果只是希望中断Executor中的特定任务,就不能使用execute()来直接开始任务,而是要使用submit()来获得一个代表了任务上下文的Future<?>Future<?>cancel()会调用对应线程的interrupt()

import java.util.concurrent.*;
import java.io.*;

class SleepBlocked implements Runnable {
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(100);
        } catch(InterruptedException e) {
            System.out.println("InterruptedException");
        }
        System.out.println("Exiting SleepBlocked.run()");
    }
}

class IOBlocked implements Runnable {
    private InputStream in;
    public IOBlocked(InputStream is) { in = is; }
    public void run() {
        try {
            System.out.println("Waiting for read():");
            in.read();
        } catch(IOException e) {
            System.out.println("IOException");
            if(Thread.currentThread().isInterrupted()) {
                System.out.println("Interrupted from blocked I/O");
            } else {
                throw new RuntimeException(e);
            }
        }
        System.out.println("Exiting IOBlocked.run()");
    }
}

class SynchronizedBlocked implements Runnable {
    public synchronized void f() {
        while(true) // Never releases lock
            Thread.yield();
    }
    public SynchronizedBlocked() {
        new Thread() {
            public void run() {
                f(); // Lock acquired by this thread
            }
        }.start();
    }
    public void run() {
        System.out.println("Trying to call f()");
        f();
        System.out.println("Exiting SynchronizedBlocked.run()");
    }
}

public class Interrupting {
    private static ExecutorService exec =
            Executors.newCachedThreadPool();
    static void test(Runnable r) throws InterruptedException{
        System.out.println("--------------------------");
        Future<?> f = exec.submit(r);
        TimeUnit.MILLISECONDS.sleep(100);
        System.out.println("Interrupting " + r.getClass().getName());
        f.cancel(true); // Interrupts if running
        System.out.println("Interrupt sent to " + r.getClass().getName());
    }
    public static void main(String[] args) throws Exception {
        test(new SleepBlocked());
        test(new IOBlocked(System.in));
        test(new SynchronizedBlocked());
        TimeUnit.SECONDS.sleep(3);
        System.out.println("Aborting with System.exit(0)");
        System.exit(0); // ... since last 2 interrupts failed
    }
}
/* Output
--------------------------
Interrupting com.terminatingtasks.SleepBlocked
Interrupt sent to com.terminatingtasks.SleepBlocked
InterruptedException
Exiting SleepBlocked.run()
--------------------------
Waiting for read():
Interrupting com.terminatingtasks.IOBlocked
Interrupt sent to com.terminatingtasks.IOBlocked
--------------------------
Trying to call f()
Interrupting com.terminatingtasks.SynchronizedBlocked
Interrupt sent to com.terminatingtasks.SynchronizedBlocked
Aborting with System.exit(0)
*/

上面的例子中,Interruptingtest()使用Future<?> f = exec.submit(r),通过submit()提交任务给ExecutorService,并获取返回的Future<?>。在任务运行一段时间后,调用f.cancel(true)中断任务。SleepBlocked使用sleep()产生阻塞,被中断时可以捕获到InterruptedException异常,然后任务被中断。IOBlocked通过等待I/O输入产生阻塞,SynchronizedBlocked通过同步机制产生阻塞,二者不能被中断。

## 3.1. Blocked by I/O

  中断I/O产生的阻塞的一种方法是关闭产生阻塞的资源。

import java.net.*;
import java.util.concurrent.*;
import java.io.*;

public class CloseResource {
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();

        ServerSocket server = new ServerSocket(8080);
        InputStream socketInput = new Socket("localhost", 8080).getInputStream();
        exec.execute(new IOBlocked(socketInput));
        exec.execute(new IOBlocked(System.in));

        TimeUnit.MILLISECONDS.sleep(100);

        System.out.println("Shutting down all threads");
        exec.shutdownNow();

        TimeUnit.SECONDS.sleep(1);

        System.out.println("Closing " + socketInput.getClass().getName());
        socketInput.close(); // Releases blocked thread

        TimeUnit.SECONDS.sleep(1);

        System.out.println("Closing " + System.in.getClass().getName());
        System.in.close(); // Releases blocked thread
    }
}
/* Output
Waiting for read():
Waiting for read():
Shutting down all threads
Closing java.net.SocketInputStream
Caught IOException
Interrupted from blocked I/O
Exiting IOBlocked.run()
Closing java.io.BufferedInputStream

Caught IOException
Interrupted from blocked I/O
Exiting IOBlocked.run()
*/

这里使用了ServerSocketInputStream两种InputStreamexec.shutdownNow()并不能中断二者产生的阻塞。通过socketInput.close()System.in.close()关闭InputStream后,对应的任务捕获到IOException异常,之后可以检查到interruptedtrue,说明interrupt()已经生效。

  注意上面的输入来自Thinking in Java原书。在我的电脑上(Windows 10 x64 + JDK 8),上面的例子只有socketInput对应的任务能够在socketInput.close()后检查到interruptedtrue;在System.in.close()后,System.in对应的任务仍处于阻塞状态,当在控制台进行任意输入并回车后,System.in对应的任务被中断,且也能检查到interruptedtrue,如上面例子后的Output所示。

  使用nio相关类可以更加优雅地打断I/O造成的阻塞:

import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.concurrent.*;
import java.io.*;

class NIOBlocked implements Runnable {
    private final SocketChannel sc;
    public NIOBlocked(SocketChannel sc) { this.sc = sc; }
    public void run() {
        try {
            System.out.println("Waiting for read() in " + this);
            sc.read(ByteBuffer.allocate(1));
        } catch(ClosedByInterruptException e) {
            System.out.println("ClosedByInterruptException");
        } catch(AsynchronousCloseException e) {
            System.out.println("AsynchronousCloseException");
        } catch(IOException e) {
            throw new RuntimeException(e);
        }
        System.out.println("Exiting NIOBlocked.run() " + this);
    }
}

public class NIOInterruption {
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        ServerSocket server = new ServerSocket(8080);
        InetSocketAddress isa =
                new InetSocketAddress("localhost", 8080);
        SocketChannel sc1 = SocketChannel.open(isa);
        SocketChannel sc2 = SocketChannel.open(isa);
        Future<?> f = exec.submit(new NIOBlocked(sc1));
        exec.execute(new NIOBlocked(sc2));
        exec.shutdown();
        TimeUnit.SECONDS.sleep(1);
        // Produce an interrupt via cancel:
        f.cancel(true);
        TimeUnit.SECONDS.sleep(1);
        // Release the block by closing the channel:
        sc2.close();
    }
}
/* Output
Waiting for read() in com.terminatingtasks.NIOBlocked@503c95b4
Waiting for read() in com.terminatingtasks.NIOBlocked@5efdd578
ClosedByInterruptException
Exiting NIOBlocked.run() com.terminatingtasks.NIOBlocked@503c95b4
AsynchronousCloseException
Exiting NIOBlocked.run() com.terminatingtasks.NIOBlocked@5efdd578
*/

可见通过f.cancel(true)中断任务会捕获到ClosedByInterruptException,通过sc2.close()中断任务会捕获到AsynchronousCloseException,两种方法都可以成功地中断任务。

3.2. Blocked by a Mutex

  如果一个任务调用已经被锁定的对象上的方法,该任务就会被阻塞,直到它成功获得对象的锁定。同一个任务可以反复获得同一个对象的锁定。

public class MultiLock {
    public synchronized void f1(int count) {
        if(count-- > 0) {
            System.out.println("f1() calling f2() with count " + count);
            f2(count);
        }
    }
    public synchronized void f2(int count) {
        if(count-- > 0) {
            System.out.println("f2() calling f1() with count " + count);
            f1(count);
        }
    }
    public static void main(String[] args) throws Exception {
        final MultiLock multiLock = new MultiLock();
        new Thread() {
            public void run() {
                multiLock.f1(10);
            }
        }.start();
    }
}
/* Output
f1() calling f2() with count 9
f2() calling f1() with count 8
f1() calling f2() with count 7
f2() calling f1() with count 6
f1() calling f2() with count 5
f2() calling f1() with count 4
f1() calling f2() with count 3
f2() calling f1() with count 2
f1() calling f2() with count 1
f2() calling f1() with count 0
*/

f1()首先获得了锁定,然后调用synchronized方法,f2()可以顺利执行、不会被阻塞。f1()f2()互相调用,也不会发生阻塞。

  Java SE 5提供的ReentrantLocks可以被打断。

import java.util.concurrent.*;
import java.util.concurrent.locks.*;

class BlockedMutex {
    private Lock lock = new ReentrantLock();
    public BlockedMutex() {
        // Acquire it right away, to demonstrate interruption
        // of a task blocked on a ReentrantLock:
        lock.lock();
    }
    public void f() {
        try {
            // This will never be available to a second task
            lock.lockInterruptibly(); // Special call
            System.out.println("lock acquired in f()");
        } catch(InterruptedException e) {
            System.out.println("Interrupted from lock acquisition in f()");
        }
    }
}

class Blocked2 implements Runnable {
    BlockedMutex blocked = new BlockedMutex();
    public void run() {
        System.out.println("Waiting for f() in BlockedMutex");
        blocked.f();
        System.out.println("Broken out of blocked call");
    }
}

public class Interrupting2 {
    public static void main(String[] args) throws Exception {
        Thread t = new Thread(new Blocked2());
        t.start();
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Issuing t.interrupt()");
        t.interrupt();
    }
}
/* Output
Waiting for f() in BlockedMutex
Issuing t.interrupt()
Interrupted from lock acquisition in f()
Broken out of blocked call
*/

BlockedMutex在创建时就对其自己的lock进行了锁定,Blocked2blocked.f()被阻塞,t.interrupt()可以成功消除阻塞,Blocked2得以继续运行至结束。

4. Checking for an Interrupt

  当调用interrupt()时,中断只会在任务运行至会产生阻塞的操作或任务已经被阻塞时发生,如果任务中没有会产生阻塞的操作,就无法被interrupt()打断。

  在线程上调用interrupt()会为线程设置一个被打断的标志位,该标志位可以通过Thread.interrupted()来查询;调用Thread.interrupted()后,该标志位会被自动清除。

  下面的例子展示了当run()中可能不存在阻塞操作时,使用Thread.interrupted()判断是否应当打断任务的方法:

import java.util.concurrent.*;

class NeedsCleanup {
    private final int id;
    public NeedsCleanup(int ident) {
        id = ident;
        System.out.println("NeedsCleanup " + id);
    }
    public void cleanup() {
        System.out.println("Cleaning up " + id);
    }
}

class Blocked3 implements Runnable {
    private volatile double d = 0.0;
    public void run() {
        try {
            while(!Thread.interrupted()) {
                // point1
                NeedsCleanup n1 = new NeedsCleanup(1);
                // Start try-finally immediately after definition
                // of n1, to guarantee proper cleanup of n1:
                try {
                    System.out.println("Sleeping");
                    TimeUnit.SECONDS.sleep(1);
                    // point2
                    NeedsCleanup n2 = new NeedsCleanup(2);
                    // Guarantee proper cleanup of n2:
                    try {
                        System.out.println("Calculating");
                        // A time-consuming, non-blocking operation:
                        for(int i = 1; i < 2500000; i++)
                            d = d + (Math.PI + Math.E) / d;
                        System.out.println("Finished time-consuming operation");
                    } finally {
                        n2.cleanup();
                    }
                } finally {
                    n1.cleanup();
                }
            }
            System.out.println("Exiting via while() test");
        } catch(InterruptedException e) {
            System.out.println("Exiting via InterruptedException");
        }
    }
}

public class InterruptingIdiom {
    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.out.println("usage: java InterruptingIdiom delay-in-mS");
            System.exit(1);
        }
        Thread t = new Thread(new Blocked3());
        t.start();
        TimeUnit.MILLISECONDS.sleep(new Integer(args[0]));
        t.interrupt();
    }
}
/* Output
NeedsCleanup 1 
Sleeping 
NeedsCleanup 2 
Calculating 
Finished time-consuming operation 
Cleaning up 2 
Cleaning up 1 
NeedsCleanup 1 
Sleeping 
Cleaning up 1 
Exiting via InterruptedException 
*/

运行这个例子必须要使用命令行指定打断任务的延时,上面的Output只是给出了一种可能的情况,

需要特别注意通过异常中断任务后的收尾工作,Blocked3run()NeedsCleanup n1 = new NeedsCleanup(1)实例化n1后紧跟try-finally,并在finally中清理n1