Concurrency: Sharing Resources (2)
Contents
3 Atomicity and Volatility
原子操作(Atomic Operation)指的是不会被线程调度打断的操作。对除了long
和double
以外基本类型的简单操作(如赋值和返回值)是原子性的。JVM允许以两次32位操作的形式读写64位的值(long
和double
型变量),导致读写期间可能发生上下文切换。为了使long
和double
得到原子性,可以使用volatile
来修饰long
和double
型变量。
凭借原子操作不会被线程调度打断的特性,专家可以写出没有锁的代码,不需要使用同步机制。但不要轻易尝试。移除同步通常意味着不成熟的优化,通常会带来更多问题。
在多处理器系统上,可见性(Visibility)通常比原子性(Atomicity)更容易引发问题。在一个任务中进行的修改,即便使用的是原子操作、不会被打断,这个修改在另一个任务中也可能是不可见的,因为修改可能被暂存在处理器的缓存中,对其他任务不可见。那么问题来了:不同任务对当前应用程序的状态有不同的观测。同步机制会强制一个任务的修改可以在整个应用程序中可见;没有同步机制,一个任务的修改何时能对其他任务可见是不确定的。
volatile
关键字也可以确保可见性。如果声明某个变量为volatile
,那么对该变量的任何修改,都会立即对整个程序可见。对volatile
变量的修改会立即写入主内存,并从主内存读出,不会使用本地缓存。
原子性和volatile
是不同的概念。对非volatile
的原子操作不一定会被写入主内存,故而不确保对其他任务的可见性。如果多个任务都要访问某个变量,则该变量应当是volatile
的;如果不使用volatile
,那么该变量应当仅能够以同步的方式访问。反过来说,如果一个变量仅能从同步的方法或代码块中访问,那就不需要声名其为volatile
。
在同一个任务中的修改会对该任务可见,如果变量仅在一个任务中用到,就不需要声名为volatile
。
“原子操作不需要同步”的观点是错误的。
import java.util.concurrent.*; public class AtomicityTest implements Runnable { private int i = 0; public int getValue() { return i; } private synchronized void evenIncrement() { i++; i++; } public void run() { while(true) evenIncrement(); } public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); AtomicityTest at = new AtomicityTest(); exec.execute(at); while(true) { int val = at.getValue(); if(val % 2 != 0) { System.out.println(val); System.exit(0); } } } } /* Output 11 */
在上面的例子中,自加方法evenIncrement()
是同步的;getValue()
中只是返回i
,返回操作是原子操作,但还是出现了奇数的情况。原因在于,getValue()
中对i
的读取缺乏同步,如果读取发生在i
的自加操作中间,就可能读取到处于奇数状态的i
。getValue()
和evenIncrement()
都需要同步。
下面的例子中,nextSerialNumber()
用于产生唯一的序列号:
public class SerialNumberGenerator { private static volatile int serialNumber = 0; public static int nextSerialNumber() { return serialNumber++; // Not thread-safe } }
在Java中,自增不是原子操作,它包含了读取和写入两种操作,serialNumber
通过volatile
保证了可见性,其他任务调用了nextSerialNumber()后,可以及时观测到serialNumber
的变化。但这里的问题是,nextSerialNumber()
没有使用同步的方式访问serialNumber
,所以其他任务可能会在serialNumber
的自增操作期间访问serialNumber
,导致读取到不确定的结果。volatile
并不能解决“自增操作不是原子操作、可能被打断”这一事实。
简单来说,如果一个变量会被多个任务同时访问,且至少有一个任务会修改该变量,就应该使用volatile
。举例来说,用于停止任务的标志位必须是volatile
,否则的话,该标志位可能被缓存在寄存器里,当其他任务修改了该标志位,缓存的值并不会改变,导致无法及时停止任务。
下面的例子为了测试SerialNumberGenerator
,使用了一个循环存储CircularSet
,避免内存不足:
import java.util.concurrent.*; class CircularSet { private int[] array; private int len; private int index = 0; public CircularSet(int size) { array = new int[size]; len = size; // Initialize to a value not produced // by the SerialNumberGenerator: for(int i = 0; i < size; i++) array[i] = -1; } public synchronized void add(int i) { array[index] = i; // Wrap index and write over old elements: index = ++index % len; } public synchronized boolean contains(int val) { for(int i = 0; i < len; i++) if(array[i] == val) return true; return false; } } public class SerialNumberChecker { private static final int SIZE = 10; private static CircularSet serials = new CircularSet(1000); private static ExecutorService exec = Executors.newCachedThreadPool(); static class SerialChecker implements Runnable { public void run() { while(true) { int serial = SerialNumberGenerator.nextSerialNumber(); if(serials.contains(serial)) { System.out.println("Duplicate: " + serial); System.exit(0); } serials.add(serial); } } } public static void main(String[] args) throws Exception { for(int i = 0; i < SIZE; i++) exec.execute(new SerialChecker()); // Stop after n seconds if there’s an argument: if(args.length > 0) { TimeUnit.SECONDS.sleep(new Integer(args[0])); System.out.println("No duplicates detected"); System.exit(0); } } } /* Output Duplicate: 3213 */
这里使用了10个任务并行地调用SerialNumberGenerator.nextSerialNumber()
,打印输出了Duplicate: 3213
,出现了重复的结果,验证了上面的分析。解决方法很简单,只需为SerialNumberGenerator
的nextSerialNumber()
方法加上synchronized
。
4. Atomic Classes
原子类(Atomic Class)是Java SE 5中引入的,提供原子性的操作。
import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.*; public class AtomicIntegerTest implements Runnable { private AtomicInteger i = new AtomicInteger(0); public int getValue() { return i.get(); } private void evenIncrement() { i.addAndGet(2); } public void run() { while(true) evenIncrement(); } public static void main(String[] args) { new Timer().schedule(new TimerTask() { public void run() { System.err.println("Aborting"); System.exit(0); } }, 5000); // Terminate after 5 seconds ExecutorService exec = Executors.newCachedThreadPool(); AtomicIntegerTest ait = new AtomicIntegerTest(); exec.execute(ait); while(true) { int val = ait.getValue(); if(val % 2 != 0) { System.out.println(val); System.exit(0); } } } }
这里使用了AtomicInteger
,不再需要使用synchronized
。上面的例子会在5秒后打印Aborting
并结束,不会检出奇数的情况。
另一个例子:
import java.util.concurrent.atomic.*; public class AtomicEvenGenerator extends IntGenerator { private AtomicInteger currentEvenValue = new AtomicInteger(0); public int next() { return currentEvenValue.addAndGet(2); } public static void main(String[] args) { EvenChecker.test(new AtomicEvenGenerator()); } }
同样也不会出现奇数的情况。
原子类被设计用于实现java.util.concurrent
中的类,只应当在特殊情况下使用。通常来说使用synchronized
或Lock
进行锁定更加安全。
5. Critical Sections
通过synchronized
关键字可以创建一块临界区(Critical Section),也称为同步块(Synchronized Block):
synchronized(syncObject) { // This code can be accessed by only one task at a time }
其中的代码同时只能被一个线程访问。由此可以限制多个线程同时访问方法中的一段代码,而不直接同步整个方法。在进入同步块前,必须要获得syncObject
的锁定,如果其他任务已经获取了锁定,那么在锁定释放前,其他任务就无法进入同步块。
同步块能为其他任务提供更多的访问机会,提高资源的使用效率:
import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.*; class Pair { // Not thread-safe private int x, y; public Pair(int x, int y) { this.x = x; this.y = y; } public Pair() { this(0, 0); } public int getX() { return x; } public int getY() { return y; } public void incrementX() { x++; } public void incrementY() { y++; } public String toString() { return "x: " + x + ", y: " + y; } public class PairValuesNotEqualException extends RuntimeException { public PairValuesNotEqualException() { super("Pair values not equal: " + Pair.this); } } // Arbitrary invariant -- both variables must be equal: public void checkState() { if(x != y) throw new PairValuesNotEqualException(); } } // Protect a Pair inside a thread-safe class: abstract class PairManager { AtomicInteger checkCounter = new AtomicInteger(0); protected Pair p = new Pair(); private List<Pair> storage = Collections.synchronizedList(new ArrayList<Pair>()); public synchronized Pair getPair() { // Make a copy to keep the original safe: return new Pair(p.getX(), p.getY()); } // Assume this is a time consuming operation protected void store(Pair p) { storage.add(p); try { TimeUnit.MILLISECONDS.sleep(50); } catch(InterruptedException ignore) {} } public abstract void increment(); } // Synchronize the entire method: class PairManager1 extends PairManager { public synchronized void increment() { p.incrementX(); p.incrementY(); store(getPair()); } } // Use a critical section: class PairManager2 extends PairManager { public void increment() { Pair temp; synchronized(this) { p.incrementX(); p.incrementY(); temp = getPair(); } store(temp); } } class PairManipulator implements Runnable { private PairManager pm; public PairManipulator(PairManager pm) { this.pm = pm; } public void run() { while(true) pm.increment(); } public String toString() { return "Pair: " + pm.getPair() + " checkCounter = " + pm.checkCounter.get(); } } class PairChecker implements Runnable { private PairManager pm; public PairChecker(PairManager pm) { this.pm = pm; } public void run() { while(true) { pm.checkCounter.incrementAndGet(); pm.getPair().checkState(); } } } public class CriticalSection { // Test the two different approaches: static void testApproaches(PairManager pman1, PairManager pman2) { ExecutorService exec = Executors.newCachedThreadPool(); PairManipulator pm1 = new PairManipulator(pman1), pm2 = new PairManipulator(pman2); PairChecker pcheck1 = new PairChecker(pman1), pcheck2 = new PairChecker(pman2); exec.execute(pm1); exec.execute(pm2); exec.execute(pcheck1); exec.execute(pcheck2); try { TimeUnit.MILLISECONDS.sleep(500); } catch(InterruptedException e) { System.out.println("Sleep interrupted"); } System.out.println("pm1: " + pm1 + "\npm2: " + pm2); System.exit(0); } public static void main(String[] args) { PairManager pman1 = new PairManager1(), pman2 = new PairManager2(); testApproaches(pman1, pman2); } } /* Output pm1: Pair: x: 55, y: 55 checkCounter = 3 pm2: Pair: x: 56, y: 56 checkCounter = 154288579 */
Pair
不是线程安全的,其中的自加操作不是原子的,存在被打断的情况,incrementX()
和incrementY()
也没有同步。这里使用PairManager
持有Pair
并外部对Pair
的访问,实现线程安全。PairManager
由两个具体子类,PairManager1
同步了整个increment()
方法,PairManager2
只同步increment()
方法中的一部分。PairManager
的store()
用于把Pair
保存到同步列表,不需要进行同步。PairChecker
在单独线程上不停地访问PairManager
,用checkCounter
记录成功访问的次数,由此测定PairManager
各具体子类处于解锁状态的时间在整个测试期间所占的比重。从打印可见,pm2
,也就是使用同步块的PairManager2
,占用并锁定共享资源的时间较少,可以让共享的资源在多数时间保持解锁、供其他任务使用。
Thinking in Java原书在这里还有一个比较同步整个方法和手动为代码块加锁的例子,但在我的电脑上运行会出现问题,详见混用同步块和同步方法时的问题。
6. Synchronizing On Other Objects
使用同步块时,必须指定要同步的对象。通常使用当前对象自身进行同步,即synchronized(this)
。但有时候也需要同步于其他对象,此时必须确保其他所有相关的任务都同步于同一个对象。
class DualSynch { private Object syncObject = new Object(); public synchronized void f() { for(int i = 0; i < 5; i++) { System.out.println("f()"); Thread.yield(); } } public void g() { synchronized(syncObject) { for(int i = 0; i < 5; i++) { System.out.println("g()"); Thread.yield(); } } } } public class SyncObject { public static void main(String[] args) { final DualSynch ds = new DualSynch(); new Thread() { public void run() { ds.f(); } }.start(); ds.g(); } } /* Output: (Sample) g() f() g() f() g() f() ... */
这里f()
同步于this
,g()
同步于syncObject
,二者之间的同步是独立的。
7. Thread Local Storage
使用线程本地存储(Thread Local Storage)也可以解决多任务使用共享资源发生冲突的问题。线程本地存储可以在不同的线程上为同一个变量创建副本,各线程只使用自己的副本。这使得线程具有了状态。
java.lang.ThreadLocal
用于创建线程存储:
import java.util.concurrent.*; import java.util.*; class Accessor implements Runnable { private final int id; public Accessor(int idn) { id = idn; } public void run() { while(!Thread.currentThread().isInterrupted()) { ThreadLocalVariableHolder.increment(); System.out.println(this); Thread.yield(); } } public String toString() { return "#" + id + ": " + ThreadLocalVariableHolder.get(); } } public class ThreadLocalVariableHolder { private static ThreadLocal<Integer> value = new ThreadLocal<Integer>() { private Random rand = new Random(47); protected synchronized Integer initialValue() { return rand.nextInt(10000); } }; public static void increment() { value.set(value.get() + 1); } public static int get() { return value.get(); } public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < 5; i++) exec.execute(new Accessor(i)); TimeUnit.MILLISECONDS.sleep(10); // Run for a while exec.shutdownNow(); // All Accessors will quit } } /* Output ... #2: 6776 #4: 1010 #1: 601 #0: 9300 #3: 1901
ThreadLocalVariableHolder
的ThreadLocal<Integer> value
具有随机的初始值,由打印可见,每个线程都有自己的ThreadLocal
副本。ThreadLocal
对象通常使用static
。ThreadLocal
对象只能通过get()
和set()
访问,get()
会返回对应线程上的副本,set()
则用于更新对应线程上的副本,并返回旧值。注意ThreadLocalVariableHolder
的increment()
和get()
没有同步,因为ThreadLocal
本身可以确保不会发生竞争。