Concurrency: Cooperation Between Tasks (2)
Contents
4. Producer-Consumers and Queues
java.util.concurrent.BlockingQueue接口定义了一种同步队列,同一时间只允许一个任务对队列进行插入和删除。BlockingQueue有多种标准实现,LinkedBlockingQueue提供不限长度的队列,ArrayBlockingQueue具有固定长度。如果队列为空,试图从该队列中获取元素的任务会被挂起,直到有新元素添加进队列。
/**
 * A countdown task. Each instance receives the next sequential id and,
 * when run, prints its status once per remaining count, yielding to the
 * scheduler after every print.
 */
class LiftOff implements Runnable {
    /** Number of instances created so far; the source of the next id. */
    private static int taskCount = 0;

    /** Sequence number assigned to this instance at construction time. */
    private final int id = taskCount++;

    /** Remaining count; 10 unless a value is passed to the constructor. */
    protected int countDown = 10;

    public LiftOff() {}

    public LiftOff(int countDown) {
        this.countDown = countDown;
    }

    /**
     * Describes the current state.
     *
     * @return "#id(count), " while the count is positive, otherwise
     *         "#id(Liftoff!), "
     */
    public String status() {
        StringBuilder text = new StringBuilder("#").append(id).append('(');
        if (countDown > 0) {
            text.append(countDown);
        } else {
            text.append("Liftoff!");
        }
        return text.append("), ").toString();
    }

    /** Counts down to zero, printing the status after each decrement. */
    @Override
    public void run() {
        while (true) {
            // The decrement happens even on the final, failing check.
            if (countDown-- <= 0) {
                break;
            }
            System.out.print(status());
            Thread.yield();
        }
    }
}
/**
 * Consumer that takes LiftOff tasks from a BlockingQueue and runs each one
 * directly on the thread that is executing this Runnable.
 */
class LiftOffRunner implements Runnable {
    private final BlockingQueue<LiftOff> rockets;

    /**
     * @param queue the queue that tasks are added to and taken from
     */
    public LiftOffRunner(BlockingQueue<LiftOff> queue) {
        rockets = queue;
    }

    /**
     * Puts a task on the queue, waiting if the queue currently has no room.
     * If the calling thread is interrupted while waiting, a message is
     * printed and the thread's interrupt flag is set again before returning.
     *
     * @param lo the task to enqueue
     */
    public void add(LiftOff lo) {
        try {
            rockets.put(lo);
        } catch (InterruptedException e) {
            System.out.println("Interrupted during put()");
            // put() clears the interrupt flag when it throws; re-assert it so
            // the caller can still observe that it was interrupted.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Takes and runs tasks until this thread is interrupted, either between
     * tasks or while blocked in take().
     */
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                LiftOff rocket = rockets.take();
                rocket.run(); // Use this thread
            }
        } catch (InterruptedException e) {
            // The thread is about to finish, so the flag need not be restored.
            System.out.println("Waking from take()");
        }
        System.out.println("Exiting LiftOffRunner");
    }
}
/**
 * Runs the same producer/consumer scenario against three BlockingQueue
 * implementations. For each queue a LiftOffRunner is started on its own
 * thread, five LiftOff tasks are added from the calling thread, and the
 * runner is interrupted once the user presses Enter.
 */
public class TestBlockingQueues {
    // A single reader shared by every getkey() call. A BufferedReader reads
    // ahead, so creating a new one per call could discard input that an
    // earlier reader had already buffered.
    private static final BufferedReader STDIN =
        new BufferedReader(new InputStreamReader(System.in));

    /** Blocks until one line has been read from standard input. */
    static void getkey() {
        try {
            // Compensate for Windows/Linux difference in the
            // length of the result produced by the Enter key:
            STDIN.readLine();
        } catch (java.io.IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Prints {@code message}, then blocks until a line is read from standard input. */
    static void getkey(String message) {
        System.out.println(message);
        getkey();
    }

    /**
     * Exercises one queue implementation.
     *
     * @param msg   label printed before and after the run
     * @param queue the queue shared by the producer (this thread) and the runner
     */
    static void test(String msg, BlockingQueue<LiftOff> queue) {
        System.out.println(msg);
        LiftOffRunner runner = new LiftOffRunner(queue);
        Thread t = new Thread(runner);
        t.start();
        for (int i = 0; i < 5; i++) {
            runner.add(new LiftOff(5));
        }
        getkey("Press ‘Enter’ (" + msg + ")");
        // The runner thread is interrupted but not joined, so its exit
        // messages may appear after the "Finished" line below.
        t.interrupt();
        System.out.println("Finished " + msg + " test");
    }

    public static void main(String[] args) {
        test("LinkedBlockingQueue", // Effectively unbounded (capacity Integer.MAX_VALUE)
            new LinkedBlockingQueue<LiftOff>());
        test("ArrayBlockingQueue", // Fixed size
            new ArrayBlockingQueue<LiftOff>(3));
        test("SynchronousQueue", // No capacity: each put() waits for a take()
            new SynchronousQueue<LiftOff>());
    }
}
/*
LinkedBlockingQueue
Press ‘Enter’ (LinkedBlockingQueue)
#0(4), #0(3), #0(2), #0(1), #0(Liftoff!), #1(4), #1(3), #1(2), #1(1), #1(Liftoff!), #2(4), #2(3), #2(2), #2(1), #2(Liftoff!), #3(4), #3(3), #3(2), #3(1), #3(Liftoff!), #4(4), #4(3), #4(2), #4(1), #4(Liftoff!),
Finished LinkedBlockingQueue test
Waking from take()
Exiting LiftOffRunner
ArrayBlockingQueue
#5(4), #5(3), #5(2), #5(1), #5(Liftoff!), Press ‘Enter’ (ArrayBlockingQueue)
#6(4), #6(3), #6(2), #6(1), #6(Liftoff!), #7(4), #7(3), #7(2), #7(1), #7(Liftoff!), #8(4), #8(3), #8(2), #8(1), #8(Liftoff!), #9(4), #9(3), #9(2), #9(1), #9(Liftoff!),
Finished ArrayBlockingQueue test
SynchronousQueue
Waking from take()
Exiting LiftOffRunner
#10(4), #10(3), #10(2), #10(1), #10(Liftoff!), #11(4), #11(3), #11(2), #11(1), #11(Liftoff!), #12(4), #12(3), #12(2), #12(1), #12(Liftoff!), #13(4), #13(3), #13(2), #13(1), #13(Liftoff!), Press ‘Enter’ (SynchronousQueue)
#14(4), #14(3), #14(2), #14(1), #14(Liftoff!),
Finished SynchronousQueue test
Waking from take()
Exiting LiftOffRunner
*/
上面的例子在main()中分别向三种BlockingQueue中依次加入5个LiftOff,LiftOff被LiftOffRunner消耗。同步由BlockingQueue处理,LiftOffRunner不需要考虑同步问题。
4.1. BlockingQueues of Toast
下面是使用BlockingQueue的另一个例子:
package com.company;
import java.util.concurrent.*;
import java.util.*;
/**
 * A numbered piece of toast with a status that starts as DRY and is changed
 * by {@link #butter()} and {@link #jam()}.
 */
class Toast {
    public enum Status { DRY, BUTTERED, JAMMED }

    private final int id;
    private Status status = Status.DRY;

    public Toast(int idn) {
        this.id = idn;
    }

    public int getId() {
        return id;
    }

    public Status getStatus() {
        return status;
    }

    /** Marks this toast as BUTTERED. */
    public void butter() {
        status = Status.BUTTERED;
    }

    /** Marks this toast as JAMMED. */
    public void jam() {
        status = Status.JAMMED;
    }

    /** Returns "Toast id: STATUS". */
    @Override
    public String toString() {
        return new StringBuilder("Toast ")
            .append(id)
            .append(": ")
            .append(status)
            .toString();
    }
}
// Named queue type for passing Toast objects between tasks; it adds no
// members of its own to LinkedBlockingQueue<Toast>.
class ToastQueue extends LinkedBlockingQueue<Toast> {}
/**
 * Producer task: after a pause of 100-599 ms it creates the next numbered
 * Toast, prints it, and puts it on the queue, repeating until interrupted.
 */
class Toaster implements Runnable {
    private final ToastQueue toastQueue;
    private final Random rand = new Random(47);
    private int count = 0;

    public Toaster(ToastQueue tq) {
        toastQueue = tq;
    }

    @Override
    public void run() {
        try {
            for (;;) {
                if (Thread.interrupted()) {
                    break;
                }
                int pauseMillis = 100 + rand.nextInt(500);
                TimeUnit.MILLISECONDS.sleep(pauseMillis);
                // Make toast
                Toast fresh = new Toast(count);
                count++;
                System.out.println(fresh);
                // Insert into queue
                toastQueue.put(fresh);
            }
        } catch (InterruptedException e) {
            System.out.println("Toaster interrupted");
        }
        System.out.println("Toaster off");
    }
}
/**
 * Middle stage that applies butter: takes each Toast from the dry queue,
 * butters it, prints it, and passes it on to the buttered queue.
 */
class Butterer implements Runnable {
    private final ToastQueue dryQueue;
    private final ToastQueue butteredQueue;

    public Butterer(ToastQueue dry, ToastQueue buttered) {
        dryQueue = dry;
        butteredQueue = buttered;
    }

    @Override
    public void run() {
        try {
            while (true) {
                if (Thread.interrupted()) {
                    break;
                }
                // take() blocks until the next piece of toast is available.
                Toast slice = dryQueue.take();
                slice.butter();
                System.out.println(slice);
                butteredQueue.put(slice);
            }
        } catch (InterruptedException e) {
            System.out.println("Butterer interrupted");
        }
        System.out.println("Butterer off");
    }
}
/**
 * Middle stage that applies jam: takes each Toast from the buttered queue,
 * jams it, prints it, and passes it on to the finished queue.
 */
class Jammer implements Runnable {
    private final ToastQueue butteredQueue;
    private final ToastQueue finishedQueue;

    public Jammer(ToastQueue buttered, ToastQueue finished) {
        butteredQueue = buttered;
        finishedQueue = finished;
    }

    @Override
    public void run() {
        try {
            // The interrupt flag is tested (and cleared) before every iteration.
            for (boolean stop = Thread.interrupted(); !stop; stop = Thread.interrupted()) {
                // take() blocks until the next piece of toast is available.
                Toast slice = butteredQueue.take();
                slice.jam();
                System.out.println(slice);
                finishedQueue.put(slice);
            }
        } catch (InterruptedException e) {
            System.out.println("Jammer interrupted");
        }
        System.out.println("Jammer off");
    }
}
/**
 * Final stage that consumes the toast: takes each Toast from the finished
 * queue and checks that ids arrive in sequence and that every piece is
 * JAMMED. A failed check prints an error and exits the JVM with status 1.
 */
class Eater implements Runnable {
    private final ToastQueue finishedQueue;
    private int counter = 0;

    public Eater(ToastQueue finished) {
        finishedQueue = finished;
    }

    @Override
    public void run() {
        try {
            while (true) {
                if (Thread.interrupted()) {
                    break;
                }
                // take() blocks until the next piece of toast is available.
                Toast slice = finishedQueue.take();
                // The expected id advances on every piece, whether or not
                // the checks pass.
                if (slice.getId() == counter++
                        && slice.getStatus() == Toast.Status.JAMMED) {
                    System.out.println("Chomp! " + slice);
                    continue;
                }
                System.out.println(">>>> Error: " + slice);
                System.exit(1);
            }
        } catch (InterruptedException e) {
            System.out.println("Eater interrupted");
        }
        System.out.println("Eater off");
    }
}
/**
 * Wires the four toast tasks together with three queues, lets them run for
 * one second, then interrupts them all via shutdownNow().
 */
public class ToastOMatic {
    public static void main(String[] args) throws Exception {
        ToastQueue dryQueue = new ToastQueue();
        ToastQueue butteredQueue = new ToastQueue();
        ToastQueue finishedQueue = new ToastQueue();

        ExecutorService exec = Executors.newCachedThreadPool();
        // Stages in pipeline order; each one is submitted as its own task.
        Runnable[] stages = {
            new Toaster(dryQueue),
            new Butterer(dryQueue, butteredQueue),
            new Jammer(butteredQueue, finishedQueue),
            new Eater(finishedQueue),
        };
        for (Runnable stage : stages) {
            exec.execute(stage);
        }

        TimeUnit.SECONDS.sleep(1);
        exec.shutdownNow();
    }
}
/* Output
Toast 0: DRY
Toast 0: BUTTERED
Toast 0: JAMMED
Chomp! Toast 0: JAMMED
Toast 1: DRY
Toast 1: BUTTERED
Toast 1: JAMMED
Chomp! Toast 1: JAMMED
Toast 2: DRY
Toast 2: BUTTERED
Toast 2: JAMMED
Chomp! Toast 2: JAMMED
Eater interrupted
Jammer interrupted
Toaster interrupted
Eater off
Toaster off
Jammer off
Butterer interrupted
Butterer off
*/
四个任务分别负责烤面包、涂抹黄油、涂抹果酱和吃面包。各任务都没有显式使用同步(Lock或synchronized),同步以及任务的挂起和唤醒都由BlockingQueue实现。
5. Using pipes for I/O between tasks
管道(Pipe)提供了任务间I/O的通信。任务可以通过PipedWriter和PipedReader对管道进行写入和读取。管道类似于前面的阻塞队列。
import java.util.concurrent.*;
import java.io.*;
import java.util.*;
/**
 * Writes the characters 'A' through 'z' to a PipedWriter over and over,
 * sleeping for a random time of under 500 ms after each character, until
 * a write fails or the thread is interrupted during the sleep.
 */
class Sender implements Runnable {
    private final Random rand = new Random(47);
    private final PipedWriter out = new PipedWriter();

    /** Returns the writer so that a reader can be connected to it. */
    public PipedWriter getPipedWriter() {
        return out;
    }

    @Override
    public void run() {
        char next = 'A';
        try {
            while (true) {
                out.write(next);
                TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
                // Wrap around to 'A' once 'z' has been sent.
                next = (next == 'z') ? 'A' : (char) (next + 1);
            }
        } catch (IOException e) {
            System.out.println(e + " Sender write exception");
        } catch (InterruptedException e) {
            System.out.println(e + " Sender sleep interrupted");
        }
    }
}
/**
 * Reads characters from a PipedReader connected to a Sender's PipedWriter
 * and prints each one as it arrives.
 */
class Receiver implements Runnable {
    private final PipedReader in;

    /**
     * @param sender the Sender whose PipedWriter this reader is connected to
     * @throws IOException if the reader cannot be connected to the writer
     */
    public Receiver(Sender sender) throws IOException {
        in = new PipedReader(sender.getPipedWriter());
    }

    /**
     * Prints characters until the stream ends or a read fails. A read
     * failure is reported on standard output.
     */
    @Override
    public void run() {
        try {
            int c;
            // read() blocks until a character is there and returns -1 at end
            // of stream; stop then instead of printing (char) -1 forever.
            while ((c = in.read()) != -1) {
                System.out.print("Read: " + (char) c + ", ");
            }
        } catch (IOException e) {
            System.out.println(e + " Receiver read exception");
        }
    }
}
public class PipedIO {
public static void main(String[] args) throws Exception {
Sender sender = new Sender();
Receiver receiver = new Receiver(sender);
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(sender);
exec.execute(receiver);
TimeUnit.SECONDS.sleep(4);
exec.shutdownNow();
}
/* Output
Read: A, Read: B, Read: C, Read: D, Read: E, Read: F, Read: G, Read: H, Read: I, Read: J, Read: K,
java.lang.InterruptedException: sleep interrupted Sender sleep interrupted
java.io.InterruptedIOException Receiver read exception
*/
Sender通过out = new PipedWriter()创建了PipedWriter。Receiver通过in = new PipedReader(sender.getPipedWriter()),使用Sender的out创建PipedReader。in.read()会在IO没有数据时自动进行阻塞,不需要手动使用sleep()或wait()。从打印还可以看到,PipedReader可以被exec.shutdownNow()打断。