Concurrency: Sharing Resources (1)
Contents
1. Improperly accessing resources
考虑这样一个例子:一个任务产生偶数序列,另外一个任务检查第一个任务产生的数字是否为偶数。
定义抽象类IntGenerator
作为所有偶数序列生成器的基类:
public abstract class IntGenerator { private volatile boolean canceled = false; public abstract int next(); // Allow this to be canceled: public void cancel() { canceled = true; } public boolean isCanceled() { return canceled; } }
canceled
作为boolean
型的变量,是原子性(Atomic)的,这意味着对它的简单操作,如赋值、返回值等操作,是不会被打断中断的——即便被多个线程访问,canceled
也不会处于某种不确定的中间状态。这里还用了volatile
来保证canceled
的可见性。
EvenChecker
用于检查任意IntGenerator
的具体子类,解耦了偶数序列的生成和检查:
import java.util.concurrent.*; public class EvenChecker implements Runnable { private IntGenerator generator; private final int id; public EvenChecker(IntGenerator g, int ident) { generator = g; id = ident; } public void run() { while(!generator.isCanceled()) { int val = generator.next(); if(val % 2 != 0) { System.out.println(val + " not even!"); generator.cancel(); // Cancels all EvenCheckers } } } // Test any type of IntGenerator: public static void test(IntGenerator gp, int count) { System.out.println("Press Control-C to exit"); ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < count; i++) exec.execute(new EvenChecker(gp, i)); exec.shutdown(); } // Default value for count: public static void test(IntGenerator gp) { test(gp, 10); } }
test()
会创建多个EvenChecker
,在多个线程上并行地对同一个IntGenerator
进行检查,判断其输出的数字是否为偶数;如果不是偶数,EvenChecker
则调用IntGenerator
的cancel()
关闭IntGenerator
;所有的EvenChecker
检查到IntGenerator
的isCanceled()
为true
,也终止自己的循环,结束任务。
下面的实现中,EvenGenerator
通过两次自加(初始值为0)来产生偶数:
public class EvenGenerator extends IntGenerator { private int currentEvenValue = 0; public int next() { ++currentEvenValue; // Danger point here! ++currentEvenValue; return currentEvenValue; } public static void main(String[] args) { EvenChecker.test(new EvenGenerator()); } } /* Output Press Control-C to exit 1027 not even! 1031 not even! 1025 not even! 1029 not even! */
由打印可见,出现了非偶数的状况。这里可能发生的情况是,一个EvenChecker
调用了EvenGenerator
的next()
,执行到第一个++currentEvenValue
时,另一个线程上的EvenChecker
也调用了EvenChecker
的next()
,使得currentEvenValue
进入了错误的状态(变成奇数)。
EvenGenerator
的问题并不会立即暴露出来,而是要运行一段时间才会出现,这也是多线程程序的一个特点,就算程序本身有问题,也不一定会立即暴露。如果在两个++currentEvenValue
之间加上yield()
,问题就会暴露的更快。
public class EvenGenerator extends IntGenerator { private int currentEvenValue = 0; public int next() { ++currentEvenValue; // Danger point here! Thread.yield(); ++currentEvenValue; return currentEvenValue; } public static void main(String[] args) { EvenChecker.test(new EvenGenerator()); } } /* Output Press Control-C to exit 3 not even! */
需要注意的是,加法操作不是原子操作(Atomic Operation),需要多个步骤才能完成。任务可能在加法操作过程中就被线程调度机制打断。
上面的问题是使用线程时的一个基本问题:无法知道某个线程会在什么时候运行。多个线程同时修改同一个资源,发生冲突,使资源进入错误的状态。这类问题的一个简单的解决方法是为资源加锁,访问公共资源的任务必须为资源上锁,阻止其他任务同时访问该资源;任务使用完资源后再为其解锁,允许后续任务访问资源并上锁。这种“加锁”形式一般是通过序列化对共享资源的访问实现的,把一次只允许一个线程执行的代码段写在特殊的子句里,这种特殊的子句提供互斥(Mutual Exclusion)的能力,简称为Mutex。
Java提供synchronized
关键字,当一个任务希望执行由synchronized
修饰的代码段时,它会先去检查当前是否能够获取对应的锁定,如果可以,就获取锁定,执行代码段,再解锁。
共享资源只是一块特定的内存,可以是一个对象、文件、I/O端口甚至是打印机。为了控制对共享资源的访问,需要先把共享资源封装到对象里,然后任何使用该共享资源的方法都可以用synchronized
修饰。如果一个任务调用了对象的某个synchronized
方法,所有其他任务都会被阻止进入该对象的任何synchronized
方法,直到第一个任务的调用返回。
所有对象都自带一个锁(称为monitor
),当调用对象的任一个synchronized
方法时,该对象就会被锁定,在该调用返回并解锁前,该对象的其他synchronized
方法都无法被调用。
需要特别注意的是,对相关的成员变量要使用private
。synchronized
无法阻止其他任务的直接访问,从而导致冲突。
一个任务可以多次获取同一个对象的锁,如一个方法调用了同一个对象的其他方法。JVM会对对象加锁的次数进行计数:
- 未加锁的对象计数为0;
- 当一个任务获取的对象的锁后,计数变为1;
- 每当同一个任务重复获取了同一个对象的锁,该对象的锁定计数都会增加1;
- 每当任务离开一个
synchronized
方法,对应对象的锁定计数会减1; - 对象的锁定计数变为0时,对象被解锁。
每一个类也有一个锁,用于synchronized static
方法访问static
数据。
使用synchronized
的时机,使用Thinking in Java原书对Brian’s Rule of Synchronization的引用:
If you are writing a variable that might next be read by another thread, or reading a variable that might have last been written by another thread, you must use synchronization, and further, both the reader and the writer must synchronize using the same monitor lock.
如果正在写入的变量之后可能会被其他线程访问,或者正在读取刚被其他线程写过的变量,就必须使用同步机制。读方和写方必须同步于同一个锁。
每一个访问关键共享资源的方法必须同步。如果在一个类中有多个任务与关键数据有交互,必须把这些任务都进行同步。如果只是同步了其中某个方法,其他方法是不受锁定影响的,就会在同步之外对数据进行修改的情况。
2.1. Synchronizing the EvenGenerator
在EvenGenerator.java
中使用synchronized
可以避免前面遇到的问题:
public class SynchronizedEvenGenerator extends IntGenerator { private int currentEvenValue = 0; public synchronized int next() { ++currentEvenValue; Thread.yield(); // Cause failure faster ++currentEvenValue; return currentEvenValue; } public static void main(String[] args) { EvenChecker.test(new SynchronizedEvenGenerator()); } }
SynchronizedEvenGenerator
使用synchronized
确保同一时间只能有一个任务执行next()
。这里还在两次自加操作中插入Thread.yield()
来尝试“提高”出错的概率,实际运行可以发现,EvenChecker
不会检出奇数的情况,需要手动中断程序。
2.2. Using Explicit Lock Objects
Java SE 5在java.util.concurrent.locks
提供了显式互斥锁的机制。可以显式地创建Lock
对象、加锁和解锁。虽然形式上不如synchronized
优雅,但使用起来更加灵活。
import java.util.concurrent.locks.*; public class MutexEvenGenerator extends IntGenerator { private int currentEvenValue = 0; private Lock lock = new ReentrantLock(); public int next() { lock.lock(); try { ++currentEvenValue; Thread.yield(); // Cause failure faster ++currentEvenValue; return currentEvenValue; } finally { lock.unlock(); } } public static void main(String[] args) { EvenChecker.test(new MutexEvenGenerator()); } }
MutexEvenGenerator
创建了lock
,并手动进行加锁lock()
和解锁unlock()
。注意在lock.lock()
后面紧跟着try-finally
,unlock()
放在finally
中,确保一定能够解锁。return
放在try
里面,确保不会过早解锁而把数据暴露给下个任务。
使用Lock
显式地进行加锁和解锁,可以通过finally
来确保程序处于正常的状态。而在使用synchronized
时如果发生错误,抛出异常,就没有机会进行收尾工作。一般来说,使用synchronized
可以减少代码量,并减少编码上的错误。但使用synchronized
无法尝试加锁,或者为等待解锁设置超时:为了满足这些特殊需求,就需要使用java.util.concurrent.locks
。
import java.util.concurrent.*; import java.util.concurrent.locks.*; public class AttemptLocking { private ReentrantLock lock = new ReentrantLock(); public void untimed() { boolean captured = lock.tryLock(); try { System.out.println("tryLock(): " + captured); } finally { if(captured) lock.unlock(); } } public void timed() { boolean captured = false; try { captured = lock.tryLock(2, TimeUnit.SECONDS); } catch(InterruptedException e) { throw new RuntimeException(e); } try { System.out.println("tryLock(2, TimeUnit.SECONDS): " + captured); } finally { if(captured) lock.unlock(); } } public static void main(String[] args) throws Exception { final AttemptLocking al = new AttemptLocking(); al.untimed(); // True -- lock is available al.timed(); // True -- lock is available // Now create a separate task to grab the lock: new Thread() { { setDaemon(true); } public void run() { al.lock.lock(); System.out.println("acquired"); } }.start(); // Thread.yield(); // Give the 2nd task a chance // Thread.yield() doesn't work on my PC to demonstrate this case, use sleep() instead TimeUnit.MILLISECONDS.sleep(100); al.untimed(); // False -- lock grabbed by task al.timed(); // False -- lock grabbed by task } } /* Output tryLock(): true tryLock(2, TimeUnit.SECONDS): true acquired tryLock(): false tryLock(2, TimeUnit.SECONDS): false */
ReentrantLock
可以尝试或者失败获取锁定,如果发现其他任务已经获取了锁定,就可以去做别的事,而不是单纯地等待解锁。untimed()
尝试进行加锁,不会阻塞。timed()
设置了2秒的超时,超时后程序会继续往下运行。注意在main()
中,Thinking in Java原书在占用资源的守护线程开始后,使用yield()
来让其锁定资源,但在我的电脑(Windows 10 x64 + JDK 8)上,程序最后的al.untimed()
和al.timed()
会先于守护线程开始,并成功获得锁定。所以这里我用sleep()
替代了yield()
,确保守护线程能够先开始并获得锁定。这也体现了线程管理策略在不同操作系统、不同JVM上的不一致性。
显式的Lock
对象提供了对锁定和解锁的细化控制,适用于实现专用的同步结构,如Hand-Overhand Locking(Lock Coupling),用于遍历链表,这需要先获取下一个节点的锁定,然后再解锁当前节点。