Concurrency: New Library Components (2)
Author: nex3z
2016-07-11
5. ScheduledExecutor
ScheduledThreadPoolExecutor
提供了多种执行任务的方式,schedule()
用于延时执行一次任务,scheduleAtFixedRate()
和scheduleWithFixedDelay()
用于定时重复执行,它们具有类似的签名:
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
二者都会在initialDelay
后初次执行command
,之后:
scheduleAtFixedRate()
会每在command
开始执行后的period
时间之后,再次执行command
;
scheduleWithFixedDelay()
会每在command
执行完毕后的delay
时间之后,再次执行执行command
。
一个例子如下:
import java.util.concurrent.*;
public class GreenhouseScheduler {
private volatile boolean light = false;
private volatile boolean water = false;
private String thermostat = "Day";
public synchronized String getThermostat() {
public synchronized void setThermostat(String value) {
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(10);
public void schedule(Runnable event, long delay) {
scheduler.schedule(event,delay,TimeUnit.MILLISECONDS);
repeat(Runnable event, long initialDelay, long period) {
scheduler.scheduleAtFixedRate(event, initialDelay, period, TimeUnit.MILLISECONDS);
class LightOn implements Runnable {
// Put hardware control code here to
// physically turn on the light.
System.out.println("Turning on lights");
class LightOff implements Runnable {
// Put hardware control code here to
// physically turn off the light.
System.out.println("Turning off lights");
class WaterOn implements Runnable {
// Put hardware control code here.
System.out.println("Turning greenhouse water on");
class WaterOff implements Runnable {
// Put hardware control code here.
System.out.println("Turning greenhouse water off");
class ThermostatNight implements Runnable {
// Put hardware control code here.
System.out.println("Thermostat to night setting");
class ThermostatDay implements Runnable {
// Put hardware control code here.
System.out.println("Thermostat to day setting");
class Bell implements Runnable {
public void run() { System.out.println("Bing!"); }
class Terminate implements Runnable {
System.out.println("Terminating");
// Must start a separate task to do this job,
// since the scheduler has been shut down:
// New feature: data collection
public DataPoint(Calendar d, float temp, float hum) {
public String toString() {
String.format(" temperature: %1$.1f humidity: %2$.2f",
private Calendar lastTime = Calendar.getInstance();
{ // Adjust date to the half hour
lastTime.set(Calendar.MINUTE, 30);
lastTime.set(Calendar.SECOND, 00);
private float lastTemp = 65.0f;
private int tempDirection = +1;
private float lastHumidity = 50.0f;
private int humidityDirection = +1;
private Random rand = new Random(47);
List<DataPoint> data = Collections.synchronizedList(new ArrayList<DataPoint>());
class CollectData implements Runnable {
System.out.println("Collecting data");
synchronized(GreenhouseScheduler.this) {
// Pretend the interval is longer than it is:
lastTime.set(Calendar.MINUTE,
lastTime.get(Calendar.MINUTE) + 30);
// One in 5 chances of reversing the direction:
tempDirection = -tempDirection;
tempDirection * (1.0f + rand.nextFloat());
humidityDirection = -humidityDirection;
lastHumidity = lastHumidity +
humidityDirection * rand.nextFloat();
// Calendar must be cloned, otherwise all
// DataPoints hold references to the same lastTime.
// For a basic object like Calendar, clone() is OK.
data.add(new DataPoint((Calendar)lastTime.clone(),
lastTemp, lastHumidity));
public static void main(String[] args) {
GreenhouseScheduler gh = new GreenhouseScheduler();
gh.schedule(gh.new Terminate(), 5000);
// Former "Restart" class not necessary:
gh.repeat(gh.new Bell(), 0, 1000);
gh.repeat(gh.new ThermostatNight(), 0, 2000);
gh.repeat(gh.new LightOn(), 0, 200);
gh.repeat(gh.new LightOff(), 0, 400);
gh.repeat(gh.new WaterOn(), 0, 600);
gh.repeat(gh.new WaterOff(), 0, 800);
gh.repeat(gh.new ThermostatDay(), 0, 1400);
gh.repeat(gh.new CollectData(), 500, 500);
import java.util.concurrent.*;
import java.util.*;
public class GreenhouseScheduler {
private volatile boolean light = false;
private volatile boolean water = false;
private String thermostat = "Day";
public synchronized String getThermostat() {
return thermostat;
}
public synchronized void setThermostat(String value) {
thermostat = value;
}
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(10);
public void schedule(Runnable event, long delay) {
scheduler.schedule(event,delay,TimeUnit.MILLISECONDS);
}
public void
repeat(Runnable event, long initialDelay, long period) {
scheduler.scheduleAtFixedRate(event, initialDelay, period, TimeUnit.MILLISECONDS);
}
class LightOn implements Runnable {
public void run() {
// Put hardware control code here to
// physically turn on the light.
System.out.println("Turning on lights");
light = true;
}
}
class LightOff implements Runnable {
public void run() {
// Put hardware control code here to
// physically turn off the light.
System.out.println("Turning off lights");
light = false;
}
}
class WaterOn implements Runnable {
public void run() {
// Put hardware control code here.
System.out.println("Turning greenhouse water on");
water = true;
}
}
class WaterOff implements Runnable {
public void run() {
// Put hardware control code here.
System.out.println("Turning greenhouse water off");
water = false;
}
}
class ThermostatNight implements Runnable {
public void run() {
// Put hardware control code here.
System.out.println("Thermostat to night setting");
setThermostat("Night");
}
}
class ThermostatDay implements Runnable {
public void run() {
// Put hardware control code here.
System.out.println("Thermostat to day setting");
setThermostat("Day");
}
}
class Bell implements Runnable {
public void run() { System.out.println("Bing!"); }
}
class Terminate implements Runnable {
public void run() {
System.out.println("Terminating");
scheduler.shutdownNow();
// Must start a separate task to do this job,
// since the scheduler has been shut down:
new Thread() {
public void run() {
for(DataPoint d : data)
System.out.println(d);
}
}.start();
}
}
// New feature: data collection
static class DataPoint {
final Calendar time;
final float temperature;
final float humidity;
public DataPoint(Calendar d, float temp, float hum) {
time = d;
temperature = temp;
humidity = hum;
}
public String toString() {
return time.getTime() +
String.format(" temperature: %1$.1f humidity: %2$.2f",
temperature, humidity);
}
}
private Calendar lastTime = Calendar.getInstance();
{ // Adjust date to the half hour
lastTime.set(Calendar.MINUTE, 30);
lastTime.set(Calendar.SECOND, 00);
}
private float lastTemp = 65.0f;
private int tempDirection = +1;
private float lastHumidity = 50.0f;
private int humidityDirection = +1;
private Random rand = new Random(47);
List<DataPoint> data = Collections.synchronizedList(new ArrayList<DataPoint>());
class CollectData implements Runnable {
public void run() {
System.out.println("Collecting data");
synchronized(GreenhouseScheduler.this) {
// Pretend the interval is longer than it is:
lastTime.set(Calendar.MINUTE,
lastTime.get(Calendar.MINUTE) + 30);
// One in 5 chances of reversing the direction:
if(rand.nextInt(5) == 4)
tempDirection = -tempDirection;
// Store previous value:
lastTemp = lastTemp +
tempDirection * (1.0f + rand.nextFloat());
if(rand.nextInt(5) == 4)
humidityDirection = -humidityDirection;
lastHumidity = lastHumidity +
humidityDirection * rand.nextFloat();
// Calendar must be cloned, otherwise all
// DataPoints hold references to the same lastTime.
// For a basic object like Calendar, clone() is OK.
data.add(new DataPoint((Calendar)lastTime.clone(),
lastTemp, lastHumidity));
}
}
}
public static void main(String[] args) {
GreenhouseScheduler gh = new GreenhouseScheduler();
gh.schedule(gh.new Terminate(), 5000);
// Former "Restart" class not necessary:
gh.repeat(gh.new Bell(), 0, 1000);
gh.repeat(gh.new ThermostatNight(), 0, 2000);
gh.repeat(gh.new LightOn(), 0, 200);
gh.repeat(gh.new LightOff(), 0, 400);
gh.repeat(gh.new WaterOn(), 0, 600);
gh.repeat(gh.new WaterOff(), 0, 800);
gh.repeat(gh.new ThermostatDay(), 0, 1400);
gh.repeat(gh.new CollectData(), 500, 500);
}
}
import java.util.concurrent.*;
import java.util.*;
public class GreenhouseScheduler {
private volatile boolean light = false;
private volatile boolean water = false;
private String thermostat = "Day";
public synchronized String getThermostat() {
return thermostat;
}
public synchronized void setThermostat(String value) {
thermostat = value;
}
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(10);
public void schedule(Runnable event, long delay) {
scheduler.schedule(event,delay,TimeUnit.MILLISECONDS);
}
public void
repeat(Runnable event, long initialDelay, long period) {
scheduler.scheduleAtFixedRate(event, initialDelay, period, TimeUnit.MILLISECONDS);
}
class LightOn implements Runnable {
public void run() {
// Put hardware control code here to
// physically turn on the light.
System.out.println("Turning on lights");
light = true;
}
}
class LightOff implements Runnable {
public void run() {
// Put hardware control code here to
// physically turn off the light.
System.out.println("Turning off lights");
light = false;
}
}
class WaterOn implements Runnable {
public void run() {
// Put hardware control code here.
System.out.println("Turning greenhouse water on");
water = true;
}
}
class WaterOff implements Runnable {
public void run() {
// Put hardware control code here.
System.out.println("Turning greenhouse water off");
water = false;
}
}
class ThermostatNight implements Runnable {
public void run() {
// Put hardware control code here.
System.out.println("Thermostat to night setting");
setThermostat("Night");
}
}
class ThermostatDay implements Runnable {
public void run() {
// Put hardware control code here.
System.out.println("Thermostat to day setting");
setThermostat("Day");
}
}
class Bell implements Runnable {
public void run() { System.out.println("Bing!"); }
}
class Terminate implements Runnable {
public void run() {
System.out.println("Terminating");
scheduler.shutdownNow();
// Must start a separate task to do this job,
// since the scheduler has been shut down:
new Thread() {
public void run() {
for(DataPoint d : data)
System.out.println(d);
}
}.start();
}
}
// New feature: data collection
static class DataPoint {
final Calendar time;
final float temperature;
final float humidity;
public DataPoint(Calendar d, float temp, float hum) {
time = d;
temperature = temp;
humidity = hum;
}
public String toString() {
return time.getTime() +
String.format(" temperature: %1$.1f humidity: %2$.2f",
temperature, humidity);
}
}
private Calendar lastTime = Calendar.getInstance();
{ // Adjust date to the half hour
lastTime.set(Calendar.MINUTE, 30);
lastTime.set(Calendar.SECOND, 00);
}
private float lastTemp = 65.0f;
private int tempDirection = +1;
private float lastHumidity = 50.0f;
private int humidityDirection = +1;
private Random rand = new Random(47);
List<DataPoint> data = Collections.synchronizedList(new ArrayList<DataPoint>());
class CollectData implements Runnable {
public void run() {
System.out.println("Collecting data");
synchronized(GreenhouseScheduler.this) {
// Pretend the interval is longer than it is:
lastTime.set(Calendar.MINUTE,
lastTime.get(Calendar.MINUTE) + 30);
// One in 5 chances of reversing the direction:
if(rand.nextInt(5) == 4)
tempDirection = -tempDirection;
// Store previous value:
lastTemp = lastTemp +
tempDirection * (1.0f + rand.nextFloat());
if(rand.nextInt(5) == 4)
humidityDirection = -humidityDirection;
lastHumidity = lastHumidity +
humidityDirection * rand.nextFloat();
// Calendar must be cloned, otherwise all
// DataPoints hold references to the same lastTime.
// For a basic object like Calendar, clone() is OK.
data.add(new DataPoint((Calendar)lastTime.clone(),
lastTemp, lastHumidity));
}
}
}
public static void main(String[] args) {
GreenhouseScheduler gh = new GreenhouseScheduler();
gh.schedule(gh.new Terminate(), 5000);
// Former "Restart" class not necessary:
gh.repeat(gh.new Bell(), 0, 1000);
gh.repeat(gh.new ThermostatNight(), 0, 2000);
gh.repeat(gh.new LightOn(), 0, 200);
gh.repeat(gh.new LightOff(), 0, 400);
gh.repeat(gh.new WaterOn(), 0, 600);
gh.repeat(gh.new WaterOff(), 0, 800);
gh.repeat(gh.new ThermostatDay(), 0, 1400);
gh.repeat(gh.new CollectData(), 500, 500);
}
}
6. Semaphore
区别于concurrent.locks
锁和内置的synchronized
锁同一时间只允许同一个任务访问共享资源,计数信号量(Counting Semaphore)允许n个任务同时访问共享资源。
下面的例子采用了对象池(Object Pool)的概念,对象池管理有限个对象,允许对象被提取出去使用,使用完后对象需要被归还会对象池。
import java.util.concurrent.*;
private List<T> items = new ArrayList<T>();
private volatile boolean[] checkedOut;
private Semaphore available;
public Pool(Class<T> classObject, int size) {
checkedOut = new boolean[size];
available = new Semaphore(size, true);
// Load pool with objects that can be checked out:
for(int i = 0; i < size; ++i)
// Assumes a default constructor:
items.add(classObject.newInstance());
throw new RuntimeException(e);
public T checkOut() throws InterruptedException {
public void checkIn(T x) {
private synchronized T getItem() {
for(int i = 0; i < size; ++i)
return null; // Semaphore prevents reaching here
private synchronized boolean releaseItem(T item) {
int index = items.indexOf(item);
if(index == -1) return false; // Not in the list
checkedOut[index] = false;
return false; // Wasn’t checked out
import java.util.concurrent.*;
import java.util.*;
public class Pool<T> {
private int size;
private List<T> items = new ArrayList<T>();
private volatile boolean[] checkedOut;
private Semaphore available;
public Pool(Class<T> classObject, int size) {
this.size = size;
checkedOut = new boolean[size];
available = new Semaphore(size, true);
// Load pool with objects that can be checked out:
for(int i = 0; i < size; ++i)
try {
// Assumes a default constructor:
items.add(classObject.newInstance());
} catch(Exception e) {
throw new RuntimeException(e);
}
}
public T checkOut() throws InterruptedException {
available.acquire();
return getItem();
}
public void checkIn(T x) {
if(releaseItem(x))
available.release();
}
private synchronized T getItem() {
for(int i = 0; i < size; ++i)
if(!checkedOut[i]) {
checkedOut[i] = true;
return items.get(i);
}
return null; // Semaphore prevents reaching here
}
private synchronized boolean releaseItem(T item) {
int index = items.indexOf(item);
if(index == -1) return false; // Not in the list
if(checkedOut[index]) {
checkedOut[index] = false;
return true;
}
return false; // Wasn’t checked out
}
}
import java.util.concurrent.*;
import java.util.*;
public class Pool<T> {
private int size;
private List<T> items = new ArrayList<T>();
private volatile boolean[] checkedOut;
private Semaphore available;
public Pool(Class<T> classObject, int size) {
this.size = size;
checkedOut = new boolean[size];
available = new Semaphore(size, true);
// Load pool with objects that can be checked out:
for(int i = 0; i < size; ++i)
try {
// Assumes a default constructor:
items.add(classObject.newInstance());
} catch(Exception e) {
throw new RuntimeException(e);
}
}
public T checkOut() throws InterruptedException {
available.acquire();
return getItem();
}
public void checkIn(T x) {
if(releaseItem(x))
available.release();
}
private synchronized T getItem() {
for(int i = 0; i < size; ++i)
if(!checkedOut[i]) {
checkedOut[i] = true;
return items.get(i);
}
return null; // Semaphore prevents reaching here
}
private synchronized boolean releaseItem(T item) {
int index = items.indexOf(item);
if(index == -1) return false; // Not in the list
if(checkedOut[index]) {
checkedOut[index] = false;
return true;
}
return false; // Wasn’t checked out
}
}
构造器通过classObject.newInstance()
初始化固定数量个对象,通过checkOut()
和checkIn()
提取和归还对象池中的对象。checkOut()
首先通过available.acquire()
获取信号量,如果能够成功获取,意味着对象池中还有剩余的对象,则由getItem()
从对象池中取出一个对象。checkIn()
首先调用releaseItem()
,releaseItem()
会判断归还的对象是否合法,成功归还后,通过available.release()
释放信号量。
为了演示Pool<T>
的用法,首先创建一个构造起来非常耗时的类:
private volatile double d; // Prevent optimization
private static int counter = 0;
private final int id = counter++;
// Expensive, interruptible operation:
for(int i = 1; i < 10000; i++) {
d += (Math.PI + Math.E) / (double)i;
public void operation() { System.out.println(this); }
public String toString() { return "Fat id: " + id; }
public class Fat {
private volatile double d; // Prevent optimization
private static int counter = 0;
private final int id = counter++;
public Fat() {
// Expensive, interruptible operation:
for(int i = 1; i < 10000; i++) {
d += (Math.PI + Math.E) / (double)i;
}
}
public void operation() { System.out.println(this); }
public String toString() { return "Fat id: " + id; }
}
public class Fat {
private volatile double d; // Prevent optimization
private static int counter = 0;
private final int id = counter++;
public Fat() {
// Expensive, interruptible operation:
for(int i = 1; i < 10000; i++) {
d += (Math.PI + Math.E) / (double)i;
}
}
public void operation() { System.out.println(this); }
public String toString() { return "Fat id: " + id; }
}
然后使用Pool<Fat>
来存储固定数量的Fat
:
import java.util.concurrent.*;
// A task to check a resource out of a pool:
class CheckoutTask<T> implements Runnable {
private static int counter = 0;
private final int id = counter++;
public CheckoutTask(Pool<T> pool) {
T item = pool.checkOut();
System.out.println(this + "checked out " + item);
TimeUnit.SECONDS.sleep(1);
System.out.println(this +"checking in " + item);
} catch(InterruptedException e) {
// Acceptable way to terminate
public String toString() {
return "CheckoutTask " + id + " ";
public class SemaphoreDemo {
final static int SIZE = 25;
public static void main(String[] args) throws Exception {
final Pool<Fat> pool = new Pool<Fat>(Fat.class, SIZE);
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < SIZE; i++)
exec.execute(new CheckoutTask<Fat>(pool));
System.out.println("All CheckoutTasks created");
List<Fat> list = new ArrayList<Fat>();
for(int i = 0; i < SIZE; i++) {
System.out.print(i + ": main() thread checked out ");
Future<?> blocked = exec.submit(new Runnable() {
// Semaphore prevents additional checkout,
} catch(InterruptedException e) {
System.out.println("checkOut() Interrupted");
TimeUnit.SECONDS.sleep(2);
blocked.cancel(true); // Break out of blocked call
System.out.println("Checking in objects in " + list);
pool.checkIn(f); // Second checkIn ignored
import java.util.concurrent.*;
import java.util.*;
// A task to check a resource out of a pool:
class CheckoutTask<T> implements Runnable {
private static int counter = 0;
private final int id = counter++;
private Pool<T> pool;
public CheckoutTask(Pool<T> pool) {
this.pool = pool;
}
public void run() {
try {
T item = pool.checkOut();
System.out.println(this + "checked out " + item);
TimeUnit.SECONDS.sleep(1);
System.out.println(this +"checking in " + item);
pool.checkIn(item);
} catch(InterruptedException e) {
// Acceptable way to terminate
}
}
public String toString() {
return "CheckoutTask " + id + " ";
}
}
public class SemaphoreDemo {
final static int SIZE = 25;
public static void main(String[] args) throws Exception {
final Pool<Fat> pool = new Pool<Fat>(Fat.class, SIZE);
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < SIZE; i++)
exec.execute(new CheckoutTask<Fat>(pool));
System.out.println("All CheckoutTasks created");
List<Fat> list = new ArrayList<Fat>();
for(int i = 0; i < SIZE; i++) {
Fat f = pool.checkOut();
System.out.print(i + ": main() thread checked out ");
f.operation();
list.add(f);
}
Future<?> blocked = exec.submit(new Runnable() {
public void run() {
try {
// Semaphore prevents additional checkout,
// so call is blocked:
pool.checkOut();
} catch(InterruptedException e) {
System.out.println("checkOut() Interrupted");
}
}
});
TimeUnit.SECONDS.sleep(2);
blocked.cancel(true); // Break out of blocked call
System.out.println("Checking in objects in " + list);
for(Fat f : list)
pool.checkIn(f);
for(Fat f : list)
pool.checkIn(f); // Second checkIn ignored
exec.shutdown();
}
}
import java.util.concurrent.*;
import java.util.*;
// A task to check a resource out of a pool:
class CheckoutTask<T> implements Runnable {
private static int counter = 0;
private final int id = counter++;
private Pool<T> pool;
public CheckoutTask(Pool<T> pool) {
this.pool = pool;
}
public void run() {
try {
T item = pool.checkOut();
System.out.println(this + "checked out " + item);
TimeUnit.SECONDS.sleep(1);
System.out.println(this +"checking in " + item);
pool.checkIn(item);
} catch(InterruptedException e) {
// Acceptable way to terminate
}
}
public String toString() {
return "CheckoutTask " + id + " ";
}
}
public class SemaphoreDemo {
final static int SIZE = 25;
public static void main(String[] args) throws Exception {
final Pool<Fat> pool = new Pool<Fat>(Fat.class, SIZE);
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < SIZE; i++)
exec.execute(new CheckoutTask<Fat>(pool));
System.out.println("All CheckoutTasks created");
List<Fat> list = new ArrayList<Fat>();
for(int i = 0; i < SIZE; i++) {
Fat f = pool.checkOut();
System.out.print(i + ": main() thread checked out ");
f.operation();
list.add(f);
}
Future<?> blocked = exec.submit(new Runnable() {
public void run() {
try {
// Semaphore prevents additional checkout,
// so call is blocked:
pool.checkOut();
} catch(InterruptedException e) {
System.out.println("checkOut() Interrupted");
}
}
});
TimeUnit.SECONDS.sleep(2);
blocked.cancel(true); // Break out of blocked call
System.out.println("Checking in objects in " + list);
for(Fat f : list)
pool.checkIn(f);
for(Fat f : list)
pool.checkIn(f); // Second checkIn ignored
exec.shutdown();
}
}
Fat
对象池中有SIZE
个对象,首先由SIZE
个CheckoutTask
从对象池中checkOut()
并checkIn()
对象。之后在main()
中checkOut()
全部对象,当在blocked
中尝试checkOut()
时,对象池已空,会发生阻塞。最后main()
再checkIn()
全部对象,重复checkIn()
没有作用。
7. Exchanger
Exchanger
用于在两个任务间交换对象,通常用于在一个任务中创建对象,并把它交换到另一个任务中进行消耗。
import java.util.concurrent.*;
interface Generator<T> { T next(); }
class BasicGenerator<T> implements Generator<T> {
public BasicGenerator(Class<T> type){ this.type = type; }
// Assumes type is a public class:
return type.newInstance();
throw new RuntimeException(e);
// Produce a Default generator given a type token:
public static <T> Generator<T> create(Class<T> type) {
return new BasicGenerator<T>(type);
class ExchangerProducer<T> implements Runnable {
private Generator<T> generator;
private Exchanger<List<T>> exchanger;
ExchangerProducer(Exchanger<List<T>> exchg,
Generator<T> gen, List<T> holder) {
while(!Thread.interrupted()) {
for(int i = 0; i < ExchangerDemo.size; i++)
holder.add(generator.next());
// Exchange full for empty:
holder = exchanger.exchange(holder);
} catch(InterruptedException e) {
// OK to terminate this way.
class ExchangerConsumer<T> implements Runnable {
private Exchanger<List<T>> exchanger;
private volatile T value;
ExchangerConsumer(Exchanger<List<T>> ex, List<T> holder){
while(!Thread.interrupted()) {
holder = exchanger.exchange(holder);
value = x; // Fetch out value
holder.remove(x); // OK for CopyOnWriteArrayList
} catch(InterruptedException e) {
// OK to terminate this way.
System.out.println("Final value: " + value);
public class ExchangerDemo {
static int delay = 5; // Seconds
public static void main(String[] args) throws Exception {
size = new Integer(args[0]);
delay = new Integer(args[1]);
ExecutorService exec = Executors.newCachedThreadPool();
Exchanger<List<Fat>> xc = new Exchanger<List<Fat>>();
List<Fat> producerList = new CopyOnWriteArrayList<Fat>(),
consumerList = new CopyOnWriteArrayList<Fat>();
exec.execute(new ExchangerProducer<Fat>(xc,
BasicGenerator.create(Fat.class), producerList));
exec.execute(new ExchangerConsumer<Fat>(xc, consumerList));
TimeUnit.SECONDS.sleep(delay);
Final value: Fat id: 71899
import java.util.concurrent.*;
import java.util.*;
interface Generator<T> { T next(); }
class BasicGenerator<T> implements Generator<T> {
private Class<T> type;
public BasicGenerator(Class<T> type){ this.type = type; }
public T next() {
try {
// Assumes type is a public class:
return type.newInstance();
} catch(Exception e) {
throw new RuntimeException(e);
}
}
// Produce a Default generator given a type token:
public static <T> Generator<T> create(Class<T> type) {
return new BasicGenerator<T>(type);
}
}
class ExchangerProducer<T> implements Runnable {
private Generator<T> generator;
private Exchanger<List<T>> exchanger;
private List<T> holder;
ExchangerProducer(Exchanger<List<T>> exchg,
Generator<T> gen, List<T> holder) {
exchanger = exchg;
generator = gen;
this.holder = holder;
}
public void run() {
try {
while(!Thread.interrupted()) {
for(int i = 0; i < ExchangerDemo.size; i++)
holder.add(generator.next());
// Exchange full for empty:
holder = exchanger.exchange(holder);
}
} catch(InterruptedException e) {
// OK to terminate this way.
}
}
}
class ExchangerConsumer<T> implements Runnable {
private Exchanger<List<T>> exchanger;
private List<T> holder;
private volatile T value;
ExchangerConsumer(Exchanger<List<T>> ex, List<T> holder){
exchanger = ex;
this.holder = holder;
}
public void run() {
try {
while(!Thread.interrupted()) {
holder = exchanger.exchange(holder);
for(T x : holder) {
value = x; // Fetch out value
holder.remove(x); // OK for CopyOnWriteArrayList
}
}
} catch(InterruptedException e) {
// OK to terminate this way.
}
System.out.println("Final value: " + value);
}
}
public class ExchangerDemo {
static int size = 10;
static int delay = 5; // Seconds
public static void main(String[] args) throws Exception {
if(args.length > 0)
size = new Integer(args[0]);
if(args.length > 1)
delay = new Integer(args[1]);
ExecutorService exec = Executors.newCachedThreadPool();
Exchanger<List<Fat>> xc = new Exchanger<List<Fat>>();
List<Fat> producerList = new CopyOnWriteArrayList<Fat>(),
consumerList = new CopyOnWriteArrayList<Fat>();
exec.execute(new ExchangerProducer<Fat>(xc,
BasicGenerator.create(Fat.class), producerList));
exec.execute(new ExchangerConsumer<Fat>(xc, consumerList));
TimeUnit.SECONDS.sleep(delay);
exec.shutdownNow();
}
}
/* Output
Final value: Fat id: 71899
*/
import java.util.concurrent.*;
import java.util.*;
interface Generator<T> { T next(); }
class BasicGenerator<T> implements Generator<T> {
private Class<T> type;
public BasicGenerator(Class<T> type){ this.type = type; }
public T next() {
try {
// Assumes type is a public class:
return type.newInstance();
} catch(Exception e) {
throw new RuntimeException(e);
}
}
// Produce a Default generator given a type token:
public static <T> Generator<T> create(Class<T> type) {
return new BasicGenerator<T>(type);
}
}
class ExchangerProducer<T> implements Runnable {
private Generator<T> generator;
private Exchanger<List<T>> exchanger;
private List<T> holder;
ExchangerProducer(Exchanger<List<T>> exchg,
Generator<T> gen, List<T> holder) {
exchanger = exchg;
generator = gen;
this.holder = holder;
}
public void run() {
try {
while(!Thread.interrupted()) {
for(int i = 0; i < ExchangerDemo.size; i++)
holder.add(generator.next());
// Exchange full for empty:
holder = exchanger.exchange(holder);
}
} catch(InterruptedException e) {
// OK to terminate this way.
}
}
}
class ExchangerConsumer<T> implements Runnable {
private Exchanger<List<T>> exchanger;
private List<T> holder;
private volatile T value;
ExchangerConsumer(Exchanger<List<T>> ex, List<T> holder){
exchanger = ex;
this.holder = holder;
}
public void run() {
try {
while(!Thread.interrupted()) {
holder = exchanger.exchange(holder);
for(T x : holder) {
value = x; // Fetch out value
holder.remove(x); // OK for CopyOnWriteArrayList
}
}
} catch(InterruptedException e) {
// OK to terminate this way.
}
System.out.println("Final value: " + value);
}
}
public class ExchangerDemo {
static int size = 10;
static int delay = 5; // Seconds
public static void main(String[] args) throws Exception {
if(args.length > 0)
size = new Integer(args[0]);
if(args.length > 1)
delay = new Integer(args[1]);
ExecutorService exec = Executors.newCachedThreadPool();
Exchanger<List<Fat>> xc = new Exchanger<List<Fat>>();
List<Fat> producerList = new CopyOnWriteArrayList<Fat>(),
consumerList = new CopyOnWriteArrayList<Fat>();
exec.execute(new ExchangerProducer<Fat>(xc,
BasicGenerator.create(Fat.class), producerList));
exec.execute(new ExchangerConsumer<Fat>(xc, consumerList));
TimeUnit.SECONDS.sleep(delay);
exec.shutdownNow();
}
}
/* Output
Final value: Fat id: 71899
*/
这里交换的对象是List<Fat>
,ExchangerProducer
生成List<Fat>
,ExchangerConsumer
消耗List<Fat>
,二者通过同一个Exchanger<List<Fat>> xc
联系起来。ExchangerConsumer
在调用exchanger.exchange()
时,会产生阻塞;直到ExchangerProducer
填充完List<Fat>
,并调用exchanger.exchange()
后,ExchangerProducer
和ExchangerConsumer
持有的List<Fat>
(holder
)被交换,ExchangerConsumer
得到了填充后的列表,ExchangerProducer
得到了空的列表。Exchanger
使得资源的生产和消耗得以同时进行。