Concurrency: Sharing Resources (2)

3 Atomicity and Volatility

  原子操作(Atomic Operation)指的是不会被线程调度打断的操作。对除了longdouble以外基本类型的简单操作(如赋值和返回值)是原子性的。JVM允许以两次32位操作的形式读写64位的值(longdouble型变量),导致读写期间可能发生上下文切换。为了使longdouble得到原子性,可以使用volatile来修饰longdouble型变量。

  凭借原子操作不会被线程调度打断的特性,专家可以写出没有锁的代码,不需要使用同步机制。但不要轻易尝试。移除同步通常意味着不成熟的优化,通常会带来更多问题。

  在多处理器系统上,可见性(Visibility)通常比原子性(Atomicity)更容易引发问题。在一个任务中进行的修改,即便使用的是原子操作、不会被打断,这个修改在另一个任务中也可能是不可见的,因为修改可能被暂存在处理器的缓存中,对其他任务不可见。那么问题来了:不同任务对当前应用程序的状态有不同的观测。同步机制会强制一个任务的修改可以在整个应用程序中可见;没有同步机制,一个任务的修改何时能对其他任务可见是不确定的。

  volatile关键字也可以确保可见性。如果声明某个变量为volatile,那么对该变量的任何修改,都会立即对整个程序可见。对volatile变量的修改会立即写入主内存,并从主内存读出,不会使用本地缓存。

  原子性和volatile是不同的概念。对非volatile的原子操作不一定会被写入主内存,故而不确保对其他任务的可见性。如果多个任务都要访问某个变量,则该变量应当是volatile的;如果不使用volatile,那么该变量应当仅能够以同步的方式访问。反过来说,如果一个变量仅能从同步的方法或代码块中访问,那就不需要声名其为volatile

  在同一个任务中的修改会对该任务可见,如果变量仅在一个任务中用到,就不需要声名为volatile

  “原子操作不需要同步”的观点是错误的。

import java.util.concurrent.*;

public class AtomicityTest implements Runnable {
    private int i = 0;
    public int getValue() { return i; }
    private synchronized void evenIncrement() { i++; i++; }
    public void run() {
        while(true)
            evenIncrement();
    }
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        AtomicityTest at = new AtomicityTest();
        exec.execute(at);
        while(true) {
            int val = at.getValue();
            if(val % 2 != 0) {
                System.out.println(val);
                System.exit(0);
            }
        }
    }
}
/* Output
11
*/

在上面的例子中,自加方法evenIncrement()是同步的;getValue()中只是返回i,返回操作是原子操作,但还是出现了奇数的情况。原因在于,getValue()中对i的读取缺乏同步,如果读取发生在i的自加操作中间,就可能读取到处于奇数状态的igetValue()evenIncrement()都需要同步。

  下面的例子中,nextSerialNumber()用于产生唯一的序列号:

public class SerialNumberGenerator {
    private static volatile int serialNumber = 0;
    public static int nextSerialNumber() {
        return serialNumber++; // Not thread-safe
    }
}

在Java中,自增不是原子操作,它包含了读取和写入两种操作,serialNumber通过volatile保证了可见性,其他任务调用了nextSerialNumber()后,可以及时观测到serialNumber的变化。但这里的问题是,nextSerialNumber()没有使用同步的方式访问serialNumber,所以其他任务可能会在serialNumber的自增操作期间访问serialNumber,导致读取到不确定的结果。volatile并不能解决“自增操作不是原子操作、可能被打断”这一事实。

  简单来说,如果一个变量会被多个任务同时访问,且至少有一个任务会修改该变量,就应该使用volatile。举例来说,用于停止任务的标志位必须是volatile,否则的话,该标志位可能被缓存在寄存器里,当其他任务修改了该标志位,缓存的值并不会改变,导致无法及时停止任务。

  下面的例子为了测试SerialNumberGenerator,使用了一个循环存储CircularSet,避免内存不足:

import java.util.concurrent.*;

class CircularSet {
    private int[] array;
    private int len;
    private int index = 0;
    public CircularSet(int size) {
        array = new int[size];
        len = size;
        // Initialize to a value not produced
        // by the SerialNumberGenerator:
        for(int i = 0; i < size; i++)
            array[i] = -1;
    }
    public synchronized void add(int i) {
        array[index] = i;
        // Wrap index and write over old elements:
        index = ++index % len;
    }
    public synchronized boolean contains(int val) {
        for(int i = 0; i < len; i++)
            if(array[i] == val) return true;
        return false;
    }
}

public class SerialNumberChecker {
    private static final int SIZE = 10;
    private static CircularSet serials =
            new CircularSet(1000);
    private static ExecutorService exec =
            Executors.newCachedThreadPool();
    static class SerialChecker implements Runnable {
        public void run() {
            while(true) {
                int serial =
                        SerialNumberGenerator.nextSerialNumber();
                if(serials.contains(serial)) {
                    System.out.println("Duplicate: " + serial);
                    System.exit(0);
                }
                serials.add(serial);
            }
        }
    }
    public static void main(String[] args) throws Exception {
        for(int i = 0; i < SIZE; i++)
            exec.execute(new SerialChecker());
        // Stop after n seconds if there’s an argument:
        if(args.length > 0) {
            TimeUnit.SECONDS.sleep(new Integer(args[0]));
            System.out.println("No duplicates detected");
            System.exit(0);
        }
    }
}
/* Output
Duplicate: 3213
*/

这里使用了10个任务并行地调用SerialNumberGenerator.nextSerialNumber(),打印输出了Duplicate: 3213,出现了重复的结果,验证了上面的分析。解决方法很简单,只需为SerialNumberGeneratornextSerialNumber()方法加上synchronized

4. Atomic Classes

  原子类(Atomic Class)是Java SE 5中引入的,提供原子性的操作。

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*;

public class AtomicIntegerTest implements Runnable {
    private AtomicInteger i = new AtomicInteger(0);
    public int getValue() { return i.get(); }
    private void evenIncrement() { i.addAndGet(2); }
    public void run() {
        while(true)
            evenIncrement();
    }
    public static void main(String[] args) {
        new Timer().schedule(new TimerTask() {
            public void run() {
                System.err.println("Aborting");
                System.exit(0);
            }
        }, 5000); // Terminate after 5 seconds 
        ExecutorService exec = Executors.newCachedThreadPool();
        AtomicIntegerTest ait = new AtomicIntegerTest();
        exec.execute(ait);
        while(true) {
            int val = ait.getValue();
            if(val % 2 != 0) {
                System.out.println(val);
                System.exit(0);
            }
        }
    }
}

这里使用了AtomicInteger,不再需要使用synchronized。上面的例子会在5秒后打印Aborting并结束,不会检出奇数的情况。

  另一个例子:

import java.util.concurrent.atomic.*;

public class AtomicEvenGenerator extends IntGenerator {
    private AtomicInteger currentEvenValue = new AtomicInteger(0);
    public int next() {
        return currentEvenValue.addAndGet(2);
    }
    public static void main(String[] args) {
        EvenChecker.test(new AtomicEvenGenerator());
    }
}

同样也不会出现奇数的情况。

  原子类被设计用于实现java.util.concurrent中的类,只应当在特殊情况下使用。通常来说使用synchronizedLock进行锁定更加安全。

5. Critical Sections

  通过synchronized关键字可以创建一块临界区(Critical Section),也称为同步块(Synchronized Block):

synchronized(syncObject) { 
    // This code can be accessed by only one task at a time 
}

其中的代码同时只能被一个线程访问。由此可以限制多个线程同时访问方法中的一段代码,而不直接同步整个方法。在进入同步块前,必须要获得syncObject的锁定,如果其他任务已经获取了锁定,那么在锁定释放前,其他任务就无法进入同步块。

  同步块能为其他任务提供更多的访问机会,提高资源的使用效率:

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*;

class Pair { // Not thread-safe
    private int x, y;
    public Pair(int x, int y) {
        this.x = x;
        this.y = y;
    }
    public Pair() { this(0, 0); }
    public int getX() { return x; }
    public int getY() { return y; }
    public void incrementX() { x++; }
    public void incrementY() { y++; }
    public String toString() {
        return "x: " + x + ", y: " + y;
    }
    public class PairValuesNotEqualException extends RuntimeException {
        public PairValuesNotEqualException() {
            super("Pair values not equal: " + Pair.this);
        }
    }
    // Arbitrary invariant -- both variables must be equal:
    public void checkState() {
        if(x != y)
            throw new PairValuesNotEqualException();
    }
}

// Protect a Pair inside a thread-safe class:
abstract class PairManager {
    AtomicInteger checkCounter = new AtomicInteger(0);
    protected Pair p = new Pair();
    private List<Pair> storage = Collections.synchronizedList(new ArrayList<Pair>());
    public synchronized Pair getPair() {
        // Make a copy to keep the original safe:
        return new Pair(p.getX(), p.getY());
    }
    // Assume this is a time consuming operation
    protected void store(Pair p) {
        storage.add(p);
        try {
            TimeUnit.MILLISECONDS.sleep(50);
        } catch(InterruptedException ignore) {}
    }
    public abstract void increment();
}

// Synchronize the entire method:
class PairManager1 extends PairManager {
    public synchronized void increment() {
        p.incrementX();
        p.incrementY();
        store(getPair());
    }
}

// Use a critical section:
class PairManager2 extends PairManager {
    public void increment() {
        Pair temp;
        synchronized(this) {
            p.incrementX();
            p.incrementY();
            temp = getPair();
        }
        store(temp);
    }
}

class PairManipulator implements Runnable {
    private PairManager pm;
    public PairManipulator(PairManager pm) {
        this.pm = pm;
    }
    public void run() {
        while(true)
            pm.increment();
    }
    public String toString() {
        return "Pair: " + pm.getPair() + " checkCounter = " + pm.checkCounter.get();
    }
}

class PairChecker implements Runnable {
    private PairManager pm;
    public PairChecker(PairManager pm) {
        this.pm = pm;
    }
    public void run() {
        while(true) {
            pm.checkCounter.incrementAndGet();
            pm.getPair().checkState();
        }
    }
}

public class CriticalSection {
    // Test the two different approaches:
    static void
    testApproaches(PairManager pman1, PairManager pman2) {
        ExecutorService exec = Executors.newCachedThreadPool();
        PairManipulator pm1 = new PairManipulator(pman1), 
                pm2 = new PairManipulator(pman2);
        PairChecker pcheck1 = new PairChecker(pman1), 
                pcheck2 = new PairChecker(pman2);
        exec.execute(pm1);
        exec.execute(pm2);
        exec.execute(pcheck1);
        exec.execute(pcheck2);
        try {
            TimeUnit.MILLISECONDS.sleep(500);
        } catch(InterruptedException e) {
            System.out.println("Sleep interrupted");
        }
        System.out.println("pm1: " + pm1 + "\npm2: " + pm2);
        System.exit(0);
    }
    public static void main(String[] args) {
        PairManager pman1 = new PairManager1(), 
                pman2 = new PairManager2();
        testApproaches(pman1, pman2);
    }
}
/* Output
pm1: Pair: x: 55, y: 55  checkCounter = 3
pm2: Pair: x: 56, y: 56 checkCounter = 154288579
*/

Pair不是线程安全的,其中的自加操作不是原子的,存在被打断的情况,incrementX()incrementY()也没有同步。这里使用PairManager持有Pair并外部对Pair的访问,实现线程安全。PairManager由两个具体子类,PairManager1同步了整个increment()方法,PairManager2只同步increment()方法中的一部分。PairManagerstore()用于把Pair保存到同步列表,不需要进行同步。PairChecker在单独线程上不停地访问PairManager,用checkCounter记录成功访问的次数,由此测定PairManager各具体子类处于解锁状态的时间在整个测试期间所占的比重。从打印可见,pm2,也就是使用同步块的PairManager2,占用并锁定共享资源的时间较少,可以让共享的资源在多数时间保持解锁、供其他任务使用。

  Thinking in Java原书在这里还有一个比较同步整个方法和手动为代码块加锁的例子,但在我的电脑上运行会出现问题,详见混用同步块和同步方法时的问题

6. Synchronizing On Other Objects

  使用同步块时,必须指定要同步的对象。通常使用当前对象自身进行同步,即synchronized(this)。但有时候也需要同步于其他对象,此时必须确保其他所有相关的任务都同步于同一个对象。

class DualSynch {
    private Object syncObject = new Object();
    public synchronized void f() {
        for(int i = 0; i < 5; i++) {
            System.out.println("f()");
            Thread.yield();
        }
    }
    public void g() {
        synchronized(syncObject) {
            for(int i = 0; i < 5; i++) {
                System.out.println("g()");
                Thread.yield();
            }
        }
    }
}

public class SyncObject {
    public static void main(String[] args) {
        final DualSynch ds = new DualSynch();
        new Thread() {
            public void run() {
                ds.f();
            }
        }.start();
        ds.g();
    }
}
/* Output: (Sample)
g()
f()
g()
f()
g()
f()
...
*/

这里f()同步于thisg()同步于syncObject,二者之间的同步是独立的。

7. Thread Local Storage

  使用线程本地存储(Thread Local Storage)也可以解决多任务使用共享资源发生冲突的问题。线程本地存储可以在不同的线程上为同一个变量创建副本,各线程只使用自己的副本。这使得线程具有了状态。
  java.lang.ThreadLocal用于创建线程存储:

import java.util.concurrent.*;
import java.util.*;

class Accessor implements Runnable {
    private final int id;
    public Accessor(int idn) { id = idn; }
    public void run() {
        while(!Thread.currentThread().isInterrupted()) {
            ThreadLocalVariableHolder.increment();
            System.out.println(this);
            Thread.yield();
        }
    }
    public String toString() {
        return "#" + id + ": " +
                ThreadLocalVariableHolder.get();
    }
}

public class ThreadLocalVariableHolder {
    private static ThreadLocal<Integer> value =
            new ThreadLocal<Integer>() {
                private Random rand = new Random(47);
                protected synchronized Integer initialValue() {
                    return rand.nextInt(10000);
                }
            };
    public static void increment() {
        value.set(value.get() + 1);
    }
    public static int get() { return value.get(); }
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        for(int i = 0; i < 5; i++)
            exec.execute(new Accessor(i));
        TimeUnit.MILLISECONDS.sleep(10);  // Run for a while
        exec.shutdownNow();         // All Accessors will quit
    }
}
/* Output
...
#2: 6776
#4: 1010
#1: 601
#0: 9300
#3: 1901

ThreadLocalVariableHolderThreadLocal<Integer> value具有随机的初始值,由打印可见,每个线程都有自己的ThreadLocal副本。ThreadLocal对象通常使用staticThreadLocal对象只能通过get()set()访问,get()会返回对应线程上的副本,set()则用于更新对应线程上的副本,并返回旧值。注意ThreadLocalVariableHolderincrement()get()没有同步,因为ThreadLocal本身可以确保不会发生竞争。