Concurrency: Cooperation Between Tasks (2)
Contents
4. Producer-Consumers and Queues
java.util.concurrent.BlockingQueue
接口定义了一种同步队列,同一时间只允许一个任务对队列进行插入和删除。BlockingQueue
有多种标准实现,LinkedBlockingQueue
提供不限长度的队列,ArrayBlockingQueue
具有固定长度。如果队列为空,试图从该队列中获取元素的任务会被挂起,直到有新元素添加进队列。
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

/**
 * A countdown task. Each instance receives the next sequential id and, when
 * run, prints its status once per remaining count.
 */
class LiftOff implements Runnable {
  protected int countDown = 10; // Default
  // Plain (non-atomic) counter: in this example every LiftOff is created
  // by the thread that calls TestBlockingQueues.test().
  private static int taskCount = 0;
  private final int id = taskCount++;

  public LiftOff() {}

  public LiftOff(int countDown) {
    this.countDown = countDown;
  }

  /** Returns "#id(count), ", or "#id(Liftoff!), " once the count is no longer positive. */
  public String status() {
    return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "), ";
  }

  /** Decrements the counter down to zero, printing the status after each decrement. */
  @Override
  public void run() {
    while (countDown-- > 0) {
      System.out.print(status());
      Thread.yield();
    }
  }
}

/**
 * Consumer task: takes LiftOff objects from a BlockingQueue and runs each one
 * on its own thread. All blocking and hand-off is done by the queue.
 */
class LiftOffRunner implements Runnable {
  private final BlockingQueue<LiftOff> rockets;

  public LiftOffRunner(BlockingQueue<LiftOff> queue) {
    rockets = queue;
  }

  /**
   * Puts a rocket on the queue, blocking while the queue has no room.
   * If the calling thread is interrupted, the rocket is not added, a message
   * is printed and the interrupt status is restored for the caller.
   */
  public void add(LiftOff lo) {
    try {
      rockets.put(lo);
    } catch (InterruptedException e) {
      System.out.println("Interrupted during put()");
      // Catching InterruptedException clears the flag; set it again so the
      // caller can still observe that it was interrupted.
      Thread.currentThread().interrupt();
    }
  }

  /** Runs queued rockets until this thread is interrupted. */
  @Override
  public void run() {
    try {
      while (!Thread.interrupted()) {
        LiftOff rocket = rockets.take(); // Blocks while the queue is empty
        rocket.run(); // Use this thread
      }
    } catch (InterruptedException e) {
      System.out.println("Waking from take()");
    }
    System.out.println("Exiting LiftOffRunner");
  }
}

/** Feeds five LiftOff tasks through three different BlockingQueue implementations. */
public class TestBlockingQueues {
  /** Waits until the user presses Enter. */
  static void getkey() {
    try {
      // Compensate for Windows/Linux difference in the
      // length of the result produced by the Enter key:
      new BufferedReader(new InputStreamReader(System.in)).readLine();
    } catch (java.io.IOException e) {
      throw new RuntimeException(e);
    }
  }

  /** Prints a prompt, then waits until the user presses Enter. */
  static void getkey(String message) {
    System.out.println(message);
    getkey();
  }

  /**
   * Starts a LiftOffRunner on the given queue, adds five rockets from the
   * calling thread, waits for Enter and then interrupts the runner thread.
   */
  static void test(String msg, BlockingQueue<LiftOff> queue) {
    System.out.println(msg);
    LiftOffRunner runner = new LiftOffRunner(queue);
    Thread t = new Thread(runner);
    t.start();
    for (int i = 0; i < 5; i++) {
      runner.add(new LiftOff(5));
    }
    getkey("Press ‘Enter’ (" + msg + ")");
    t.interrupt();
    System.out.println("Finished " + msg + " test");
  }

  public static void main(String[] args) {
    test("LinkedBlockingQueue", // Unlimited size
        new LinkedBlockingQueue<LiftOff>());
    test("ArrayBlockingQueue", // Fixed size
        new ArrayBlockingQueue<LiftOff>(3));
    test("SynchronousQueue", // No internal capacity: each put() waits for a take()
        new SynchronousQueue<LiftOff>());
  }
}
/* Output (sample):
LinkedBlockingQueue
Press ‘Enter’ (LinkedBlockingQueue)
#0(4), #0(3), #0(2), #0(1), #0(Liftoff!),
#1(4), #1(3), #1(2), #1(1), #1(Liftoff!),
#2(4), #2(3), #2(2), #2(1), #2(Liftoff!),
#3(4), #3(3), #3(2), #3(1), #3(Liftoff!),
#4(4), #4(3), #4(2), #4(1), #4(Liftoff!),
Finished LinkedBlockingQueue test
Waking from take()
Exiting LiftOffRunner
ArrayBlockingQueue
#5(4), #5(3), #5(2), #5(1), #5(Liftoff!),
Press ‘Enter’ (ArrayBlockingQueue)
#6(4), #6(3), #6(2), #6(1), #6(Liftoff!),
#7(4), #7(3), #7(2), #7(1), #7(Liftoff!),
#8(4), #8(3), #8(2), #8(1), #8(Liftoff!),
#9(4), #9(3), #9(2), #9(1), #9(Liftoff!),
Finished ArrayBlockingQueue test
SynchronousQueue
Waking from take()
Exiting LiftOffRunner
#10(4), #10(3), #10(2), #10(1), #10(Liftoff!),
#11(4), #11(3), #11(2), #11(1), #11(Liftoff!),
#12(4), #12(3), #12(2), #12(1), #12(Liftoff!),
#13(4), #13(3), #13(2), #13(1), #13(Liftoff!),
Press ‘Enter’ (SynchronousQueue)
#14(4), #14(3), #14(2), #14(1), #14(Liftoff!),
Finished SynchronousQueue test
Waking from take()
Exiting LiftOffRunner
*/
上面的例子在main()
中分别向三种BlockingQueue
中依次加入5个LiftOff
,LiftOff
被LiftOffRunner
消耗。同步由BlockingQueue
处理,LiftOffRunner
不需要考虑同步问题。
4.1. BlockingQueues of Toast
下面是使用BlockingQueue
的另一个例子:
package com.company;

import java.util.concurrent.*;
import java.util.*;

/**
 * A piece of toast that moves through the states DRY, BUTTERED and JAMMED.
 * In this example a Toast is only ever passed from task to task through
 * BlockingQueues; the class itself contains no synchronization.
 */
class Toast {
  public enum Status { DRY, BUTTERED, JAMMED }

  private Status status = Status.DRY;
  private final int id;

  public Toast(int idn) {
    id = idn;
  }

  public void butter() {
    status = Status.BUTTERED;
  }

  public void jam() {
    status = Status.JAMMED;
  }

  public Status getStatus() {
    return status;
  }

  public int getId() {
    return id;
  }

  @Override
  public String toString() {
    return "Toast " + id + ": " + status;
  }
}

/** Named queue type used between the stages of the toast pipeline. */
class ToastQueue extends LinkedBlockingQueue<Toast> {}

/** Producer: makes a new dry Toast every 100-599 ms and puts it on its queue. */
class Toaster implements Runnable {
  private final ToastQueue toastQueue;
  private int count = 0;
  private final Random rand = new Random(47);

  public Toaster(ToastQueue tq) {
    toastQueue = tq;
  }

  @Override
  public void run() {
    try {
      while (!Thread.interrupted()) {
        TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(500));
        // Make toast
        Toast t = new Toast(count++);
        System.out.println(t);
        // Insert into queue
        toastQueue.put(t);
      }
    } catch (InterruptedException e) {
      System.out.println("Toaster interrupted");
    }
    System.out.println("Toaster off");
  }
}

/** Apply butter to toast: takes from the dry queue, puts on the buttered queue. */
class Butterer implements Runnable {
  private final ToastQueue dryQueue;
  private final ToastQueue butteredQueue;

  public Butterer(ToastQueue dry, ToastQueue buttered) {
    dryQueue = dry;
    butteredQueue = buttered;
  }

  @Override
  public void run() {
    try {
      while (!Thread.interrupted()) {
        // Blocks until next piece of toast is available:
        Toast t = dryQueue.take();
        t.butter();
        System.out.println(t);
        butteredQueue.put(t);
      }
    } catch (InterruptedException e) {
      System.out.println("Butterer interrupted");
    }
    System.out.println("Butterer off");
  }
}

/** Apply jam to buttered toast: takes from the buttered queue, puts on the finished queue. */
class Jammer implements Runnable {
  private final ToastQueue butteredQueue;
  private final ToastQueue finishedQueue;

  public Jammer(ToastQueue buttered, ToastQueue finished) {
    butteredQueue = buttered;
    finishedQueue = finished;
  }

  @Override
  public void run() {
    try {
      while (!Thread.interrupted()) {
        // Blocks until next piece of toast is available:
        Toast t = butteredQueue.take();
        t.jam();
        System.out.println(t);
        finishedQueue.put(t);
      }
    } catch (InterruptedException e) {
      System.out.println("Jammer interrupted");
    }
    System.out.println("Jammer off");
  }
}

/** Consume the toast: takes from the finished queue and checks each piece. */
class Eater implements Runnable {
  private final ToastQueue finishedQueue;
  private int counter = 0;

  public Eater(ToastQueue finished) {
    finishedQueue = finished;
  }

  @Override
  public void run() {
    try {
      while (!Thread.interrupted()) {
        // Blocks until next piece of toast is available:
        Toast t = finishedQueue.take();
        // Verify that the toast is coming in order,
        // and that all pieces are getting jammed:
        if (t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED) {
          System.out.println(">>>> Error: " + t);
          System.exit(1);
        } else {
          System.out.println("Chomp! " + t);
        }
      }
    } catch (InterruptedException e) {
      System.out.println("Eater interrupted");
    }
    System.out.println("Eater off");
  }
}

/**
 * Wires the four tasks together with three queues, lets them run for one
 * second and then interrupts them all with shutdownNow().
 */
public class ToastOMatic {
  public static void main(String[] args) throws Exception {
    ToastQueue dryQueue = new ToastQueue();
    ToastQueue butteredQueue = new ToastQueue();
    ToastQueue finishedQueue = new ToastQueue();
    ExecutorService exec = Executors.newCachedThreadPool();
    exec.execute(new Toaster(dryQueue));
    exec.execute(new Butterer(dryQueue, butteredQueue));
    exec.execute(new Jammer(butteredQueue, finishedQueue));
    exec.execute(new Eater(finishedQueue));
    TimeUnit.SECONDS.sleep(1);
    exec.shutdownNow();
  }
}
/* Output (sample):
Toast 0: DRY
Toast 0: BUTTERED
Toast 0: JAMMED
Chomp! Toast 0: JAMMED
Toast 1: DRY
Toast 1: BUTTERED
Toast 1: JAMMED
Chomp! Toast 1: JAMMED
Toast 2: DRY
Toast 2: BUTTERED
Toast 2: JAMMED
Chomp! Toast 2: JAMMED
Eater interrupted
Jammer interrupted
Toaster interrupted
Eater off
Toaster off
Jammer off
Butterer interrupted
Butterer off
*/
四个任务分别负责烤面包、涂抹黄油、涂抹果酱和吃面包。各任务都没有显式使用同步;同步以及任务的挂起和唤醒都由BlockingQueue
实现。
5. Using pipes for I/O between tasks
管道(Pipe)提供了任务间I/O的通信。任务可以通过PipedWriter
和PipedReader
对管道进行写入和读取。管道类似于前面的阻塞队列。
import java.util.concurrent.*;
import java.io.*;
import java.util.*;

/**
 * Writes the characters 'A' through 'z' to a PipedWriter over and over,
 * sleeping 0-499 ms after each character, until interrupted or until a
 * write fails.
 */
class Sender implements Runnable {
  private final Random rand = new Random(47);
  private final PipedWriter out = new PipedWriter();

  /** Returns the write end of the pipe so that a reader can be connected to it. */
  public PipedWriter getPipedWriter() {
    return out;
  }

  @Override
  public void run() {
    try {
      while (true) {
        for (char c = 'A'; c <= 'z'; c++) {
          out.write(c);
          TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
        }
      }
    } catch (IOException e) {
      System.out.println(e + " Sender write exception");
    } catch (InterruptedException e) {
      System.out.println(e + " Sender sleep interrupted");
    }
  }
}

/** Reads characters from a PipedReader connected to a Sender and prints each one. */
class Receiver implements Runnable {
  private final PipedReader in;

  /**
   * Connects a new PipedReader to the sender's PipedWriter.
   *
   * @throws IOException if the PipedReader cannot be created on that writer
   */
  public Receiver(Sender sender) throws IOException {
    in = new PipedReader(sender.getPipedWriter());
  }

  @Override
  public void run() {
    try {
      while (true) {
        // Blocks until characters are there:
        System.out.print("Read: " + (char) in.read() + ", ");
      }
    } catch (IOException e) {
      System.out.println(e + " Receiver read exception");
    }
  }
}

/** Runs a Sender and a Receiver for four seconds, then interrupts both with shutdownNow(). */
public class PipedIO {
  public static void main(String[] args) throws Exception {
    Sender sender = new Sender();
    Receiver receiver = new Receiver(sender);
    ExecutorService exec = Executors.newCachedThreadPool();
    exec.execute(sender);
    exec.execute(receiver);
    TimeUnit.SECONDS.sleep(4);
    exec.shutdownNow();
  }
}
/* Output (sample):
Read: A, Read: B, Read: C, Read: D, Read: E, Read: F, Read: G, Read: H, Read: I, Read: J, Read: K,
java.lang.InterruptedException: sleep interrupted Sender sleep interrupted
java.io.InterruptedIOException Receiver read exception
*/
Sender
通过out = new PipedWriter()
创建了PipedWriter
。Receiver
通过in = new PipedReader(sender.getPipedWriter())
,使用Sender
的out
创建PipedReader
。in.read()
会在IO没有数据时自动进行阻塞,不需要手动使用sleep()
或wait()
。从打印还可以看到,PipedReader
可以被exec.shutdownNow()
打断。