Concurrency: Active Objects
在Active Objects中,每个对象维护自己的工作线程和消息队列,所有对该对象的请求都会被队列,一个一个地执行。当向对象发送消息时,消息会转换为一个任务,加入到对象的队列中,并在之后的某个时刻运行,Java SE5中加入的Future特别适合实现这样的编程方式。
import java.util.concurrent.*;
import java.util.*;
public class ActiveObjectDemo {
private ExecutorService ex = Executors.newSingleThreadExecutor();
private Random rand = new Random(47);
// Insert a random delay to produce the effect
// of a calculation time:
private void pause(int factor) {
try {
TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(factor));
} catch(InterruptedException e) {
System.out.println("sleep() interrupted");
}
}
public Future<Integer> calculateInt(final int x, final int y) {
return ex.submit(new Callable<Integer>() {
public Integer call() {
System.out.println("starting " + x + " + " + y);
pause(500);
return x + y;
}
});
}
public Future<Float> calculateFloat(final float x, final float y) {
return ex.submit(new Callable<Float>() {
public Float call() {
System.out.println("starting " + x + " + " + y);
pause(2000);
return x + y;
}
});
}
public void shutdown() { ex.shutdown(); }
public static void main(String[] args) {
ActiveObjectDemo d1 = new ActiveObjectDemo();
// Prevents ConcurrentModificationException:
List<Future<?>> results = new CopyOnWriteArrayList<Future<?>>();
for(float f = 0.0f; f < 1.0f; f += 0.2f)
results.add(d1.calculateFloat(f, f));
for(int i = 0; i < 5; i++)
results.add(d1.calculateInt(i, i));
System.out.println("All asynch calls made");
while(results.size() > 0) {
for(Future<?> f : results)
if(f.isDone()) {
try {
System.out.println(f.get());
} catch(Exception e) {
throw new RuntimeException(e);
}
results.remove(f);
}
}
d1.shutdown();
}
}
上面的例子中,ActiveObjectDemo通过ExecutorService ex = Executors.newSingleThreadExecutor()获得用于执行任务的单个线程,calculateFloat()和calculateInt()提交对应的Callable对象,把方法调用转换为了消息。List<Future<?>> results用于保存calculateFloat()和calculateInt()返回的Future对象,通过不断查询isDone(),获取各个任务的结果,直到所有结果都被获取完毕。
对于Active Object:
- 每个对象都有自己的工作线程。
- 每个对象保留对自己的字段的完全控制。
- Active Object间的通信以消息的方式进行。
- Active Object间的消息都会被队列。
一个对象送往另一个对象的消息只会因队列中延迟而阻塞,消息的发送不会产生阻塞,消息的传递十分高效。由于通信只以消息的方式进行,两个对象不会因为争抢使用公共资源而发生阻塞,因此也不会发生死锁。由于对象的工作线程一次只运行一个任务,也不会发生资源争抢,不需要考虑同步方法。