Concurrency: Sharing Resources (2)

3 Atomicity and Volatility

  原子操作(Atomic Operation)指的是不会被线程调度打断的操作。对除了longdouble以外基本类型的简单操作(如赋值和返回值)是原子性的。JVM允许以两次32位操作的形式读写64位的值(longdouble型变量),导致读写期间可能发生上下文切换。为了使longdouble得到原子性,可以使用volatile来修饰longdouble型变量。

  凭借原子操作不会被线程调度打断的特性,专家可以写出没有锁的代码,不需要使用同步机制。但不要轻易尝试。移除同步通常意味着不成熟的优化,通常会带来更多问题。

  在多处理器系统上,可见性(Visibility)通常比原子性(Atomicity)更容易引发问题。在一个任务中进行的修改,即便使用的是原子操作、不会被打断,这个修改在另一个任务中也可能是不可见的,因为修改可能被暂存在处理器的缓存中,对其他任务不可见。那么问题来了:不同任务对当前应用程序的状态有不同的观测。同步机制会强制一个任务的修改可以在整个应用程序中可见;没有同步机制,一个任务的修改何时能对其他任务可见是不确定的。

  volatile关键字也可以确保可见性。如果声明某个变量为volatile,那么对该变量的任何修改,都会立即对整个程序可见。对volatile变量的修改会立即写入主内存,并从主内存读出,不会使用本地缓存。

  原子性和volatile是不同的概念。对非volatile的原子操作不一定会被写入主内存,故而不确保对其他任务的可见性。如果多个任务都要访问某个变量,则该变量应当是volatile的;如果不使用volatile,那么该变量应当仅能够以同步的方式访问。反过来说,如果一个变量仅能从同步的方法或代码块中访问,那就不需要声名其为volatile

  在同一个任务中的修改会对该任务可见,如果变量仅在一个任务中用到,就不需要声名为volatile

  “原子操作不需要同步”的观点是错误的。

import java.util.concurrent.*;
public class AtomicityTest implements Runnable {
private int i = 0;
public int getValue() { return i; }
private synchronized void evenIncrement() { i++; i++; }
public void run() {
while(true)
evenIncrement();
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
AtomicityTest at = new AtomicityTest();
exec.execute(at);
while(true) {
int val = at.getValue();
if(val % 2 != 0) {
System.out.println(val);
System.exit(0);
}
}
}
}
/* Output
11
*/
import java.util.concurrent.*; public class AtomicityTest implements Runnable { private int i = 0; public int getValue() { return i; } private synchronized void evenIncrement() { i++; i++; } public void run() { while(true) evenIncrement(); } public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); AtomicityTest at = new AtomicityTest(); exec.execute(at); while(true) { int val = at.getValue(); if(val % 2 != 0) { System.out.println(val); System.exit(0); } } } } /* Output 11 */
import java.util.concurrent.*;

public class AtomicityTest implements Runnable {
    private int i = 0;
    public int getValue() { return i; }
    private synchronized void evenIncrement() { i++; i++; }
    public void run() {
        while(true)
            evenIncrement();
    }
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        AtomicityTest at = new AtomicityTest();
        exec.execute(at);
        while(true) {
            int val = at.getValue();
            if(val % 2 != 0) {
                System.out.println(val);
                System.exit(0);
            }
        }
    }
}
/* Output
11
*/

在上面的例子中,自加方法evenIncrement()是同步的;getValue()中只是返回i,返回操作是原子操作,但还是出现了奇数的情况。原因在于,getValue()中对i的读取缺乏同步,如果读取发生在i的自加操作中间,就可能读取到处于奇数状态的igetValue()evenIncrement()都需要同步。

  下面的例子中,nextSerialNumber()用于产生唯一的序列号:

public class SerialNumberGenerator {
private static volatile int serialNumber = 0;
public static int nextSerialNumber() {
return serialNumber++; // Not thread-safe
}
}
public class SerialNumberGenerator { private static volatile int serialNumber = 0; public static int nextSerialNumber() { return serialNumber++; // Not thread-safe } }
public class SerialNumberGenerator {
    private static volatile int serialNumber = 0;
    public static int nextSerialNumber() {
        return serialNumber++; // Not thread-safe
    }
}

在Java中,自增不是原子操作,它包含了读取和写入两种操作,serialNumber通过volatile保证了可见性,其他任务调用了nextSerialNumber()后,可以及时观测到serialNumber的变化。但这里的问题是,nextSerialNumber()没有使用同步的方式访问serialNumber,所以其他任务可能会在serialNumber的自增操作期间访问serialNumber,导致读取到不确定的结果。volatile并不能解决“自增操作不是原子操作、可能被打断”这一事实。

  简单来说,如果一个变量会被多个任务同时访问,且至少有一个任务会修改该变量,就应该使用volatile。举例来说,用于停止任务的标志位必须是volatile,否则的话,该标志位可能被缓存在寄存器里,当其他任务修改了该标志位,缓存的值并不会改变,导致无法及时停止任务。

  下面的例子为了测试SerialNumberGenerator,使用了一个循环存储CircularSet,避免内存不足:

import java.util.concurrent.*;
class CircularSet {
private int[] array;
private int len;
private int index = 0;
public CircularSet(int size) {
array = new int[size];
len = size;
// Initialize to a value not produced
// by the SerialNumberGenerator:
for(int i = 0; i < size; i++)
array[i] = -1;
}
public synchronized void add(int i) {
array[index] = i;
// Wrap index and write over old elements:
index = ++index % len;
}
public synchronized boolean contains(int val) {
for(int i = 0; i < len; i++)
if(array[i] == val) return true;
return false;
}
}
public class SerialNumberChecker {
private static final int SIZE = 10;
private static CircularSet serials =
new CircularSet(1000);
private static ExecutorService exec =
Executors.newCachedThreadPool();
static class SerialChecker implements Runnable {
public void run() {
while(true) {
int serial =
SerialNumberGenerator.nextSerialNumber();
if(serials.contains(serial)) {
System.out.println("Duplicate: " + serial);
System.exit(0);
}
serials.add(serial);
}
}
}
public static void main(String[] args) throws Exception {
for(int i = 0; i < SIZE; i++)
exec.execute(new SerialChecker());
// Stop after n seconds if there’s an argument:
if(args.length > 0) {
TimeUnit.SECONDS.sleep(new Integer(args[0]));
System.out.println("No duplicates detected");
System.exit(0);
}
}
}
/* Output
Duplicate: 3213
*/
import java.util.concurrent.*; class CircularSet { private int[] array; private int len; private int index = 0; public CircularSet(int size) { array = new int[size]; len = size; // Initialize to a value not produced // by the SerialNumberGenerator: for(int i = 0; i < size; i++) array[i] = -1; } public synchronized void add(int i) { array[index] = i; // Wrap index and write over old elements: index = ++index % len; } public synchronized boolean contains(int val) { for(int i = 0; i < len; i++) if(array[i] == val) return true; return false; } } public class SerialNumberChecker { private static final int SIZE = 10; private static CircularSet serials = new CircularSet(1000); private static ExecutorService exec = Executors.newCachedThreadPool(); static class SerialChecker implements Runnable { public void run() { while(true) { int serial = SerialNumberGenerator.nextSerialNumber(); if(serials.contains(serial)) { System.out.println("Duplicate: " + serial); System.exit(0); } serials.add(serial); } } } public static void main(String[] args) throws Exception { for(int i = 0; i < SIZE; i++) exec.execute(new SerialChecker()); // Stop after n seconds if there’s an argument: if(args.length > 0) { TimeUnit.SECONDS.sleep(new Integer(args[0])); System.out.println("No duplicates detected"); System.exit(0); } } } /* Output Duplicate: 3213 */
import java.util.concurrent.*;

class CircularSet {
    private int[] array;
    private int len;
    private int index = 0;
    public CircularSet(int size) {
        array = new int[size];
        len = size;
        // Initialize to a value not produced
        // by the SerialNumberGenerator:
        for(int i = 0; i < size; i++)
            array[i] = -1;
    }
    public synchronized void add(int i) {
        array[index] = i;
        // Wrap index and write over old elements:
        index = ++index % len;
    }
    public synchronized boolean contains(int val) {
        for(int i = 0; i < len; i++)
            if(array[i] == val) return true;
        return false;
    }
}

public class SerialNumberChecker {
    private static final int SIZE = 10;
    private static CircularSet serials =
            new CircularSet(1000);
    private static ExecutorService exec =
            Executors.newCachedThreadPool();
    static class SerialChecker implements Runnable {
        public void run() {
            while(true) {
                int serial =
                        SerialNumberGenerator.nextSerialNumber();
                if(serials.contains(serial)) {
                    System.out.println("Duplicate: " + serial);
                    System.exit(0);
                }
                serials.add(serial);
            }
        }
    }
    public static void main(String[] args) throws Exception {
        for(int i = 0; i < SIZE; i++)
            exec.execute(new SerialChecker());
        // Stop after n seconds if there’s an argument:
        if(args.length > 0) {
            TimeUnit.SECONDS.sleep(new Integer(args[0]));
            System.out.println("No duplicates detected");
            System.exit(0);
        }
    }
}
/* Output
Duplicate: 3213
*/

这里使用了10个任务并行地调用SerialNumberGenerator.nextSerialNumber(),打印输出了Duplicate: 3213,出现了重复的结果,验证了上面的分析。解决方法很简单,只需为SerialNumberGeneratornextSerialNumber()方法加上synchronized

4. Atomic Classes

  原子类(Atomic Class)是Java SE 5中引入的,提供原子性的操作。

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*;
public class AtomicIntegerTest implements Runnable {
private AtomicInteger i = new AtomicInteger(0);
public int getValue() { return i.get(); }
private void evenIncrement() { i.addAndGet(2); }
public void run() {
while(true)
evenIncrement();
}
public static void main(String[] args) {
new Timer().schedule(new TimerTask() {
public void run() {
System.err.println("Aborting");
System.exit(0);
}
}, 5000); // Terminate after 5 seconds
ExecutorService exec = Executors.newCachedThreadPool();
AtomicIntegerTest ait = new AtomicIntegerTest();
exec.execute(ait);
while(true) {
int val = ait.getValue();
if(val % 2 != 0) {
System.out.println(val);
System.exit(0);
}
}
}
}
import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.*; public class AtomicIntegerTest implements Runnable { private AtomicInteger i = new AtomicInteger(0); public int getValue() { return i.get(); } private void evenIncrement() { i.addAndGet(2); } public void run() { while(true) evenIncrement(); } public static void main(String[] args) { new Timer().schedule(new TimerTask() { public void run() { System.err.println("Aborting"); System.exit(0); } }, 5000); // Terminate after 5 seconds ExecutorService exec = Executors.newCachedThreadPool(); AtomicIntegerTest ait = new AtomicIntegerTest(); exec.execute(ait); while(true) { int val = ait.getValue(); if(val % 2 != 0) { System.out.println(val); System.exit(0); } } } }
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*;

public class AtomicIntegerTest implements Runnable {
    private AtomicInteger i = new AtomicInteger(0);
    public int getValue() { return i.get(); }
    private void evenIncrement() { i.addAndGet(2); }
    public void run() {
        while(true)
            evenIncrement();
    }
    public static void main(String[] args) {
        new Timer().schedule(new TimerTask() {
            public void run() {
                System.err.println("Aborting");
                System.exit(0);
            }
        }, 5000); // Terminate after 5 seconds 
        ExecutorService exec = Executors.newCachedThreadPool();
        AtomicIntegerTest ait = new AtomicIntegerTest();
        exec.execute(ait);
        while(true) {
            int val = ait.getValue();
            if(val % 2 != 0) {
                System.out.println(val);
                System.exit(0);
            }
        }
    }
}

这里使用了AtomicInteger,不再需要使用synchronized。上面的例子会在5秒后打印Aborting并结束,不会检出奇数的情况。

  另一个例子:

import java.util.concurrent.atomic.*;
public class AtomicEvenGenerator extends IntGenerator {
private AtomicInteger currentEvenValue = new AtomicInteger(0);
public int next() {
return currentEvenValue.addAndGet(2);
}
public static void main(String[] args) {
EvenChecker.test(new AtomicEvenGenerator());
}
}
import java.util.concurrent.atomic.*; public class AtomicEvenGenerator extends IntGenerator { private AtomicInteger currentEvenValue = new AtomicInteger(0); public int next() { return currentEvenValue.addAndGet(2); } public static void main(String[] args) { EvenChecker.test(new AtomicEvenGenerator()); } }
import java.util.concurrent.atomic.*;

public class AtomicEvenGenerator extends IntGenerator {
    private AtomicInteger currentEvenValue = new AtomicInteger(0);
    public int next() {
        return currentEvenValue.addAndGet(2);
    }
    public static void main(String[] args) {
        EvenChecker.test(new AtomicEvenGenerator());
    }
}

同样也不会出现奇数的情况。

  原子类被设计用于实现java.util.concurrent中的类,只应当在特殊情况下使用。通常来说使用synchronizedLock进行锁定更加安全。

5. Critical Sections

  通过synchronized关键字可以创建一块临界区(Critical Section),也称为同步块(Synchronized Block):

synchronized(syncObject) {
// This code can be accessed by only one task at a time
}
synchronized(syncObject) { // This code can be accessed by only one task at a time }
synchronized(syncObject) { 
    // This code can be accessed by only one task at a time 
}

其中的代码同时只能被一个线程访问。由此可以限制多个线程同时访问方法中的一段代码,而不直接同步整个方法。在进入同步块前,必须要获得syncObject的锁定,如果其他任务已经获取了锁定,那么在锁定释放前,其他任务就无法进入同步块。

  同步块能为其他任务提供更多的访问机会,提高资源的使用效率:

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*;
class Pair { // Not thread-safe
private int x, y;
public Pair(int x, int y) {
this.x = x;
this.y = y;
}
public Pair() { this(0, 0); }
public int getX() { return x; }
public int getY() { return y; }
public void incrementX() { x++; }
public void incrementY() { y++; }
public String toString() {
return "x: " + x + ", y: " + y;
}
public class PairValuesNotEqualException extends RuntimeException {
public PairValuesNotEqualException() {
super("Pair values not equal: " + Pair.this);
}
}
// Arbitrary invariant -- both variables must be equal:
public void checkState() {
if(x != y)
throw new PairValuesNotEqualException();
}
}
// Protect a Pair inside a thread-safe class:
abstract class PairManager {
AtomicInteger checkCounter = new AtomicInteger(0);
protected Pair p = new Pair();
private List<Pair> storage = Collections.synchronizedList(new ArrayList<Pair>());
public synchronized Pair getPair() {
// Make a copy to keep the original safe:
return new Pair(p.getX(), p.getY());
}
// Assume this is a time consuming operation
protected void store(Pair p) {
storage.add(p);
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch(InterruptedException ignore) {}
}
public abstract void increment();
}
// Synchronize the entire method:
class PairManager1 extends PairManager {
public synchronized void increment() {
p.incrementX();
p.incrementY();
store(getPair());
}
}
// Use a critical section:
class PairManager2 extends PairManager {
public void increment() {
Pair temp;
synchronized(this) {
p.incrementX();
p.incrementY();
temp = getPair();
}
store(temp);
}
}
class PairManipulator implements Runnable {
private PairManager pm;
public PairManipulator(PairManager pm) {
this.pm = pm;
}
public void run() {
while(true)
pm.increment();
}
public String toString() {
return "Pair: " + pm.getPair() + " checkCounter = " + pm.checkCounter.get();
}
}
class PairChecker implements Runnable {
private PairManager pm;
public PairChecker(PairManager pm) {
this.pm = pm;
}
public void run() {
while(true) {
pm.checkCounter.incrementAndGet();
pm.getPair().checkState();
}
}
}
public class CriticalSection {
// Test the two different approaches:
static void
testApproaches(PairManager pman1, PairManager pman2) {
ExecutorService exec = Executors.newCachedThreadPool();
PairManipulator pm1 = new PairManipulator(pman1),
pm2 = new PairManipulator(pman2);
PairChecker pcheck1 = new PairChecker(pman1),
pcheck2 = new PairChecker(pman2);
exec.execute(pm1);
exec.execute(pm2);
exec.execute(pcheck1);
exec.execute(pcheck2);
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch(InterruptedException e) {
System.out.println("Sleep interrupted");
}
System.out.println("pm1: " + pm1 + "\npm2: " + pm2);
System.exit(0);
}
public static void main(String[] args) {
PairManager pman1 = new PairManager1(),
pman2 = new PairManager2();
testApproaches(pman1, pman2);
}
}
/* Output
pm1: Pair: x: 55, y: 55 checkCounter = 3
pm2: Pair: x: 56, y: 56 checkCounter = 154288579
*/
import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.*; class Pair { // Not thread-safe private int x, y; public Pair(int x, int y) { this.x = x; this.y = y; } public Pair() { this(0, 0); } public int getX() { return x; } public int getY() { return y; } public void incrementX() { x++; } public void incrementY() { y++; } public String toString() { return "x: " + x + ", y: " + y; } public class PairValuesNotEqualException extends RuntimeException { public PairValuesNotEqualException() { super("Pair values not equal: " + Pair.this); } } // Arbitrary invariant -- both variables must be equal: public void checkState() { if(x != y) throw new PairValuesNotEqualException(); } } // Protect a Pair inside a thread-safe class: abstract class PairManager { AtomicInteger checkCounter = new AtomicInteger(0); protected Pair p = new Pair(); private List<Pair> storage = Collections.synchronizedList(new ArrayList<Pair>()); public synchronized Pair getPair() { // Make a copy to keep the original safe: return new Pair(p.getX(), p.getY()); } // Assume this is a time consuming operation protected void store(Pair p) { storage.add(p); try { TimeUnit.MILLISECONDS.sleep(50); } catch(InterruptedException ignore) {} } public abstract void increment(); } // Synchronize the entire method: class PairManager1 extends PairManager { public synchronized void increment() { p.incrementX(); p.incrementY(); store(getPair()); } } // Use a critical section: class PairManager2 extends PairManager { public void increment() { Pair temp; synchronized(this) { p.incrementX(); p.incrementY(); temp = getPair(); } store(temp); } } class PairManipulator implements Runnable { private PairManager pm; public PairManipulator(PairManager pm) { this.pm = pm; } public void run() { while(true) pm.increment(); } public String toString() { return "Pair: " + pm.getPair() + " checkCounter = " + pm.checkCounter.get(); } } class PairChecker implements Runnable { private PairManager pm; public PairChecker(PairManager pm) { this.pm = pm; } public void run() { while(true) { pm.checkCounter.incrementAndGet(); pm.getPair().checkState(); } } } public class CriticalSection { // Test the two different approaches: static void testApproaches(PairManager pman1, PairManager pman2) { ExecutorService exec = Executors.newCachedThreadPool(); PairManipulator pm1 = new PairManipulator(pman1), pm2 = new PairManipulator(pman2); PairChecker pcheck1 = new PairChecker(pman1), pcheck2 = new PairChecker(pman2); exec.execute(pm1); exec.execute(pm2); exec.execute(pcheck1); exec.execute(pcheck2); try { TimeUnit.MILLISECONDS.sleep(500); } catch(InterruptedException e) { System.out.println("Sleep interrupted"); } System.out.println("pm1: " + pm1 + "\npm2: " + pm2); System.exit(0); } public static void main(String[] args) { PairManager pman1 = new PairManager1(), pman2 = new PairManager2(); testApproaches(pman1, pman2); } } /* Output pm1: Pair: x: 55, y: 55 checkCounter = 3 pm2: Pair: x: 56, y: 56 checkCounter = 154288579 */
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*;

class Pair { // Not thread-safe
    private int x, y;
    public Pair(int x, int y) {
        this.x = x;
        this.y = y;
    }
    public Pair() { this(0, 0); }
    public int getX() { return x; }
    public int getY() { return y; }
    public void incrementX() { x++; }
    public void incrementY() { y++; }
    public String toString() {
        return "x: " + x + ", y: " + y;
    }
    public class PairValuesNotEqualException extends RuntimeException {
        public PairValuesNotEqualException() {
            super("Pair values not equal: " + Pair.this);
        }
    }
    // Arbitrary invariant -- both variables must be equal:
    public void checkState() {
        if(x != y)
            throw new PairValuesNotEqualException();
    }
}

// Protect a Pair inside a thread-safe class:
abstract class PairManager {
    AtomicInteger checkCounter = new AtomicInteger(0);
    protected Pair p = new Pair();
    private List<Pair> storage = Collections.synchronizedList(new ArrayList<Pair>());
    public synchronized Pair getPair() {
        // Make a copy to keep the original safe:
        return new Pair(p.getX(), p.getY());
    }
    // Assume this is a time consuming operation
    protected void store(Pair p) {
        storage.add(p);
        try {
            TimeUnit.MILLISECONDS.sleep(50);
        } catch(InterruptedException ignore) {}
    }
    public abstract void increment();
}

// Synchronize the entire method:
class PairManager1 extends PairManager {
    public synchronized void increment() {
        p.incrementX();
        p.incrementY();
        store(getPair());
    }
}

// Use a critical section:
class PairManager2 extends PairManager {
    public void increment() {
        Pair temp;
        synchronized(this) {
            p.incrementX();
            p.incrementY();
            temp = getPair();
        }
        store(temp);
    }
}

class PairManipulator implements Runnable {
    private PairManager pm;
    public PairManipulator(PairManager pm) {
        this.pm = pm;
    }
    public void run() {
        while(true)
            pm.increment();
    }
    public String toString() {
        return "Pair: " + pm.getPair() + " checkCounter = " + pm.checkCounter.get();
    }
}

class PairChecker implements Runnable {
    private PairManager pm;
    public PairChecker(PairManager pm) {
        this.pm = pm;
    }
    public void run() {
        while(true) {
            pm.checkCounter.incrementAndGet();
            pm.getPair().checkState();
        }
    }
}

public class CriticalSection {
    // Test the two different approaches:
    static void
    testApproaches(PairManager pman1, PairManager pman2) {
        ExecutorService exec = Executors.newCachedThreadPool();
        PairManipulator pm1 = new PairManipulator(pman1), 
                pm2 = new PairManipulator(pman2);
        PairChecker pcheck1 = new PairChecker(pman1), 
                pcheck2 = new PairChecker(pman2);
        exec.execute(pm1);
        exec.execute(pm2);
        exec.execute(pcheck1);
        exec.execute(pcheck2);
        try {
            TimeUnit.MILLISECONDS.sleep(500);
        } catch(InterruptedException e) {
            System.out.println("Sleep interrupted");
        }
        System.out.println("pm1: " + pm1 + "\npm2: " + pm2);
        System.exit(0);
    }
    public static void main(String[] args) {
        PairManager pman1 = new PairManager1(), 
                pman2 = new PairManager2();
        testApproaches(pman1, pman2);
    }
}
/* Output
pm1: Pair: x: 55, y: 55  checkCounter = 3
pm2: Pair: x: 56, y: 56 checkCounter = 154288579
*/

Pair不是线程安全的,其中的自加操作不是原子的,存在被打断的情况,incrementX()incrementY()也没有同步。这里使用PairManager持有Pair并外部对Pair的访问,实现线程安全。PairManager由两个具体子类,PairManager1同步了整个increment()方法,PairManager2只同步increment()方法中的一部分。PairManagerstore()用于把Pair保存到同步列表,不需要进行同步。PairChecker在单独线程上不停地访问PairManager,用checkCounter记录成功访问的次数,由此测定PairManager各具体子类处于解锁状态的时间在整个测试期间所占的比重。从打印可见,pm2,也就是使用同步块的PairManager2,占用并锁定共享资源的时间较少,可以让共享的资源在多数时间保持解锁、供其他任务使用。

  Thinking in Java原书在这里还有一个比较同步整个方法和手动为代码块加锁的例子,但在我的电脑上运行会出现问题,详见混用同步块和同步方法时的问题

6. Synchronizing On Other Objects

  使用同步块时,必须指定要同步的对象。通常使用当前对象自身进行同步,即synchronized(this)。但有时候也需要同步于其他对象,此时必须确保其他所有相关的任务都同步于同一个对象。

class DualSynch {
private Object syncObject = new Object();
public synchronized void f() {
for(int i = 0; i < 5; i++) {
System.out.println("f()");
Thread.yield();
}
}
public void g() {
synchronized(syncObject) {
for(int i = 0; i < 5; i++) {
System.out.println("g()");
Thread.yield();
}
}
}
}
public class SyncObject {
public static void main(String[] args) {
final DualSynch ds = new DualSynch();
new Thread() {
public void run() {
ds.f();
}
}.start();
ds.g();
}
}
/* Output: (Sample)
g()
f()
g()
f()
g()
f()
...
*/
class DualSynch { private Object syncObject = new Object(); public synchronized void f() { for(int i = 0; i < 5; i++) { System.out.println("f()"); Thread.yield(); } } public void g() { synchronized(syncObject) { for(int i = 0; i < 5; i++) { System.out.println("g()"); Thread.yield(); } } } } public class SyncObject { public static void main(String[] args) { final DualSynch ds = new DualSynch(); new Thread() { public void run() { ds.f(); } }.start(); ds.g(); } } /* Output: (Sample) g() f() g() f() g() f() ... */
class DualSynch {
    private Object syncObject = new Object();
    public synchronized void f() {
        for(int i = 0; i < 5; i++) {
            System.out.println("f()");
            Thread.yield();
        }
    }
    public void g() {
        synchronized(syncObject) {
            for(int i = 0; i < 5; i++) {
                System.out.println("g()");
                Thread.yield();
            }
        }
    }
}

public class SyncObject {
    public static void main(String[] args) {
        final DualSynch ds = new DualSynch();
        new Thread() {
            public void run() {
                ds.f();
            }
        }.start();
        ds.g();
    }
}
/* Output: (Sample)
g()
f()
g()
f()
g()
f()
...
*/

这里f()同步于thisg()同步于syncObject,二者之间的同步是独立的。

7. Thread Local Storage

  使用线程本地存储(Thread Local Storage)也可以解决多任务使用共享资源发生冲突的问题。线程本地存储可以在不同的线程上为同一个变量创建副本,各线程只使用自己的副本。这使得线程具有了状态。
  java.lang.ThreadLocal用于创建线程存储:

import java.util.concurrent.*;
import java.util.*;
class Accessor implements Runnable {
private final int id;
public Accessor(int idn) { id = idn; }
public void run() {
while(!Thread.currentThread().isInterrupted()) {
ThreadLocalVariableHolder.increment();
System.out.println(this);
Thread.yield();
}
}
public String toString() {
return "#" + id + ": " +
ThreadLocalVariableHolder.get();
}
}
public class ThreadLocalVariableHolder {
private static ThreadLocal<Integer> value =
new ThreadLocal<Integer>() {
private Random rand = new Random(47);
protected synchronized Integer initialValue() {
return rand.nextInt(10000);
}
};
public static void increment() {
value.set(value.get() + 1);
}
public static int get() { return value.get(); }
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i++)
exec.execute(new Accessor(i));
TimeUnit.MILLISECONDS.sleep(10); // Run for a while
exec.shutdownNow(); // All Accessors will quit
}
}
/* Output
...
#2: 6776
#4: 1010
#1: 601
#0: 9300
#3: 1901
import java.util.concurrent.*; import java.util.*; class Accessor implements Runnable { private final int id; public Accessor(int idn) { id = idn; } public void run() { while(!Thread.currentThread().isInterrupted()) { ThreadLocalVariableHolder.increment(); System.out.println(this); Thread.yield(); } } public String toString() { return "#" + id + ": " + ThreadLocalVariableHolder.get(); } } public class ThreadLocalVariableHolder { private static ThreadLocal<Integer> value = new ThreadLocal<Integer>() { private Random rand = new Random(47); protected synchronized Integer initialValue() { return rand.nextInt(10000); } }; public static void increment() { value.set(value.get() + 1); } public static int get() { return value.get(); } public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < 5; i++) exec.execute(new Accessor(i)); TimeUnit.MILLISECONDS.sleep(10); // Run for a while exec.shutdownNow(); // All Accessors will quit } } /* Output ... #2: 6776 #4: 1010 #1: 601 #0: 9300 #3: 1901
import java.util.concurrent.*;
import java.util.*;

class Accessor implements Runnable {
    private final int id;
    public Accessor(int idn) { id = idn; }
    public void run() {
        while(!Thread.currentThread().isInterrupted()) {
            ThreadLocalVariableHolder.increment();
            System.out.println(this);
            Thread.yield();
        }
    }
    public String toString() {
        return "#" + id + ": " +
                ThreadLocalVariableHolder.get();
    }
}

public class ThreadLocalVariableHolder {
    private static ThreadLocal<Integer> value =
            new ThreadLocal<Integer>() {
                private Random rand = new Random(47);
                protected synchronized Integer initialValue() {
                    return rand.nextInt(10000);
                }
            };
    public static void increment() {
        value.set(value.get() + 1);
    }
    public static int get() { return value.get(); }
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        for(int i = 0; i < 5; i++)
            exec.execute(new Accessor(i));
        TimeUnit.MILLISECONDS.sleep(10);  // Run for a while
        exec.shutdownNow();         // All Accessors will quit
    }
}
/* Output
...
#2: 6776
#4: 1010
#1: 601
#0: 9300
#3: 1901

ThreadLocalVariableHolderThreadLocal<Integer> value具有随机的初始值,由打印可见,每个线程都有自己的ThreadLocal副本。ThreadLocal对象通常使用staticThreadLocal对象只能通过get()set()访问,get()会返回对应线程上的副本,set()则用于更新对应线程上的副本,并返回旧值。注意ThreadLocalVariableHolderincrement()get()没有同步,因为ThreadLocal本身可以确保不会发生竞争。