Concurrency: Sharing Resources (1)

1. Improperly accessing resources

  考虑这样一个例子:一个任务产生偶数序列,另外一个任务检查第一个任务产生的数字是否为偶数。

  定义抽象类IntGenerator作为所有偶数序列生成器的基类:

public abstract class IntGenerator {
    private volatile boolean canceled = false;
    public abstract int next();
    // Allow this to be canceled:
    public void cancel() { canceled = true; }
    public boolean isCanceled() { return canceled; }
}

  canceled作为boolean型的变量,是原子性(Atomic)的,这意味着对它的简单操作,如赋值、返回值等操作,是不会被打断中断的——即便被多个线程访问,canceled也不会处于某种不确定的中间状态。这里还用了volatile来保证canceled的可见性。

  EvenChecker用于检查任意IntGenerator的具体子类,解耦了偶数序列的生成和检查:

import java.util.concurrent.*;

public class EvenChecker implements Runnable {
    private IntGenerator generator;
    private final int id;
    public EvenChecker(IntGenerator g, int ident) {
        generator = g;
        id = ident;
    }
    public void run() {
        while(!generator.isCanceled()) {
            int val = generator.next();
            if(val % 2 != 0) {
                System.out.println(val + " not even!");
                generator.cancel(); // Cancels all EvenCheckers
            }
        }
    }
    // Test any type of IntGenerator:
    public static void test(IntGenerator gp, int count) {
        System.out.println("Press Control-C to exit");
        ExecutorService exec = Executors.newCachedThreadPool();
        for(int i = 0; i < count; i++)
            exec.execute(new EvenChecker(gp, i));
        exec.shutdown();
    }
    // Default value for count:
    public static void test(IntGenerator gp) {
        test(gp, 10);
    }
}

test()会创建多个EvenChecker,在多个线程上并行地对同一个IntGenerator进行检查,判断其输出的数字是否为偶数;如果不是偶数,EvenChecker则调用IntGeneratorcancel()关闭IntGenerator;所有的EvenChecker检查到IntGeneratorisCanceled()true,也终止自己的循环,结束任务。

  下面的实现中,EvenGenerator通过两次自加(初始值为0)来产生偶数:

public class EvenGenerator extends IntGenerator {
    private int currentEvenValue = 0;
    public int next() {
        ++currentEvenValue; // Danger point here!
        ++currentEvenValue;
        return currentEvenValue;
    }
    public static void main(String[] args) {
        EvenChecker.test(new EvenGenerator());
    }
}
/* Output
Press Control-C to exit
1027 not even!
1031 not even!
1025 not even!
1029 not even!
*/

由打印可见,出现了非偶数的状况。这里可能发生的情况是,一个EvenChecker调用了EvenGeneratornext(),执行到第一个++currentEvenValue时,另一个线程上的EvenChecker也调用了EvenCheckernext(),使得currentEvenValue进入了错误的状态(变成奇数)。

  EvenGenerator的问题并不会立即暴露出来,而是要运行一段时间才会出现,这也是多线程程序的一个特点,就算程序本身有问题,也不一定会立即暴露。如果在两个++currentEvenValue之间加上yield(),问题就会暴露的更快。

public class EvenGenerator extends IntGenerator {
    private int currentEvenValue = 0;
    public int next() {
        ++currentEvenValue; // Danger point here!
        Thread.yield();
        ++currentEvenValue;
        return currentEvenValue;
    }
    public static void main(String[] args) {
        EvenChecker.test(new EvenGenerator());
    }
}
/* Output
Press Control-C to exit
3 not even!
*/

  需要注意的是,加法操作不是原子操作(Atomic Operation),需要多个步骤才能完成。任务可能在加法操作过程中就被线程调度机制打断。

2. Resolving Shared Resource Contention

  上面的问题是使用线程时的一个基本问题:无法知道某个线程会在什么时候运行。多个线程同时修改同一个资源,发生冲突,使资源进入错误的状态。这类问题的一个简单的解决方法是为资源加锁,访问公共资源的任务必须为资源上锁,阻止其他任务同时访问该资源;任务使用完资源后再为其解锁,允许后续任务访问资源并上锁。这种“加锁”形式一般是通过序列化对共享资源的访问实现的,把一次只允许一个线程执行的代码段写在特殊的子句里,这种特殊的子句提供互斥(Mutual Exclusion)的能力,简称为Mutex。

  Java提供synchronized关键字,当一个任务希望执行由synchronized修饰的代码段时,它会先去检查当前是否能够获取对应的锁定,如果可以,就获取锁定,执行代码段,再解锁。

  共享资源只是一块特定的内存,可以是一个对象、文件、I/O端口甚至是打印机。为了控制对共享资源的访问,需要先把共享资源封装到对象里,然后任何使用该共享资源的方法都可以用synchronized修饰。如果一个任务调用了对象的某个synchronized方法,所有其他任务都会被阻止进入该对象的任何synchronized方法,直到第一个任务的调用返回。

  所有对象都自带一个锁(称为monitor),当调用对象的任一个synchronized方法时,该对象就会被锁定,在该调用返回并解锁前,该对象的其他synchronized方法都无法被调用。

  需要特别注意的是,对相关的成员变量要使用privatesynchronized无法阻止其他任务的直接访问,从而导致冲突。

  一个任务可以多次获取同一个对象的锁,如一个方法调用了同一个对象的其他方法。JVM会对对象加锁的次数进行计数:

  • 未加锁的对象计数为0;
  • 当一个任务获取的对象的锁后,计数变为1;
  • 每当同一个任务重复获取了同一个对象的锁,该对象的锁定计数都会增加1;
  • 每当任务离开一个synchronized方法,对应对象的锁定计数会减1;
  • 对象的锁定计数变为0时,对象被解锁。

  每一个类也有一个锁,用于synchronized static方法访问static数据。

  使用synchronized的时机,使用Thinking in Java原书对Brian’s Rule of Synchronization的引用:
  

If you are writing a variable that might next be read by another thread, or reading a variable that might have last been written by another thread, you must use synchronization, and further, both the reader and the writer must synchronize using the same monitor lock.

如果正在写入的变量之后可能会被其他线程访问,或者正在读取刚被其他线程写过的变量,就必须使用同步机制。读方和写方必须同步于同一个锁。

  每一个访问关键共享资源的方法必须同步。如果在一个类中有多个任务与关键数据有交互,必须把这些任务都进行同步。如果只是同步了其中某个方法,其他方法是不受锁定影响的,就会在同步之外对数据进行修改的情况。

2.1. Synchronizing the EvenGenerator

  在EvenGenerator.java中使用synchronized可以避免前面遇到的问题:

public class SynchronizedEvenGenerator extends IntGenerator {
    private int currentEvenValue = 0;
    public synchronized int next() {
        ++currentEvenValue;
        Thread.yield(); // Cause failure faster 
        ++currentEvenValue;
        return currentEvenValue;
    }
    public static void main(String[] args) {
        EvenChecker.test(new SynchronizedEvenGenerator());
    }
}

SynchronizedEvenGenerator使用synchronized确保同一时间只能有一个任务执行next()。这里还在两次自加操作中插入Thread.yield()来尝试“提高”出错的概率,实际运行可以发现,EvenChecker不会检出奇数的情况,需要手动中断程序。

2.2. Using Explicit Lock Objects

  Java SE 5在java.util.concurrent.locks提供了显式互斥锁的机制。可以显式地创建Lock对象、加锁和解锁。虽然形式上不如synchronized优雅,但使用起来更加灵活。

import java.util.concurrent.locks.*;

public class MutexEvenGenerator extends IntGenerator {
    private int currentEvenValue = 0;
    private Lock lock = new ReentrantLock();
    public int next() {
        lock.lock();
        try {
            ++currentEvenValue;
            Thread.yield(); // Cause failure faster
            ++currentEvenValue;
            return currentEvenValue;
        } finally {
            lock.unlock();
        }
    }
    public static void main(String[] args) {
        EvenChecker.test(new MutexEvenGenerator());
    }
}

MutexEvenGenerator创建了lock,并手动进行加锁lock()和解锁unlock()。注意在lock.lock()后面紧跟着try-finallyunlock()放在finally中,确保一定能够解锁。return放在try里面,确保不会过早解锁而把数据暴露给下个任务。

  使用Lock显式地进行加锁和解锁,可以通过finally来确保程序处于正常的状态。而在使用synchronized时如果发生错误,抛出异常,就没有机会进行收尾工作。一般来说,使用synchronized可以减少代码量,并减少编码上的错误。但使用synchronized无法尝试加锁,或者为等待解锁设置超时:为了满足这些特殊需求,就需要使用java.util.concurrent.locks

import java.util.concurrent.*;
import java.util.concurrent.locks.*;

public class AttemptLocking {
    private ReentrantLock lock = new ReentrantLock();
    public void untimed() {
        boolean captured = lock.tryLock();
        try {
            System.out.println("tryLock(): " + captured);
        } finally {
            if(captured)
                lock.unlock();
        }
    }
    public void timed() {
        boolean captured = false;
        try {
            captured = lock.tryLock(2, TimeUnit.SECONDS);
        } catch(InterruptedException e) {
            throw new RuntimeException(e);
        }
        try {
            System.out.println("tryLock(2, TimeUnit.SECONDS): " +
                    captured);
        } finally {
            if(captured)
                lock.unlock();
        }
    }
    public static void main(String[] args) throws Exception {
        final AttemptLocking al = new AttemptLocking();
        al.untimed(); // True -- lock is available
        al.timed();   // True -- lock is available
        // Now create a separate task to grab the lock:
        new Thread() {
            { setDaemon(true); }
            public void run() {
                al.lock.lock();
                System.out.println("acquired");
            }
        }.start();
        // Thread.yield(); // Give the 2nd task a chance
        // Thread.yield() doesn't work on my PC to demonstrate this case, use sleep() instead
        TimeUnit.MILLISECONDS.sleep(100);
        al.untimed(); // False -- lock grabbed by task
        al.timed();   // False -- lock grabbed by task
    }
}
/* Output
tryLock(): true
tryLock(2, TimeUnit.SECONDS): true
acquired
tryLock(): false
tryLock(2, TimeUnit.SECONDS): false
*/

ReentrantLock可以尝试或者失败获取锁定,如果发现其他任务已经获取了锁定,就可以去做别的事,而不是单纯地等待解锁。untimed()尝试进行加锁,不会阻塞。timed()设置了2秒的超时,超时后程序会继续往下运行。注意在main()中,Thinking in Java原书在占用资源的守护线程开始后,使用yield()来让其锁定资源,但在我的电脑(Windows 10 x64 + JDK 8)上,程序最后的al.untimed()al.timed()会先于守护线程开始,并成功获得锁定。所以这里我用sleep()替代了yield(),确保守护线程能够先开始并获得锁定。这也体现了线程管理策略在不同操作系统、不同JVM上的不一致性。

  显式的Lock对象提供了对锁定和解锁的细化控制,适用于实现专用的同步结构,如Hand-Overhand Locking(Lock Coupling),用于遍历链表,这需要先获取下一个节点的锁定,然后再解锁当前节点。