Concurrency: Active Objects
在Active Objects中,每个对象维护自己的工作线程和消息队列,所有对该对象的请求都会被队列,一个一个地执行。当向对象发送消息时,消息会转换为一个任务,加入到对象的队列中,并在之后的某个时刻运行,Java SE5中加入的Future
特别适合实现这样的编程方式。
import java.util.concurrent.*; import java.util.*; public class ActiveObjectDemo { private ExecutorService ex = Executors.newSingleThreadExecutor(); private Random rand = new Random(47); // Insert a random delay to produce the effect // of a calculation time: private void pause(int factor) { try { TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(factor)); } catch(InterruptedException e) { System.out.println("sleep() interrupted"); } } public Future<Integer> calculateInt(final int x, final int y) { return ex.submit(new Callable<Integer>() { public Integer call() { System.out.println("starting " + x + " + " + y); pause(500); return x + y; } }); } public Future<Float> calculateFloat(final float x, final float y) { return ex.submit(new Callable<Float>() { public Float call() { System.out.println("starting " + x + " + " + y); pause(2000); return x + y; } }); } public void shutdown() { ex.shutdown(); } public static void main(String[] args) { ActiveObjectDemo d1 = new ActiveObjectDemo(); // Prevents ConcurrentModificationException: List<Future<?>> results = new CopyOnWriteArrayList<Future<?>>(); for(float f = 0.0f; f < 1.0f; f += 0.2f) results.add(d1.calculateFloat(f, f)); for(int i = 0; i < 5; i++) results.add(d1.calculateInt(i, i)); System.out.println("All asynch calls made"); while(results.size() > 0) { for(Future<?> f : results) if(f.isDone()) { try { System.out.println(f.get()); } catch(Exception e) { throw new RuntimeException(e); } results.remove(f); } } d1.shutdown(); } }
上面的例子中,ActiveObjectDemo
通过ExecutorService ex = Executors.newSingleThreadExecutor()
获得用于执行任务的单个线程,calculateFloat()
和calculateInt()
提交对应的Callable
对象,把方法调用转换为了消息。List<Future<?>> results
用于保存calculateFloat()
和calculateInt()
返回的Future
对象,通过不断查询isDone()
,获取各个任务的结果,直到所有结果都被获取完毕。
对于Active Object:
- 每个对象都有自己的工作线程。
- 每个对象保留对自己的字段的完全控制。
- Active Object间的通信以消息的方式进行。
- Active Object间的消息都会被队列。
一个对象送往另一个对象的消息只会因队列中延迟而阻塞,消息的发送不会产生阻塞,消息的传递十分高效。由于通信只以消息的方式进行,两个对象不会因为争抢使用公共资源而发生阻塞,因此也不会发生死锁。由于对象的工作线程一次只运行一个任务,也不会发生资源争抢,不需要考虑同步方法。