Concurrency: New Library Components (2)
5. ScheduledExecutor
ScheduledThreadPoolExecutor
提供了多种执行任务的方式,schedule()
用于延时执行一次任务,scheduleAtFixedRate()
和scheduleWithFixedDelay()
用于定时重复执行,它们具有类似的签名:
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
二者都会在initialDelay
后初次执行command
,之后:
scheduleAtFixedRate()
会每在command
开始执行后的period
时间之后,再次执行command
;scheduleWithFixedDelay()
会每在command
执行完毕后的delay
时间之后,再次执行执行command
。
一个例子如下:
import java.util.concurrent.*; import java.util.*; public class GreenhouseScheduler { private volatile boolean light = false; private volatile boolean water = false; private String thermostat = "Day"; public synchronized String getThermostat() { return thermostat; } public synchronized void setThermostat(String value) { thermostat = value; } ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(10); public void schedule(Runnable event, long delay) { scheduler.schedule(event,delay,TimeUnit.MILLISECONDS); } public void repeat(Runnable event, long initialDelay, long period) { scheduler.scheduleAtFixedRate(event, initialDelay, period, TimeUnit.MILLISECONDS); } class LightOn implements Runnable { public void run() { // Put hardware control code here to // physically turn on the light. System.out.println("Turning on lights"); light = true; } } class LightOff implements Runnable { public void run() { // Put hardware control code here to // physically turn off the light. System.out.println("Turning off lights"); light = false; } } class WaterOn implements Runnable { public void run() { // Put hardware control code here. System.out.println("Turning greenhouse water on"); water = true; } } class WaterOff implements Runnable { public void run() { // Put hardware control code here. System.out.println("Turning greenhouse water off"); water = false; } } class ThermostatNight implements Runnable { public void run() { // Put hardware control code here. System.out.println("Thermostat to night setting"); setThermostat("Night"); } } class ThermostatDay implements Runnable { public void run() { // Put hardware control code here. System.out.println("Thermostat to day setting"); setThermostat("Day"); } } class Bell implements Runnable { public void run() { System.out.println("Bing!"); } } class Terminate implements Runnable { public void run() { System.out.println("Terminating"); scheduler.shutdownNow(); // Must start a separate task to do this job, // since the scheduler has been shut down: new Thread() { public void run() { for(DataPoint d : data) System.out.println(d); } }.start(); } } // New feature: data collection static class DataPoint { final Calendar time; final float temperature; final float humidity; public DataPoint(Calendar d, float temp, float hum) { time = d; temperature = temp; humidity = hum; } public String toString() { return time.getTime() + String.format(" temperature: %1$.1f humidity: %2$.2f", temperature, humidity); } } private Calendar lastTime = Calendar.getInstance(); { // Adjust date to the half hour lastTime.set(Calendar.MINUTE, 30); lastTime.set(Calendar.SECOND, 00); } private float lastTemp = 65.0f; private int tempDirection = +1; private float lastHumidity = 50.0f; private int humidityDirection = +1; private Random rand = new Random(47); List<DataPoint> data = Collections.synchronizedList(new ArrayList<DataPoint>()); class CollectData implements Runnable { public void run() { System.out.println("Collecting data"); synchronized(GreenhouseScheduler.this) { // Pretend the interval is longer than it is: lastTime.set(Calendar.MINUTE, lastTime.get(Calendar.MINUTE) + 30); // One in 5 chances of reversing the direction: if(rand.nextInt(5) == 4) tempDirection = -tempDirection; // Store previous value: lastTemp = lastTemp + tempDirection * (1.0f + rand.nextFloat()); if(rand.nextInt(5) == 4) humidityDirection = -humidityDirection; lastHumidity = lastHumidity + humidityDirection * rand.nextFloat(); // Calendar must be cloned, otherwise all // DataPoints hold references to the same lastTime. // For a basic object like Calendar, clone() is OK. data.add(new DataPoint((Calendar)lastTime.clone(), lastTemp, lastHumidity)); } } } public static void main(String[] args) { GreenhouseScheduler gh = new GreenhouseScheduler(); gh.schedule(gh.new Terminate(), 5000); // Former "Restart" class not necessary: gh.repeat(gh.new Bell(), 0, 1000); gh.repeat(gh.new ThermostatNight(), 0, 2000); gh.repeat(gh.new LightOn(), 0, 200); gh.repeat(gh.new LightOff(), 0, 400); gh.repeat(gh.new WaterOn(), 0, 600); gh.repeat(gh.new WaterOff(), 0, 800); gh.repeat(gh.new ThermostatDay(), 0, 1400); gh.repeat(gh.new CollectData(), 500, 500); } }
6. Semaphore
区别于concurrent.locks
锁和内置的synchronized
锁同一时间只允许同一个任务访问共享资源,计数信号量(Counting Semaphore)允许n个任务同时访问共享资源。
下面的例子采用了对象池(Object Pool)的概念,对象池管理有限个对象,允许对象被提取出去使用,使用完后对象需要被归还会对象池。
import java.util.concurrent.*; import java.util.*; public class Pool<T> { private int size; private List<T> items = new ArrayList<T>(); private volatile boolean[] checkedOut; private Semaphore available; public Pool(Class<T> classObject, int size) { this.size = size; checkedOut = new boolean[size]; available = new Semaphore(size, true); // Load pool with objects that can be checked out: for(int i = 0; i < size; ++i) try { // Assumes a default constructor: items.add(classObject.newInstance()); } catch(Exception e) { throw new RuntimeException(e); } } public T checkOut() throws InterruptedException { available.acquire(); return getItem(); } public void checkIn(T x) { if(releaseItem(x)) available.release(); } private synchronized T getItem() { for(int i = 0; i < size; ++i) if(!checkedOut[i]) { checkedOut[i] = true; return items.get(i); } return null; // Semaphore prevents reaching here } private synchronized boolean releaseItem(T item) { int index = items.indexOf(item); if(index == -1) return false; // Not in the list if(checkedOut[index]) { checkedOut[index] = false; return true; } return false; // Wasn’t checked out } }
构造器通过classObject.newInstance()
初始化固定数量个对象,通过checkOut()
和checkIn()
提取和归还对象池中的对象。checkOut()
首先通过available.acquire()
获取信号量,如果能够成功获取,意味着对象池中还有剩余的对象,则由getItem()
从对象池中取出一个对象。checkIn()
首先调用releaseItem()
,releaseItem()
会判断归还的对象是否合法,成功归还后,通过available.release()
释放信号量。
为了演示Pool<T>
的用法,首先创建一个构造起来非常耗时的类:
public class Fat { private volatile double d; // Prevent optimization private static int counter = 0; private final int id = counter++; public Fat() { // Expensive, interruptible operation: for(int i = 1; i < 10000; i++) { d += (Math.PI + Math.E) / (double)i; } } public void operation() { System.out.println(this); } public String toString() { return "Fat id: " + id; } }
然后使用Pool<Fat>
来存储固定数量的Fat
:
import java.util.concurrent.*; import java.util.*; // A task to check a resource out of a pool: class CheckoutTask<T> implements Runnable { private static int counter = 0; private final int id = counter++; private Pool<T> pool; public CheckoutTask(Pool<T> pool) { this.pool = pool; } public void run() { try { T item = pool.checkOut(); System.out.println(this + "checked out " + item); TimeUnit.SECONDS.sleep(1); System.out.println(this +"checking in " + item); pool.checkIn(item); } catch(InterruptedException e) { // Acceptable way to terminate } } public String toString() { return "CheckoutTask " + id + " "; } } public class SemaphoreDemo { final static int SIZE = 25; public static void main(String[] args) throws Exception { final Pool<Fat> pool = new Pool<Fat>(Fat.class, SIZE); ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < SIZE; i++) exec.execute(new CheckoutTask<Fat>(pool)); System.out.println("All CheckoutTasks created"); List<Fat> list = new ArrayList<Fat>(); for(int i = 0; i < SIZE; i++) { Fat f = pool.checkOut(); System.out.print(i + ": main() thread checked out "); f.operation(); list.add(f); } Future<?> blocked = exec.submit(new Runnable() { public void run() { try { // Semaphore prevents additional checkout, // so call is blocked: pool.checkOut(); } catch(InterruptedException e) { System.out.println("checkOut() Interrupted"); } } }); TimeUnit.SECONDS.sleep(2); blocked.cancel(true); // Break out of blocked call System.out.println("Checking in objects in " + list); for(Fat f : list) pool.checkIn(f); for(Fat f : list) pool.checkIn(f); // Second checkIn ignored exec.shutdown(); } }
Fat
对象池中有SIZE
个对象,首先由SIZE
个CheckoutTask
从对象池中checkOut()
并checkIn()
对象。之后在main()
中checkOut()
全部对象,当在blocked
中尝试checkOut()
时,对象池已空,会发生阻塞。最后main()
再checkIn()
全部对象,重复checkIn()
没有作用。
7. Exchanger
Exchanger
用于在两个任务间交换对象,通常用于在一个任务中创建对象,并把它交换到另一个任务中进行消耗。
import java.util.concurrent.*; import java.util.*; interface Generator<T> { T next(); } class BasicGenerator<T> implements Generator<T> { private Class<T> type; public BasicGenerator(Class<T> type){ this.type = type; } public T next() { try { // Assumes type is a public class: return type.newInstance(); } catch(Exception e) { throw new RuntimeException(e); } } // Produce a Default generator given a type token: public static <T> Generator<T> create(Class<T> type) { return new BasicGenerator<T>(type); } } class ExchangerProducer<T> implements Runnable { private Generator<T> generator; private Exchanger<List<T>> exchanger; private List<T> holder; ExchangerProducer(Exchanger<List<T>> exchg, Generator<T> gen, List<T> holder) { exchanger = exchg; generator = gen; this.holder = holder; } public void run() { try { while(!Thread.interrupted()) { for(int i = 0; i < ExchangerDemo.size; i++) holder.add(generator.next()); // Exchange full for empty: holder = exchanger.exchange(holder); } } catch(InterruptedException e) { // OK to terminate this way. } } } class ExchangerConsumer<T> implements Runnable { private Exchanger<List<T>> exchanger; private List<T> holder; private volatile T value; ExchangerConsumer(Exchanger<List<T>> ex, List<T> holder){ exchanger = ex; this.holder = holder; } public void run() { try { while(!Thread.interrupted()) { holder = exchanger.exchange(holder); for(T x : holder) { value = x; // Fetch out value holder.remove(x); // OK for CopyOnWriteArrayList } } } catch(InterruptedException e) { // OK to terminate this way. } System.out.println("Final value: " + value); } } public class ExchangerDemo { static int size = 10; static int delay = 5; // Seconds public static void main(String[] args) throws Exception { if(args.length > 0) size = new Integer(args[0]); if(args.length > 1) delay = new Integer(args[1]); ExecutorService exec = Executors.newCachedThreadPool(); Exchanger<List<Fat>> xc = new Exchanger<List<Fat>>(); List<Fat> producerList = new CopyOnWriteArrayList<Fat>(), consumerList = new CopyOnWriteArrayList<Fat>(); exec.execute(new ExchangerProducer<Fat>(xc, BasicGenerator.create(Fat.class), producerList)); exec.execute(new ExchangerConsumer<Fat>(xc, consumerList)); TimeUnit.SECONDS.sleep(delay); exec.shutdownNow(); } } /* Output Final value: Fat id: 71899 */
这里交换的对象是List<Fat>
,ExchangerProducer
生成List<Fat>
,ExchangerConsumer
消耗List<Fat>
,二者通过同一个Exchanger<List<Fat>> xc
联系起来。ExchangerConsumer
在调用exchanger.exchange()
时,会产生阻塞;直到ExchangerProducer
填充完List<Fat>
,并调用exchanger.exchange()
后,ExchangerProducer
和ExchangerConsumer
持有的List<Fat>
(holder
)被交换,ExchangerConsumer
得到了填充后的列表,ExchangerProducer
得到了空的列表。Exchanger
使得资源的生产和消耗得以同时进行。