Concurrency: Cooperation Between Tasks (1)

  使用wait()notifyAll(),以及Java SE 5中引入的await()signal(),可以控制任务的运行,实现任务间的协作。

1. wait() and notifyAll()

  wait()会让任务挂起,与sleep()不同的是,wait()会解除当前所占用的对象的锁定,并会被notify()notifyAll()唤醒,wait()结束前会重新获得进入wait()时所释放的对象的锁定(如果对象被其他任务占用,则wait()会继续等待)。可以通过wait(pause)的方式等待特定时间。

  值得注意的是,wait()notify()notifyAll()都是Object的方法,而不是Thread的,因为这些方法会操作对象的锁,如wait()就会解除锁定。但这并不意味着可以在任何地方调用这些方法。

  只能在synchronized方法和synchronized块中使用wait()notify()notifyAll(),也就是说调用这些方法必须首先获得对应对象的锁,如:

synchronized(x) { 
    x.notifyAll(); 
}

在其他地方使用wait()notify()notifyAll(),可以正常编译,但运行时会抛出IllegalMonitorStateExceptionsleep()不操作对象的锁,可以在非同步的方法中使用。

import java.util.concurrent.*;

class Car {
    private boolean waxOn = false;
    public synchronized void waxed() {
        waxOn = true; // Ready to buff
        notifyAll();
    }
    public synchronized void buffed() {
        waxOn = false; // Ready for another coat of wax
        notifyAll();
    }
    public synchronized void waitForWaxing()
            throws InterruptedException {
        while(waxOn == false)
            wait();
    }
    public synchronized void waitForBuffing()
            throws InterruptedException {
        while(waxOn == true)
            wait();
    }
}

class WaxOn implements Runnable {
    private Car car;
    public WaxOn(Car c) { car = c; }
    public void run() {
        try {
            while(!Thread.interrupted()) {
                System.out.print("Wax On! ");
                TimeUnit.MILLISECONDS.sleep(200);
                car.waxed();
                car.waitForBuffing();
            }
        } catch(InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
        System.out.println("Ending Wax On task");
    }
}

class WaxOff implements Runnable {
    private Car car;
    public WaxOff(Car c) { car = c; }
    public void run() {
        try {
            while(!Thread.interrupted()) {
                car.waitForWaxing();
                System.out.print("Wax Off! ");
                TimeUnit.MILLISECONDS.sleep(200);
                car.buffed();
            }
        } catch(InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
        System.out.println("Ending Wax Off task");
    }
}

public class WaxOMatic {
    public static void main(String[] args) throws Exception {
        Car car = new Car();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new WaxOff(car));
        exec.execute(new WaxOn(car));
        TimeUnit.SECONDS.sleep(5); // Run for a while...
        exec.shutdownNow(); // Interrupt all tasks
    }
}
/* Output
Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Exiting via interrupt
Ending Wax On task
Exiting via interrupt
Ending Wax Off task
*/

这里同时执行了WaxOffWaxOn两个任务。WaxOff首先waitForWaxing(),如果waxOnfalse,则调用wait()把自己挂起,同时释放自身(Car)的锁定。等到WaxOn调用waxed(),设置waxOntrue,并notifyAll()。此时waitForWaxing()的等待结束,WaxOff继续执行,在通过buffed()设置waxOntrue······由打印可见,WaxOnWaxOff交替执行,不会发生死锁。

  上面的例子把wait()放在了while循环里,每次唤醒后都会通过循环条件检查waxOn,如果waxOn不满足条件,则继续wait()。把wait()放在循环里,检查唤醒的条件和原因十分重要,因为:

  • 可能同时有多个任务在wait()同一个事件,如果先被唤醒的任务处理了这个事件,其他任务通过判断相关状态,获知事件已经被处理,可以选择继续wait()
  • 当任务从wait()中被唤醒,可能同时有其他任务又改变了当前的状态,使得被唤醒的任务无法再执行,或者不需要再执行,此时被唤醒的任务通过判断相关状态,可以选择继续wait()
  • 唤醒任务的事件可能并不是该任务所感兴趣的事件,被唤醒的任务通过检查唤醒的原因,可以选择继续wait()

1.1. Missed Signals

  当两个线程通过notify()/wait()或者notifyAll()/wait()进行协作时,可能会出现丢失唤醒信号的情况。如下面的两个任务:

T1: 
synchronized(sharedMonitor) { 
    <setup condition for T2> 
    sharedMonitor.notify(); 
}
 
T2: 
while(someCondition) { 
    // Point 1 
    synchronized(sharedMonitor) { 
        sharedMonitor.wait(); 
    } 
}

可能存在这样的情况:T2运行时满足someCondition,进入while循环,执行到Point 1时,发生线程调度切换到T1;T1用于结束T2的等待,<setup condition for T2>中修改某些状态使得someCondition不再满足,并通过notify()通知T2结束等待;然后切换到T2从Point 1后继续执行,T2进入wait()。T1的notify()在T2的wait()前发生,T2的wait()无法被唤醒,进入死锁。

  为了消除someCondition的竞争,T2应使用如下的形式:

T2: 
synchronized(sharedMonitor) { 
  while(someCondition) 
    sharedMonitor.wait(); 
}

如果T1先完成,T2的someCondition不满足,不会进入循环;如果T2先运行进入了循环,T1也可以正常唤醒T2。

2. notify() vs. notifyAll()

  notify()只能唤醒一个任务。notifyAll()可以唤醒wait()在指定的锁上的所有任务,而不是所有处于wait()的任务。

import java.util.concurrent.*;
import java.util.*;

class Blocker {
    synchronized void waitingCall() {
        try {
            while(!Thread.interrupted()) {
                wait();
                System.out.print(Thread.currentThread() + " ");
            }
        } catch(InterruptedException e) {
            // OK to exit this way
        }
    }
    synchronized void prod() { notify(); }
    synchronized void prodAll() { notifyAll(); }
}

class Task implements Runnable {
    static Blocker blocker = new Blocker();
    public void run() { blocker.waitingCall(); }
}

class Task2 implements Runnable {
    // A separate Blocker object:
    static Blocker blocker = new Blocker();
    public void run() { blocker.waitingCall(); }
}

public class NotifyVsNotifyAll {
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        for(int i = 0; i < 5; i++)
            exec.execute(new Task());
        exec.execute(new Task2());
        Timer timer = new Timer();
        timer.scheduleAtFixedRate(new TimerTask() {
            boolean prod = true;
            public void run() {
                if(prod) {
                    System.out.print("\nnotify() ");
                    Task.blocker.prod();
                    prod = false;
                } else {
                    System.out.print("\nnotifyAll() ");
                    Task.blocker.prodAll();
                    prod = true;
                }
            }
        }, 400, 400); // Run every .4 second
        TimeUnit.SECONDS.sleep(5); // Run for a while...
        timer.cancel();
        System.out.println("\nTimer canceled");
        TimeUnit.MILLISECONDS.sleep(500);
        System.out.print("Task2.blocker.prodAll() ");
        Task2.blocker.prodAll();
        TimeUnit.MILLISECONDS.sleep(500);
        System.out.println("\nShutting down");
        exec.shutdownNow(); // Interrupt all tasks
    }
}
/* Output:
notify() Thread[pool-1-thread-3,5,main] 
notifyAll() Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-5,5,main] Thread[pool-1-thread-1,5,main] 
notify() Thread[pool-1-thread-3,5,main] 
notifyAll() Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-5,5,main] Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-4,5,main] 
notify() Thread[pool-1-thread-3,5,main] 
notifyAll() Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-5,5,main] Thread[pool-1-thread-1,5,main] 
notify() Thread[pool-1-thread-3,5,main] 
notifyAll() Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-5,5,main] Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-4,5,main] 
notify() Thread[pool-1-thread-3,5,main] 
notifyAll() Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-5,5,main] Thread[pool-1-thread-1,5,main] 
notify() Thread[pool-1-thread-3,5,main] 
notifyAll() Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-5,5,main] Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-4,5,main] 
Timer canceled
Task2.blocker.prodAll() Thread[pool-1-thread-6,5,main] 
Shutting down
*/

由打印可见,notify()会唤醒一个任务,notifyAll()会唤醒所有任务。TaskTask2具有不同的Blocker对象,Tasknotify()notifyAll()不会唤醒Task2,反之亦然。Blockerprod()prod()都是synchronized的,notify()notifyAll()必须要在synchronized方法或者synchronized块中才能执行,只会唤醒同步于对应Blockerwait()

3. Producers and Consumers

3.1. An Example for Producers and Consumers

  下面的例子中,一家餐馆有一个厨师和一个服务员,服务员等待厨师做菜,厨师做好菜后,通知服务员上菜,服务员上完菜继续回到等待厨师做菜的状态。这里厨师是Producer,生产资源;服务员是Consumer,消耗资源。二者形成一个简单的合作关系。

import java.util.concurrent.*;

class Meal {
    private final int orderNum;
    public Meal(int orderNum) { this.orderNum = orderNum; }
    public String toString() { return "Meal " + orderNum; }
}

class WaitPerson implements Runnable {
    private Restaurant restaurant;
    public WaitPerson(Restaurant r) { restaurant = r; }
    public void run() {
        try {
            while(!Thread.interrupted()) {
                synchronized(this) {
                    while(restaurant.meal == null)
                        wait(); // ... for the chef to produce a meal
                }
                System.out.println("Waitperson got " + restaurant.meal);
                synchronized(restaurant.chef) {
                    restaurant.meal = null;
                    restaurant.chef.notifyAll(); // Ready for another
                }
            }
        } catch(InterruptedException e) {
            System.out.println("WaitPerson interrupted");
        }
    }
}

class Chef implements Runnable {
    private Restaurant restaurant;
    private int count = 0;
    public Chef(Restaurant r) { restaurant = r; }
    public void run() {
        try {
            while(!Thread.interrupted()) {
                synchronized(this) {
                    while(restaurant.meal != null)
                        wait(); // ... for the meal to be taken
                }
                if(++count == 10) {
                    System.out.println("Out of food, closing");
                    restaurant.exec.shutdownNow();
                }
                System.out.print("Order up! ");
                synchronized(restaurant.waitPerson) {
                    restaurant.meal = new Meal(count);
                    restaurant.waitPerson.notifyAll();
                }
                TimeUnit.MILLISECONDS.sleep(100);
            }
        } catch(InterruptedException e) {
            System.out.println("Chef interrupted");
        }
    }
}

public class Restaurant {
    Meal meal;
    ExecutorService exec = Executors.newCachedThreadPool();
    WaitPerson waitPerson = new WaitPerson(this);
    Chef chef = new Chef(this);
    public Restaurant() {
        exec.execute(chef);
        exec.execute(waitPerson);
    }
    public static void main(String[] args) {
        new Restaurant();
    }
}
/* Output
Order up! Waitperson got Meal 1
Order up! Waitperson got Meal 2
Order up! Waitperson got Meal 3
Order up! Waitperson got Meal 4
Order up! Waitperson got Meal 5
Order up! Waitperson got Meal 6
Order up! Waitperson got Meal 7
Order up! Waitperson got Meal 8
Order up! Waitperson got Meal 9
Out of food, closing
Order up! WaitPerson interrupted
Chef interrupted
*/

上面只是一个简单的例子,一个Producer产生一个对象,由一个Consumer消耗。典型的Producer-Consumer实现一般会使用一个先入先出的队列,用于保存和消耗所生产的对象。

3.2. Using Explicit Lock and Condition Objects

  Java SE 5在java.util.concurrent包中提供了Condition,调用Conditionawait()可以挂起任务,signal()signalAll()可以唤醒任务。

  下面使用Condition重写之前的WaxOMatic

import java.util.concurrent.*;
import java.util.concurrent.locks.*;

class Car {
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private boolean waxOn = false;
    public void waxed() {
        lock.lock();
        try {
            waxOn = true; // Ready to buff
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
    public void buffed() {
        lock.lock();
        try {
            waxOn = false; // Ready for another coat of wax
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
    public void waitForWaxing() throws InterruptedException {
        lock.lock();
        try {
            while(waxOn == false)
                condition.await();
        } finally {
            lock.unlock();
        }
    }
    public void waitForBuffing() throws InterruptedException{
        lock.lock();
        try {
            while(waxOn == true)
                condition.await();
        } finally {
            lock.unlock();
        }
    }
}

class WaxOn implements Runnable {
    private Car car;
    public WaxOn(Car c) { car = c; }
    public void run() {
        try {
            while(!Thread.interrupted()) {
                System.out.print("Wax On! ");
                TimeUnit.MILLISECONDS.sleep(200);
                car.waxed();
                car.waitForBuffing();
            }
        } catch(InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
        System.out.println("Ending Wax On task");
    }
}

class WaxOff implements Runnable {
    private Car car;
    public WaxOff(Car c) { car = c; }
    public void run() {
        try {
            while(!Thread.interrupted()) {
                car.waitForWaxing();
                System.out.print("Wax Off! ");
                TimeUnit.MILLISECONDS.sleep(200);
                car.buffed();
            }
        } catch(InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
        System.out.println("Ending Wax Off task");
    }
}

public class WaxOMatic2 {
    public static void main(String[] args) throws Exception {
        Car car = new Car();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new WaxOff(car));
        exec.execute(new WaxOn(car));
        TimeUnit.SECONDS.sleep(5);
        exec.shutdownNow();
    }
}
/* 
Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Exiting via interrupt
Ending Wax On task
Exiting via interrupt
Ending Wax Off task
*/

Car中通过Condition condition = lock.newCondition(),由一个Lock创建ConditionCondition本身不能携带状态,还是需要waxOn来保存当前状态。这里waxed()buffed()waitForWaxing()waitForBuffing()不再使用synchronized,而是使用lock手动上锁。lock.lock()后紧跟try-finally确保始终能够解锁。在这个例子中使用LockCondition并没有带来什么额外的好处,LockCondition通常会用于更加复杂的情况。