Concurrency: Active Objects

  在Active Objects中,每个对象维护自己的工作线程和消息队列,所有对该对象的请求都会被队列,一个一个地执行。当向对象发送消息时,消息会转换为一个任务,加入到对象的队列中,并在之后的某个时刻运行,Java SE5中加入的Future特别适合实现这样的编程方式。

import java.util.concurrent.*;
import java.util.*;

public class ActiveObjectDemo {
    private ExecutorService ex = Executors.newSingleThreadExecutor();
    private Random rand = new Random(47);
    // Insert a random delay to produce the effect
    // of a calculation time:
    private void pause(int factor) {
        try {
            TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(factor));
        } catch(InterruptedException e) {
            System.out.println("sleep() interrupted");
        }
    }
    public Future<Integer> calculateInt(final int x, final int y) {
        return ex.submit(new Callable<Integer>() {
            public Integer call() {
                System.out.println("starting " + x + " + " + y);
                pause(500);
                return x + y;
            }
        });
    }
    public Future<Float> calculateFloat(final float x, final float y) {
        return ex.submit(new Callable<Float>() {
            public Float call() {
                System.out.println("starting " + x + " + " + y);
                pause(2000);
                return x + y;
            }
        });
    }
    public void shutdown() { ex.shutdown(); }
    public static void main(String[] args) {
        ActiveObjectDemo d1 = new ActiveObjectDemo();
        // Prevents ConcurrentModificationException:
        List<Future<?>> results = new CopyOnWriteArrayList<Future<?>>();
        for(float f = 0.0f; f < 1.0f; f += 0.2f)
            results.add(d1.calculateFloat(f, f));
        for(int i = 0; i < 5; i++)
            results.add(d1.calculateInt(i, i));
        System.out.println("All asynch calls made");
        while(results.size() > 0) {
            for(Future<?> f : results)
                if(f.isDone()) {
                    try {
                        System.out.println(f.get());
                    } catch(Exception e) {
                        throw new RuntimeException(e);
                    }
                    results.remove(f);
                }
        }
        d1.shutdown();
    }
}

上面的例子中,ActiveObjectDemo通过ExecutorService ex = Executors.newSingleThreadExecutor()获得用于执行任务的单个线程,calculateFloat()calculateInt()提交对应的Callable对象,把方法调用转换为了消息。List<Future<?>> results用于保存calculateFloat()calculateInt()返回的Future对象,通过不断查询isDone(),获取各个任务的结果,直到所有结果都被获取完毕。

  对于Active Object:

  1. 每个对象都有自己的工作线程。
  2. 每个对象保留对自己的字段的完全控制。
  3. Active Object间的通信以消息的方式进行。
  4. Active Object间的消息都会被队列。

一个对象送往另一个对象的消息只会因队列中延迟而阻塞,消息的发送不会产生阻塞,消息的传递十分高效。由于通信只以消息的方式进行,两个对象不会因为争抢使用公共资源而发生阻塞,因此也不会发生死锁。由于对象的工作线程一次只运行一个任务,也不会发生资源争抢,不需要考虑同步方法。