Concurrency: Terminating tasks
Author: nex3z
2016-07-04
Terminating tasks
1. The Ornamental Garden
下面的例子使用标志canceled
主动停止任务:
import java.util.concurrent.*;
private Random rand = new Random(47);
// Remove the synchronized keyword to see counting fail:
public synchronized int increment() {
if(rand.nextBoolean()) // Yield half the time
public synchronized int value() { return count; }
class Entrance implements Runnable {
private static Count count = new Count();
private static List<Entrance> entrances =
new ArrayList<Entrance>();
// Doesn’t need synchronization to read:
private static volatile boolean canceled = false;
// Atomic operation on a volatile field:
public static void cancel() { canceled = true; }
public Entrance(int id) {
// Keep this task in a list. Also prevents
// garbage collection of dead tasks:
System.out.println(this + " Total: " + count.increment());
TimeUnit.MILLISECONDS.sleep(100);
} catch(InterruptedException e) {
System.out.println("sleep interrupted");
System.out.println("Stopping " + this);
public synchronized int getValue() { return number; }
public String toString() {
return "Entrance " + id + ": " + getValue();
public static int getTotalCount() {
public static int sumEntrances() {
for(Entrance entrance : entrances)
sum += entrance.getValue();
public class OrnamentalGarden {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i++)
exec.execute(new Entrance(i));
// Run for a while, then stop and collect the data:
TimeUnit.SECONDS.sleep(3);
if(!exec.awaitTermination(250, TimeUnit.MILLISECONDS))
System.out.println("Some tasks were not terminated!");
System.out.println("Total: " + Entrance.getTotalCount());
System.out.println("Sum of Entrances: " + Entrance.sumEntrances());
import java.util.concurrent.*;
import java.util.*;
/**
 * Running total shared by all entrances. Both methods are synchronized on
 * this instance, so the read-then-write in {@link #increment()} is not
 * interleaved with another caller.
 */
class Count {
  private int count = 0;
  private Random rand = new Random(47);

  /**
   * Adds one to the total and returns the new value.
   * Remove the synchronized keyword to see counting fail.
   */
  public synchronized int increment() {
    int snapshot = count;
    // Yield half the time, between reading the old value and storing the new one.
    if (rand.nextBoolean()) {
      Thread.yield();
    }
    count = snapshot + 1;
    return count;
  }

  /** Returns the current total. */
  public synchronized int value() {
    return count;
  }
}
/**
 * One garden entrance. While the static {@code canceled} flag is false, the
 * task bumps its own {@code number}, bumps the shared {@link Count}, prints
 * both and sleeps 100 ms.
 */
class Entrance implements Runnable {
  private static Count count = new Count();
  private static List<Entrance> entrances =
    new ArrayList<Entrance>();
  private int number = 0;
  // Doesn't need synchronization to read:
  private final int id;
  private static volatile boolean canceled = false;

  /** Raises the stop flag for every entrance; a single write to a volatile field. */
  public static void cancel() {
    canceled = true;
  }

  public Entrance(int id) {
    this.id = id;
    // Keep this task in a list. Also prevents
    // garbage collection of dead tasks:
    entrances.add(this);
  }

  public void run() {
    while (!canceled) {
      synchronized (this) {
        number++;
      }
      System.out.println(this + " Total: " + count.increment());
      pause();
    }
    System.out.println("Stopping " + this);
  }

  /** Sleeps 100 ms; an interrupt is only reported and the loop carries on. */
  private void pause() {
    try {
      TimeUnit.MILLISECONDS.sleep(100);
    } catch (InterruptedException e) {
      System.out.println("sleep interrupted");
    }
  }

  /** Returns how many times this entrance has counted. */
  public synchronized int getValue() {
    return number;
  }

  public String toString() {
    return "Entrance " + id + ": " + getValue();
  }

  /** Returns the value held by the shared counter. */
  public static int getTotalCount() {
    return count.value();
  }

  /** Adds up the per-entrance counts of every registered entrance. */
  public static int sumEntrances() {
    int total = 0;
    for (Entrance each : entrances) {
      total += each.getValue();
    }
    return total;
  }
}
/**
 * Starts five Entrance tasks, lets them run for three seconds, cancels them
 * and prints the shared total next to the sum of the per-entrance counts.
 */
public class OrnamentalGarden {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newCachedThreadPool();
    int id = 0;
    while (id < 5) {
      pool.execute(new Entrance(id));
      id++;
    }
    // Run for a while, then stop and collect the data:
    TimeUnit.SECONDS.sleep(3);
    Entrance.cancel();
    pool.shutdown();
    // Give the tasks up to 250 ms to leave their loops.
    boolean allDone = pool.awaitTermination(250, TimeUnit.MILLISECONDS);
    if (!allDone) {
      System.out.println("Some tasks were not terminated!");
    }
    System.out.println("Total: " + Entrance.getTotalCount());
    System.out.println("Sum of Entrances: " + Entrance.sumEntrances());
  }
}
import java.util.concurrent.*;
import java.util.*;
/**
 * Running total shared by all entrances. Both methods are synchronized on
 * this instance, so the read-then-write in {@link #increment()} is not
 * interleaved with another caller.
 */
class Count {
  private int count = 0;
  private Random rand = new Random(47);

  /**
   * Adds one to the total and returns the new value.
   * Remove the synchronized keyword to see counting fail.
   */
  public synchronized int increment() {
    int snapshot = count;
    // Yield half the time, between reading the old value and storing the new one.
    if (rand.nextBoolean()) {
      Thread.yield();
    }
    count = snapshot + 1;
    return count;
  }

  /** Returns the current total. */
  public synchronized int value() {
    return count;
  }
}
/**
 * One garden entrance. While the static {@code canceled} flag is false, the
 * task bumps its own {@code number}, bumps the shared {@link Count}, prints
 * both and sleeps 100 ms.
 */
class Entrance implements Runnable {
  private static Count count = new Count();
  private static List<Entrance> entrances =
    new ArrayList<Entrance>();
  private int number = 0;
  // Doesn't need synchronization to read:
  private final int id;
  private static volatile boolean canceled = false;

  /** Raises the stop flag for every entrance; a single write to a volatile field. */
  public static void cancel() {
    canceled = true;
  }

  public Entrance(int id) {
    this.id = id;
    // Keep this task in a list. Also prevents
    // garbage collection of dead tasks:
    entrances.add(this);
  }

  public void run() {
    while (!canceled) {
      synchronized (this) {
        number++;
      }
      System.out.println(this + " Total: " + count.increment());
      pause();
    }
    System.out.println("Stopping " + this);
  }

  /** Sleeps 100 ms; an interrupt is only reported and the loop carries on. */
  private void pause() {
    try {
      TimeUnit.MILLISECONDS.sleep(100);
    } catch (InterruptedException e) {
      System.out.println("sleep interrupted");
    }
  }

  /** Returns how many times this entrance has counted. */
  public synchronized int getValue() {
    return number;
  }

  public String toString() {
    return "Entrance " + id + ": " + getValue();
  }

  /** Returns the value held by the shared counter. */
  public static int getTotalCount() {
    return count.value();
  }

  /** Adds up the per-entrance counts of every registered entrance. */
  public static int sumEntrances() {
    int total = 0;
    for (Entrance each : entrances) {
      total += each.getValue();
    }
    return total;
  }
}
/**
 * Starts five Entrance tasks, lets them run for three seconds, cancels them
 * and prints the shared total next to the sum of the per-entrance counts.
 */
public class OrnamentalGarden {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newCachedThreadPool();
    int id = 0;
    while (id < 5) {
      pool.execute(new Entrance(id));
      id++;
    }
    // Run for a while, then stop and collect the data:
    TimeUnit.SECONDS.sleep(3);
    Entrance.cancel();
    pool.shutdown();
    // Give the tasks up to 250 ms to leave their loops.
    boolean allDone = pool.awaitTermination(250, TimeUnit.MILLISECONDS);
    if (!allDone) {
      System.out.println("Some tasks were not terminated!");
    }
    System.out.println("Total: " + Entrance.getTotalCount());
    System.out.println("Sum of Entrances: " + Entrance.sumEntrances());
  }
}
Entrance
的cancel()
会设置canceled
为true
,之后run()
中的循环(while(!canceled)
)就会停止,任务结束。这里对canceled
的操作只有赋值和读取,都是原子操作,不会被打断,所以只需把canceled
设置为volatile
保证可见性,不需要使用同步。
在OrnamentalGarden
的main()
中结束任务时,首先使用Entrance.cancel()
让各任务停止循环,结束各自的run()
。然后调用了exec.shutdown()
和exec.awaitTermination()
:exec.awaitTermination()
用于在一段时间内等待各个任务完成,如果任务能够在时限内(这里是250毫秒)完成,则返回true
;否则返回false
,表示还有任务没有完成。
这里所有的Entrance
的实例都被添加到了一个静态的列表List<Entrance> entrances
中,所以在各个Entrance
任务完成之后,各Entrance
实例不会被回收,还可以通过entrances
来进行访问。
2. Terminating When Blocked
上面的例子中,任务在执行的过程中检查标志位,主动结束任务。如果当前线程被sleep()
等操作阻塞,就需要使用其他方法来结束任务。
2.1. Thread states
线程有以下四种状态:
- New: 线程在被创建时才会短暂地处于这一状态,进行系统资源分配和初始化,之后就会进入Runnable或者Blocked状态。
- Runnable: 线程处于可运行的状态,线程接受线程调度的控制,可能处于正在运行的状态,也可能没有在运行。
- Blocked: 线程可以运行,但是运行被阻止。线程调度会跳过该状态下的线程,直到线程返回Runnable状态。
- Dead: 线程的任务已经运行结束,如
run()
方法返回,或者被打断。此时线程不再接受调度。
2.2. Becoming Blocked
以下原因会导致线程进入Blocked状态:
- 调用
sleep()
,使线程休眠指定的时间;
- 调用
wait()
,使线程被挂起,直到notify()
或notifyAll()
(或者signal()
、signalAll()
)被调用;
- 任务在等待I/O操作结束;
- 任务调用其他对象上的同步方法,但该对象的锁被其他任务占用,暂时无法获得。
在旧代码中可能还会使用suspend()
和resume()
来阻塞和继续线程,但现在已被废弃。stop()
在结束线程时会立即释放线程已经持有的所有锁,可能使受这些锁保护的对象处于不一致的状态,因此也已经被废弃。
3. Interruption
抛出异常会打断run()
的执行。Thread
类提供了interrupt()
方法来打断线程上任务的执行,即便线程处于阻塞状态。interrupt()
会设置线程的interrupted
标志。设置了interrupted
标志的线程,如果已经处于阻塞状态,或者将要执行会导致阻塞的操作时,会抛出InterruptedException
异常。异常抛出后,interrupted
标志会被复位。
interrupt()
是Thread
的方法,但Executors
避免了对Thread
对象的直接操作。Executor
的shutdownNow()
会调用该Executor
启动的所有线程的interrupt()
方法。如果只是希望中断Executor
中的特定任务,就不能使用execute()
来直接开始任务,而是要使用submit()
来获得一个代表了任务上下文的Future<?>
,Future<?>
的cancel()
会调用对应线程的interrupt()
。
import java.util.concurrent.*;
class SleepBlocked implements Runnable {
TimeUnit.SECONDS.sleep(100);
} catch(InterruptedException e) {
System.out.println("InterruptedException");
System.out.println("Exiting SleepBlocked.run()");
class IOBlocked implements Runnable {
public IOBlocked(InputStream is) { in = is; }
System.out.println("Waiting for read():");
System.out.println("IOException");
if(Thread.currentThread().isInterrupted()) {
System.out.println("Interrupted from blocked I/O");
throw new RuntimeException(e);
System.out.println("Exiting IOBlocked.run()");
class SynchronizedBlocked implements Runnable {
public synchronized void f() {
while(true) // Never releases lock
public SynchronizedBlocked() {
f(); // Lock acquired by this thread
System.out.println("Trying to call f()");
System.out.println("Exiting SynchronizedBlocked.run()");
public class Interrupting {
private static ExecutorService exec =
Executors.newCachedThreadPool();
static void test(Runnable r) throws InterruptedException{
System.out.println("--------------------------");
Future<?> f = exec.submit(r);
TimeUnit.MILLISECONDS.sleep(100);
System.out.println("Interrupting " + r.getClass().getName());
f.cancel(true); // Interrupts if running
System.out.println("Interrupt sent to " + r.getClass().getName());
public static void main(String[] args) throws Exception {
test(new SleepBlocked());
test(new IOBlocked(System.in));
test(new SynchronizedBlocked());
TimeUnit.SECONDS.sleep(3);
System.out.println("Aborting with System.exit(0)");
System.exit(0); // ... since last 2 interrupts failed
--------------------------
Interrupting com.terminatingtasks.SleepBlocked
Interrupt sent to com.terminatingtasks.SleepBlocked
Exiting SleepBlocked.run()
--------------------------
Interrupting com.terminatingtasks.IOBlocked
Interrupt sent to com.terminatingtasks.IOBlocked
--------------------------
Interrupting com.terminatingtasks.SynchronizedBlocked
Interrupt sent to com.terminatingtasks.SynchronizedBlocked
Aborting with System.exit(0)
import java.util.concurrent.*;
import java.io.*;
/** Task that blocks in a 100-second sleep and reports if an interrupt cuts it short. */
class SleepBlocked implements Runnable {
  public void run() {
    try {
      Thread.sleep(TimeUnit.SECONDS.toMillis(100));
    } catch (InterruptedException interrupted) {
      System.out.println("InterruptedException");
    }
    // Printed whether the sleep completed or was interrupted.
    System.out.println("Exiting SleepBlocked.run()");
  }
}
/** Task that blocks reading one byte from the stream handed to its constructor. */
class IOBlocked implements Runnable {
  private InputStream in;

  public IOBlocked(InputStream is) {
    in = is;
  }

  public void run() {
    try {
      System.out.println("Waiting for read():");
      in.read();
    } catch (IOException e) {
      System.out.println("IOException");
      boolean interrupted = Thread.currentThread().isInterrupted();
      // A failure that did not coincide with an interrupt is rethrown unchecked.
      if (!interrupted) {
        throw new RuntimeException(e);
      }
      System.out.println("Interrupted from blocked I/O");
    }
    System.out.println("Exiting IOBlocked.run()");
  }
}
/**
 * Task that blocks trying to enter a synchronized method. The constructor
 * starts a thread that calls f() and loops inside it forever, so the later
 * call from run() has to wait for the same monitor.
 */
class SynchronizedBlocked implements Runnable {
  public synchronized void f() {
    for (;;) { // Never releases lock
      Thread.yield();
    }
  }

  public SynchronizedBlocked() {
    Thread holder = new Thread() {
      public void run() {
        f(); // Lock acquired by this thread
      }
    };
    holder.start();
  }

  public void run() {
    System.out.println("Trying to call f()");
    f();
    System.out.println("Exiting SynchronizedBlocked.run()");
  }
}
/**
 * Submits each kind of blocked task to an executor and tries to interrupt it
 * through the Future returned by submit().
 */
public class Interrupting {
  private static ExecutorService exec = Executors.newCachedThreadPool();

  /** Runs r for 100 ms, then cancels it with interruption requested. */
  static void test(Runnable r) throws InterruptedException {
    System.out.println("--------------------------");
    Future<?> handle = exec.submit(r);
    TimeUnit.MILLISECONDS.sleep(100);
    String taskName = r.getClass().getName();
    System.out.println("Interrupting " + taskName);
    handle.cancel(true); // Interrupts if running
    System.out.println("Interrupt sent to " + taskName);
  }

  public static void main(String[] args) throws Exception {
    // Each task is built just before it is tested.
    test(new SleepBlocked());
    test(new IOBlocked(System.in));
    test(new SynchronizedBlocked());
    TimeUnit.SECONDS.sleep(3);
    System.out.println("Aborting with System.exit(0)");
    System.exit(0); // ... since last 2 interrupts failed
  }
}
/* Output
--------------------------
Interrupting com.terminatingtasks.SleepBlocked
Interrupt sent to com.terminatingtasks.SleepBlocked
InterruptedException
Exiting SleepBlocked.run()
--------------------------
Waiting for read():
Interrupting com.terminatingtasks.IOBlocked
Interrupt sent to com.terminatingtasks.IOBlocked
--------------------------
Trying to call f()
Interrupting com.terminatingtasks.SynchronizedBlocked
Interrupt sent to com.terminatingtasks.SynchronizedBlocked
Aborting with System.exit(0)
*/
import java.util.concurrent.*;
import java.io.*;
/** Task that blocks in a 100-second sleep and reports if an interrupt cuts it short. */
class SleepBlocked implements Runnable {
  public void run() {
    try {
      Thread.sleep(TimeUnit.SECONDS.toMillis(100));
    } catch (InterruptedException interrupted) {
      System.out.println("InterruptedException");
    }
    // Printed whether the sleep completed or was interrupted.
    System.out.println("Exiting SleepBlocked.run()");
  }
}
/** Task that blocks reading one byte from the stream handed to its constructor. */
class IOBlocked implements Runnable {
  private InputStream in;

  public IOBlocked(InputStream is) {
    in = is;
  }

  public void run() {
    try {
      System.out.println("Waiting for read():");
      in.read();
    } catch (IOException e) {
      System.out.println("IOException");
      boolean interrupted = Thread.currentThread().isInterrupted();
      // A failure that did not coincide with an interrupt is rethrown unchecked.
      if (!interrupted) {
        throw new RuntimeException(e);
      }
      System.out.println("Interrupted from blocked I/O");
    }
    System.out.println("Exiting IOBlocked.run()");
  }
}
/**
 * Task that blocks trying to enter a synchronized method. The constructor
 * starts a thread that calls f() and loops inside it forever, so the later
 * call from run() has to wait for the same monitor.
 */
class SynchronizedBlocked implements Runnable {
  public synchronized void f() {
    for (;;) { // Never releases lock
      Thread.yield();
    }
  }

  public SynchronizedBlocked() {
    Thread holder = new Thread() {
      public void run() {
        f(); // Lock acquired by this thread
      }
    };
    holder.start();
  }

  public void run() {
    System.out.println("Trying to call f()");
    f();
    System.out.println("Exiting SynchronizedBlocked.run()");
  }
}
/**
 * Submits each kind of blocked task to an executor and tries to interrupt it
 * through the Future returned by submit().
 */
public class Interrupting {
  private static ExecutorService exec = Executors.newCachedThreadPool();

  /** Runs r for 100 ms, then cancels it with interruption requested. */
  static void test(Runnable r) throws InterruptedException {
    System.out.println("--------------------------");
    Future<?> handle = exec.submit(r);
    TimeUnit.MILLISECONDS.sleep(100);
    String taskName = r.getClass().getName();
    System.out.println("Interrupting " + taskName);
    handle.cancel(true); // Interrupts if running
    System.out.println("Interrupt sent to " + taskName);
  }

  public static void main(String[] args) throws Exception {
    // Each task is built just before it is tested.
    test(new SleepBlocked());
    test(new IOBlocked(System.in));
    test(new SynchronizedBlocked());
    TimeUnit.SECONDS.sleep(3);
    System.out.println("Aborting with System.exit(0)");
    System.exit(0); // ... since last 2 interrupts failed
  }
}
/* Output
--------------------------
Interrupting com.terminatingtasks.SleepBlocked
Interrupt sent to com.terminatingtasks.SleepBlocked
InterruptedException
Exiting SleepBlocked.run()
--------------------------
Waiting for read():
Interrupting com.terminatingtasks.IOBlocked
Interrupt sent to com.terminatingtasks.IOBlocked
--------------------------
Trying to call f()
Interrupting com.terminatingtasks.SynchronizedBlocked
Interrupt sent to com.terminatingtasks.SynchronizedBlocked
Aborting with System.exit(0)
*/
上面的例子中,Interrupting
的test()
使用Future<?> f = exec.submit(r)
,通过submit()
提交任务给ExecutorService
,并获取返回的Future<?>
。在任务运行一段时间后,调用f.cancel(true)
中断任务。
SleepBlocked
使用sleep()
产生阻塞,被中断时可以捕获到InterruptedException
异常,然后任务被中断。
IOBlocked
通过等待I/O输入产生阻塞,SynchronizedBlocked
通过同步机制产生阻塞,二者不能被中断。
3.1. Blocked by I/O
中断I/O产生的阻塞的一种方法是关闭产生阻塞的资源。
import java.util.concurrent.*;
public class CloseResource {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
ServerSocket server = new ServerSocket(8080);
InputStream socketInput = new Socket("localhost", 8080).getInputStream();
exec.execute(new IOBlocked(socketInput));
exec.execute(new IOBlocked(System.in));
TimeUnit.MILLISECONDS.sleep(100);
System.out.println("Shutting down all threads");
TimeUnit.SECONDS.sleep(1);
System.out.println("Closing " + socketInput.getClass().getName());
socketInput.close(); // Releases blocked thread
TimeUnit.SECONDS.sleep(1);
System.out.println("Closing " + System.in.getClass().getName());
System.in.close(); // Releases blocked thread
Shutting down all threads
Closing java.net.SocketInputStream
Interrupted from blocked I/O
Closing java.io.BufferedInputStream
Interrupted from blocked I/O
import java.net.*;
import java.util.concurrent.*;
import java.io.*;
/**
 * Demonstrates releasing tasks blocked on I/O by closing the stream they are
 * reading from, after shutdownNow() has been called on the executor.
 */
public class CloseResource {
  public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newCachedThreadPool();
    // The listener exists so the client socket below has something to connect to.
    ServerSocket server = new ServerSocket(8080);
    try {
      InputStream socketInput = new Socket("localhost", 8080).getInputStream();
      exec.execute(new IOBlocked(socketInput));
      exec.execute(new IOBlocked(System.in));
      TimeUnit.MILLISECONDS.sleep(100);
      System.out.println("Shutting down all threads");
      exec.shutdownNow();
      TimeUnit.SECONDS.sleep(1);
      System.out.println("Closing " + socketInput.getClass().getName());
      socketInput.close(); // Releases blocked thread
      TimeUnit.SECONDS.sleep(1);
      System.out.println("Closing " + System.in.getClass().getName());
      System.in.close(); // Releases blocked thread
    } finally {
      // Release the listening port, also when connecting or sleeping throws.
      server.close();
    }
  }
}
/* Output
Waiting for read():
Waiting for read():
Shutting down all threads
Closing java.net.SocketInputStream
Caught IOException
Interrupted from blocked I/O
Exiting IOBlocked.run()
Closing java.io.BufferedInputStream
Caught IOException
Interrupted from blocked I/O
Exiting IOBlocked.run()
*/
import java.net.*;
import java.util.concurrent.*;
import java.io.*;
/**
 * Demonstrates releasing tasks blocked on I/O by closing the stream they are
 * reading from, after shutdownNow() has been called on the executor.
 */
public class CloseResource {
  public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newCachedThreadPool();
    // The listener exists so the client socket below has something to connect to.
    ServerSocket server = new ServerSocket(8080);
    try {
      InputStream socketInput = new Socket("localhost", 8080).getInputStream();
      exec.execute(new IOBlocked(socketInput));
      exec.execute(new IOBlocked(System.in));
      TimeUnit.MILLISECONDS.sleep(100);
      System.out.println("Shutting down all threads");
      exec.shutdownNow();
      TimeUnit.SECONDS.sleep(1);
      System.out.println("Closing " + socketInput.getClass().getName());
      socketInput.close(); // Releases blocked thread
      TimeUnit.SECONDS.sleep(1);
      System.out.println("Closing " + System.in.getClass().getName());
      System.in.close(); // Releases blocked thread
    } finally {
      // Release the listening port, also when connecting or sleeping throws.
      server.close();
    }
  }
}
/* Output
Waiting for read():
Waiting for read():
Shutting down all threads
Closing java.net.SocketInputStream
Caught IOException
Interrupted from blocked I/O
Exiting IOBlocked.run()
Closing java.io.BufferedInputStream
Caught IOException
Interrupted from blocked I/O
Exiting IOBlocked.run()
*/
这里使用了Socket的输入流socketInput
和System.in
两种InputStream
,exec.shutdownNow()
并不能中断二者产生的阻塞。通过socketInput.close()
和System.in.close()
关闭InputStream
后,对应的任务捕获到IOException
异常,之后可以检查到interrupted
为true
,说明interrupt()
已经生效。
注意上面的输出来自Thinking in Java原书。在我的电脑上(Windows 10 x64 + JDK 8),上面的例子只有socketInput
对应的任务能够在socketInput.close()
后检查到interrupted
为true
;在System.in.close()
后,System.in
对应的任务仍处于阻塞状态,当在控制台进行任意输入并回车后,System.in
对应的任务被中断,且也能检查到interrupted
为true
,如上面例子后的Output
所示。
使用nio
相关类可以更加优雅地打断I/O造成的阻塞:
import java.nio.channels.*;
import java.util.concurrent.*;
class NIOBlocked implements Runnable {
private final SocketChannel sc;
public NIOBlocked(SocketChannel sc) { this.sc = sc; }
System.out.println("Waiting for read() in " + this);
sc.read(ByteBuffer.allocate(1));
} catch(ClosedByInterruptException e) {
System.out.println("ClosedByInterruptException");
} catch(AsynchronousCloseException e) {
System.out.println("AsynchronousCloseException");
throw new RuntimeException(e);
System.out.println("Exiting NIOBlocked.run() " + this);
public class NIOInterruption {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
ServerSocket server = new ServerSocket(8080);
new InetSocketAddress("localhost", 8080);
SocketChannel sc1 = SocketChannel.open(isa);
SocketChannel sc2 = SocketChannel.open(isa);
Future<?> f = exec.submit(new NIOBlocked(sc1));
exec.execute(new NIOBlocked(sc2));
TimeUnit.SECONDS.sleep(1);
// Produce an interrupt via cancel:
TimeUnit.SECONDS.sleep(1);
// Release the block by closing the channel:
Waiting for read() in com.terminatingtasks.NIOBlocked@503c95b4
Waiting for read() in com.terminatingtasks.NIOBlocked@5efdd578
ClosedByInterruptException
Exiting NIOBlocked.run() com.terminatingtasks.NIOBlocked@503c95b4
AsynchronousCloseException
Exiting NIOBlocked.run() com.terminatingtasks.NIOBlocked@5efdd578
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.concurrent.*;
import java.io.*;
/** Task that blocks reading one byte from a SocketChannel and reports how the read ended. */
class NIOBlocked implements Runnable {
  private final SocketChannel sc;

  public NIOBlocked(SocketChannel sc) {
    this.sc = sc;
  }

  public void run() {
    try {
      System.out.println("Waiting for read() in " + this);
      ByteBuffer oneByte = ByteBuffer.allocate(1);
      sc.read(oneByte);
    } catch (AsynchronousCloseException e) {
      // ClosedByInterruptException is a subclass, so one handler tells the two cases apart.
      if (e instanceof ClosedByInterruptException) {
        System.out.println("ClosedByInterruptException");
      } else {
        System.out.println("AsynchronousCloseException");
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    System.out.println("Exiting NIOBlocked.run() " + this);
  }
}
/**
 * Releases two tasks blocked on SocketChannel reads: the first by cancelling
 * its Future with interruption, the second by closing its channel.
 */
public class NIOInterruption {
  public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newCachedThreadPool();
    // The listener exists so the two channels below have something to connect to.
    ServerSocket server = new ServerSocket(8080);
    try {
      InetSocketAddress isa =
        new InetSocketAddress("localhost", 8080);
      SocketChannel sc1 = SocketChannel.open(isa);
      SocketChannel sc2 = SocketChannel.open(isa);
      Future<?> f = exec.submit(new NIOBlocked(sc1));
      exec.execute(new NIOBlocked(sc2));
      exec.shutdown();
      TimeUnit.SECONDS.sleep(1);
      // Produce an interrupt via cancel:
      f.cancel(true);
      TimeUnit.SECONDS.sleep(1);
      // Release the block by closing the channel:
      sc2.close();
    } finally {
      // Release the listening port, also when connecting or sleeping throws.
      server.close();
    }
  }
}
/* Output
Waiting for read() in com.terminatingtasks.NIOBlocked@503c95b4
Waiting for read() in com.terminatingtasks.NIOBlocked@5efdd578
ClosedByInterruptException
Exiting NIOBlocked.run() com.terminatingtasks.NIOBlocked@503c95b4
AsynchronousCloseException
Exiting NIOBlocked.run() com.terminatingtasks.NIOBlocked@5efdd578
*/
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.concurrent.*;
import java.io.*;
/** Task that blocks reading one byte from a SocketChannel and reports how the read ended. */
class NIOBlocked implements Runnable {
  private final SocketChannel sc;

  public NIOBlocked(SocketChannel sc) {
    this.sc = sc;
  }

  public void run() {
    try {
      System.out.println("Waiting for read() in " + this);
      ByteBuffer oneByte = ByteBuffer.allocate(1);
      sc.read(oneByte);
    } catch (AsynchronousCloseException e) {
      // ClosedByInterruptException is a subclass, so one handler tells the two cases apart.
      if (e instanceof ClosedByInterruptException) {
        System.out.println("ClosedByInterruptException");
      } else {
        System.out.println("AsynchronousCloseException");
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    System.out.println("Exiting NIOBlocked.run() " + this);
  }
}
/**
 * Releases two tasks blocked on SocketChannel reads: the first by cancelling
 * its Future with interruption, the second by closing its channel.
 */
public class NIOInterruption {
  public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newCachedThreadPool();
    // The listener exists so the two channels below have something to connect to.
    ServerSocket server = new ServerSocket(8080);
    try {
      InetSocketAddress isa =
        new InetSocketAddress("localhost", 8080);
      SocketChannel sc1 = SocketChannel.open(isa);
      SocketChannel sc2 = SocketChannel.open(isa);
      Future<?> f = exec.submit(new NIOBlocked(sc1));
      exec.execute(new NIOBlocked(sc2));
      exec.shutdown();
      TimeUnit.SECONDS.sleep(1);
      // Produce an interrupt via cancel:
      f.cancel(true);
      TimeUnit.SECONDS.sleep(1);
      // Release the block by closing the channel:
      sc2.close();
    } finally {
      // Release the listening port, also when connecting or sleeping throws.
      server.close();
    }
  }
}
/* Output
Waiting for read() in com.terminatingtasks.NIOBlocked@503c95b4
Waiting for read() in com.terminatingtasks.NIOBlocked@5efdd578
ClosedByInterruptException
Exiting NIOBlocked.run() com.terminatingtasks.NIOBlocked@503c95b4
AsynchronousCloseException
Exiting NIOBlocked.run() com.terminatingtasks.NIOBlocked@5efdd578
*/
可见通过f.cancel(true)
中断任务会捕获到ClosedByInterruptException
,通过sc2.close()
中断任务会捕获到AsynchronousCloseException
,两种方法都可以成功地中断任务。
3.2. Blocked by a Mutex
如果一个任务调用已经被锁定的对象上的方法,该任务就会被阻塞,直到它成功获得对象的锁定。同一个任务可以反复获得同一个对象的锁定。
public synchronized void f1(int count) {
System.out.println("f1() calling f2() with count " + count);
public synchronized void f2(int count) {
System.out.println("f2() calling f1() with count " + count);
public static void main(String[] args) throws Exception {
final MultiLock multiLock = new MultiLock();
f1() calling f2() with count 9
f2() calling f1() with count 8
f1() calling f2() with count 7
f2() calling f1() with count 6
f1() calling f2() with count 5
f2() calling f1() with count 4
f1() calling f2() with count 3
f2() calling f1() with count 2
f1() calling f2() with count 1
f2() calling f1() with count 0
/**
 * Two synchronized methods that call each other on the same object, showing
 * that one thread can take the same object's lock repeatedly.
 */
public class MultiLock {
  /** Prints and hands the decremented count to f2() while count is positive. */
  public synchronized void f1(int count) {
    if (count <= 0) {
      return;
    }
    int remaining = count - 1;
    System.out.println("f1() calling f2() with count " + remaining);
    f2(remaining);
  }

  /** Prints and hands the decremented count to f1() while count is positive. */
  public synchronized void f2(int count) {
    if (count <= 0) {
      return;
    }
    int remaining = count - 1;
    System.out.println("f2() calling f1() with count " + remaining);
    f1(remaining);
  }

  public static void main(String[] args) throws Exception {
    final MultiLock multiLock = new MultiLock();
    Thread worker = new Thread() {
      public void run() {
        multiLock.f1(10);
      }
    };
    worker.start();
  }
}
/* Output
f1() calling f2() with count 9
f2() calling f1() with count 8
f1() calling f2() with count 7
f2() calling f1() with count 6
f1() calling f2() with count 5
f2() calling f1() with count 4
f1() calling f2() with count 3
f2() calling f1() with count 2
f1() calling f2() with count 1
f2() calling f1() with count 0
*/
/**
 * Two synchronized methods that call each other on the same object, showing
 * that one thread can take the same object's lock repeatedly.
 */
public class MultiLock {
  /** Prints and hands the decremented count to f2() while count is positive. */
  public synchronized void f1(int count) {
    if (count <= 0) {
      return;
    }
    int remaining = count - 1;
    System.out.println("f1() calling f2() with count " + remaining);
    f2(remaining);
  }

  /** Prints and hands the decremented count to f1() while count is positive. */
  public synchronized void f2(int count) {
    if (count <= 0) {
      return;
    }
    int remaining = count - 1;
    System.out.println("f2() calling f1() with count " + remaining);
    f1(remaining);
  }

  public static void main(String[] args) throws Exception {
    final MultiLock multiLock = new MultiLock();
    Thread worker = new Thread() {
      public void run() {
        multiLock.f1(10);
      }
    };
    worker.start();
  }
}
/* Output
f1() calling f2() with count 9
f2() calling f1() with count 8
f1() calling f2() with count 7
f2() calling f1() with count 6
f1() calling f2() with count 5
f2() calling f1() with count 4
f1() calling f2() with count 3
f2() calling f1() with count 2
f1() calling f2() with count 1
f2() calling f1() with count 0
*/
f1()
首先获得了锁定,然后调用synchronized
方法,f2()
可以顺利执行、不会被阻塞。f1()
和f2()
互相调用,也不会发生阻塞。
在Java SE 5提供的ReentrantLock
上阻塞的任务可以被打断。
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
private Lock lock = new ReentrantLock();
// Acquire it right away, to demonstrate interruption
// of a task blocked on a ReentrantLock:
// This will never be available to a second task
lock.lockInterruptibly(); // Special call
System.out.println("lock acquired in f()");
} catch(InterruptedException e) {
System.out.println("Interrupted from lock acquisition in f()");
class Blocked2 implements Runnable {
BlockedMutex blocked = new BlockedMutex();
System.out.println("Waiting for f() in BlockedMutex");
System.out.println("Broken out of blocked call");
public class Interrupting2 {
public static void main(String[] args) throws Exception {
Thread t = new Thread(new Blocked2());
TimeUnit.SECONDS.sleep(1);
System.out.println("Issuing t.interrupt()");
Waiting for f() in BlockedMutex
Interrupted from lock acquisition in f()
Broken out of blocked call
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
/**
 * Holds a ReentrantLock that the constructing thread locks and never
 * releases, so a call to {@link #f()} from any other thread blocks until it
 * is interrupted.
 */
class BlockedMutex {
  private Lock lock = new ReentrantLock();

  public BlockedMutex() {
    // Acquire it right away, to demonstrate interruption
    // of a task blocked on a ReentrantLock:
    lock.lock();
  }

  /**
   * Tries to take the lock interruptibly and reports the outcome. If the
   * lock is obtained (only possible reentrantly, from the constructing
   * thread) the extra hold is released again before returning.
   */
  public void f() {
    try {
      // This will never be available to a second task
      lock.lockInterruptibly(); // Special call
    } catch (InterruptedException e) {
      System.out.println("Interrupted from lock acquisition in f()");
      return;
    }
    try {
      System.out.println("lock acquired in f()");
    } finally {
      // Undo only the hold taken above; the constructor's hold stays in place.
      lock.unlock();
    }
  }
}
/** Task that calls f() on a BlockedMutex created together with the task object. */
class Blocked2 implements Runnable {
  BlockedMutex blocked = new BlockedMutex();

  @Override
  public void run() {
    System.out.println("Waiting for f() in BlockedMutex");
    // Returns only once f() has either obtained the lock or been interrupted.
    blocked.f();
    System.out.println("Broken out of blocked call");
  }
}
/** Starts a Blocked2 task on its own thread and interrupts it after one second. */
public class Interrupting2 {
  public static void main(String[] args) throws Exception {
    Runnable task = new Blocked2();
    Thread worker = new Thread(task);
    worker.start();
    TimeUnit.SECONDS.sleep(1);
    System.out.println("Issuing t.interrupt()");
    worker.interrupt();
  }
}
/* Output
Waiting for f() in BlockedMutex
Issuing t.interrupt()
Interrupted from lock acquisition in f()
Broken out of blocked call
*/
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
/**
 * Holds a ReentrantLock that the constructing thread locks and never
 * releases, so a call to {@link #f()} from any other thread blocks until it
 * is interrupted.
 */
class BlockedMutex {
  private Lock lock = new ReentrantLock();

  public BlockedMutex() {
    // Acquire it right away, to demonstrate interruption
    // of a task blocked on a ReentrantLock:
    lock.lock();
  }

  /**
   * Tries to take the lock interruptibly and reports the outcome. If the
   * lock is obtained (only possible reentrantly, from the constructing
   * thread) the extra hold is released again before returning.
   */
  public void f() {
    try {
      // This will never be available to a second task
      lock.lockInterruptibly(); // Special call
    } catch (InterruptedException e) {
      System.out.println("Interrupted from lock acquisition in f()");
      return;
    }
    try {
      System.out.println("lock acquired in f()");
    } finally {
      // Undo only the hold taken above; the constructor's hold stays in place.
      lock.unlock();
    }
  }
}
/** Task that calls f() on a BlockedMutex created together with the task object. */
class Blocked2 implements Runnable {
  BlockedMutex blocked = new BlockedMutex();

  @Override
  public void run() {
    System.out.println("Waiting for f() in BlockedMutex");
    // Returns only once f() has either obtained the lock or been interrupted.
    blocked.f();
    System.out.println("Broken out of blocked call");
  }
}
/** Starts a Blocked2 task on its own thread and interrupts it after one second. */
public class Interrupting2 {
  public static void main(String[] args) throws Exception {
    Runnable task = new Blocked2();
    Thread worker = new Thread(task);
    worker.start();
    TimeUnit.SECONDS.sleep(1);
    System.out.println("Issuing t.interrupt()");
    worker.interrupt();
  }
}
/* Output
Waiting for f() in BlockedMutex
Issuing t.interrupt()
Interrupted from lock acquisition in f()
Broken out of blocked call
*/
BlockedMutex
在创建时就对其自己的lock
进行了锁定,Blocked2
的blocked.f()
被阻塞,t.interrupt()
可以成功消除阻塞,Blocked2
得以继续运行至结束。
4. Checking for an Interrupt
当调用interrupt()
时,中断只会在任务运行至会产生阻塞的操作或任务已经被阻塞时发生,如果任务中没有会产生阻塞的操作,就无法被interrupt()
打断。
在线程上调用interrupt()
会为线程设置一个被打断的标志位,该标志位可以通过Thread.interrupted()
来查询;调用Thread.interrupted()
后,该标志位会被自动清除。
下面的例子展示了当run()
中可能不存在阻塞操作时,使用Thread.interrupted()
判断是否应当打断任务的方法:
import java.util.concurrent.*;
public NeedsCleanup(int ident) {
System.out.println("NeedsCleanup " + id);
System.out.println("Cleaning up " + id);
class Blocked3 implements Runnable {
private volatile double d = 0.0;
while(!Thread.interrupted()) {
NeedsCleanup n1 = new NeedsCleanup(1);
// Start try-finally immediately after definition
// of n1, to guarantee proper cleanup of n1:
System.out.println("Sleeping");
TimeUnit.SECONDS.sleep(1);
NeedsCleanup n2 = new NeedsCleanup(2);
// Guarantee proper cleanup of n2:
System.out.println("Calculating");
// A time-consuming, non-blocking operation:
for(int i = 1; i < 2500000; i++)
d = d + (Math.PI + Math.E) / d;
System.out.println("Finished time-consuming operation");
System.out.println("Exiting via while() test");
} catch(InterruptedException e) {
System.out.println("Exiting via InterruptedException");
public class InterruptingIdiom {
public static void main(String[] args) throws Exception {
System.out.println("usage: java InterruptingIdiom delay-in-mS");
Thread t = new Thread(new Blocked3());
TimeUnit.MILLISECONDS.sleep(new Integer(args[0]));
Finished time-consuming operation
Exiting via InterruptedException
import java.util.concurrent.*;
/** Stand-in for a resource: announces its creation and its cleanup on stdout. */
class NeedsCleanup {
  private final int id;

  /** Stores the identifier and prints the creation message. */
  public NeedsCleanup(int ident) {
    this.id = ident;
    System.out.println("NeedsCleanup " + this.id);
  }

  /** Prints the cleanup message for this instance's identifier. */
  public void cleanup() {
    System.out.println("Cleaning up " + this.id);
  }
}
/**
 * Task whose loop contains both a blocking sleep and a long non-blocking
 * calculation. It can leave either through the Thread.interrupted() test at
 * the top of the loop or through an InterruptedException from the sleep, and
 * in both cases every NeedsCleanup object created so far is cleaned up.
 */
class Blocked3 implements Runnable {
  private volatile double d = 0.0;

  public void run() {
    try {
      while (!Thread.interrupted()) {
        // point1
        NeedsCleanup first = new NeedsCleanup(1);
        // The try-finally starts immediately after first is created,
        // to guarantee proper cleanup of it:
        try {
          sleepThenCalculate();
        } finally {
          first.cleanup();
        }
      }
      System.out.println("Exiting via while() test");
    } catch (InterruptedException e) {
      System.out.println("Exiting via InterruptedException");
    }
  }

  /** Sleeps one second, then runs the calculation guarded by a second NeedsCleanup. */
  private void sleepThenCalculate() throws InterruptedException {
    System.out.println("Sleeping");
    TimeUnit.SECONDS.sleep(1);
    // point2
    NeedsCleanup second = new NeedsCleanup(2);
    // Guarantee proper cleanup of second:
    try {
      System.out.println("Calculating");
      // A time-consuming, non-blocking operation:
      int i = 1;
      while (i < 2500000) {
        d = d + (Math.PI + Math.E) / d;
        i++;
      }
      System.out.println("Finished time-consuming operation");
    } finally {
      second.cleanup();
    }
  }
}
/**
 * Starts a Blocked3 task and interrupts it after the delay, in milliseconds,
 * given as the single command-line argument.
 */
public class InterruptingIdiom {
  public static void main(String[] args) throws Exception {
    if (args.length != 1) {
      System.out.println("usage: java InterruptingIdiom delay-in-mS");
      System.exit(1);
    }
    // Parse before starting the worker: a malformed argument then fails here
    // instead of leaving an already-started thread running with nobody to
    // interrupt it. parseInt also avoids the deprecated Integer(String) constructor.
    int delayMillis = Integer.parseInt(args[0]);
    Thread t = new Thread(new Blocked3());
    t.start();
    TimeUnit.MILLISECONDS.sleep(delayMillis);
    t.interrupt();
  }
}
/* Output
NeedsCleanup 1
Sleeping
NeedsCleanup 2
Calculating
Finished time-consuming operation
Cleaning up 2
Cleaning up 1
NeedsCleanup 1
Sleeping
Cleaning up 1
Exiting via InterruptedException
*/
import java.util.concurrent.*;
/** Stand-in for a resource: announces its creation and its cleanup on stdout. */
class NeedsCleanup {
  private final int id;

  /** Stores the identifier and prints the creation message. */
  public NeedsCleanup(int ident) {
    this.id = ident;
    System.out.println("NeedsCleanup " + this.id);
  }

  /** Prints the cleanup message for this instance's identifier. */
  public void cleanup() {
    System.out.println("Cleaning up " + this.id);
  }
}
/**
 * Task whose loop contains both a blocking sleep and a long non-blocking
 * calculation. It can leave either through the Thread.interrupted() test at
 * the top of the loop or through an InterruptedException from the sleep, and
 * in both cases every NeedsCleanup object created so far is cleaned up.
 */
class Blocked3 implements Runnable {
  private volatile double d = 0.0;

  public void run() {
    try {
      while (!Thread.interrupted()) {
        // point1
        NeedsCleanup first = new NeedsCleanup(1);
        // The try-finally starts immediately after first is created,
        // to guarantee proper cleanup of it:
        try {
          sleepThenCalculate();
        } finally {
          first.cleanup();
        }
      }
      System.out.println("Exiting via while() test");
    } catch (InterruptedException e) {
      System.out.println("Exiting via InterruptedException");
    }
  }

  /** Sleeps one second, then runs the calculation guarded by a second NeedsCleanup. */
  private void sleepThenCalculate() throws InterruptedException {
    System.out.println("Sleeping");
    TimeUnit.SECONDS.sleep(1);
    // point2
    NeedsCleanup second = new NeedsCleanup(2);
    // Guarantee proper cleanup of second:
    try {
      System.out.println("Calculating");
      // A time-consuming, non-blocking operation:
      int i = 1;
      while (i < 2500000) {
        d = d + (Math.PI + Math.E) / d;
        i++;
      }
      System.out.println("Finished time-consuming operation");
    } finally {
      second.cleanup();
    }
  }
}
/**
 * Starts a Blocked3 task and interrupts it after the delay, in milliseconds,
 * given as the single command-line argument.
 */
public class InterruptingIdiom {
  public static void main(String[] args) throws Exception {
    if (args.length != 1) {
      System.out.println("usage: java InterruptingIdiom delay-in-mS");
      System.exit(1);
    }
    // Parse before starting the worker: a malformed argument then fails here
    // instead of leaving an already-started thread running with nobody to
    // interrupt it. parseInt also avoids the deprecated Integer(String) constructor.
    int delayMillis = Integer.parseInt(args[0]);
    Thread t = new Thread(new Blocked3());
    t.start();
    TimeUnit.MILLISECONDS.sleep(delayMillis);
    t.interrupt();
  }
}
/* Output
NeedsCleanup 1
Sleeping
NeedsCleanup 2
Calculating
Finished time-consuming operation
Cleaning up 2
Cleaning up 1
NeedsCleanup 1
Sleeping
Cleaning up 1
Exiting via InterruptedException
*/
运行这个例子必须要使用命令行指定打断任务的延时,上面的Output
只是给出了一种可能的情况。
需要特别注意通过异常中断任务后的收尾工作,Blocked3
的run()
在NeedsCleanup n1 = new NeedsCleanup(1)
实例化n1
后紧跟try-finally
,并在finally
中清理n1
。