Java CompletionService
java
2016-06-24

CompletionService

当向 Executor 提交批处理任务时,并且希望在它们完成后获得结果,如果用FutureTask, 你可以循环获取 task,并用 future.get() 去获取结果,但是如果这个 task 没有完成, 你就得阻塞在这里,这个实效性不高,其实在很多场合,其实你拿第一个任务结果时,此时 结果并没有生成并阻塞,其实在阻塞在第一个任务时,第二个 task 的任务已经早就完成了, 显然这种情况用 future task 不合适的,效率也不高。

自己维护 list 和 CompletionService 的区别:

  1. 从 list 中遍历的每个 Future 对象并不一定处于完成状态,这时调用 get() 方法就会 被阻塞住,如果系统是设计成每个线程完成后就能根据其结果继续做后面的事,这样对 于处于 list 后面的但是先完成的线程就会增加了额外的等待时间。
  2. 而 CompletionService 的实现是维护一个保存 Future 对象的 BlockingQueue。只有当 这个 Future 对象状态是结束的时候,才会加入到这个 Queue 中,take()方法其实就是 Producer-Consumer 中的 Consumer。它会从 Queue 中取出 Future对象,如果 Queue 是空的,就会阻塞在那里,直到有完成的 Future 对象加入到 Queue 中。

CompletionService 采取的是 BlockingQueue<Future<V>> 无界队列来管理 Future。则有 一个线程执行完毕把返回结果放到 BlockingQueue<Future<V>> 里面。就可以通过 completionServcie.take().get() 取出结果。

方法区别:

  1. take 方获取并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则 等待。<如果需要用到返回值建议用take>
  2. poll 获取并移除表示下一个已完成任务的 Future,如果不存在这样的任务,则返回 null。

以下是jdk关于CompletionService的简介:

public interface CompletionService<V>
- 将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者submit 执行的任
  务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。例如,
  CompletionService 可以用来管理异步 IO ,执行读操作的任务作为程序或系统的一部分
  提交,然后,当完成读操作时,会在程序的不同部分执行其他操作,执行操作的顺序可能
  与所请求的顺序不同。
- 通常,CompletionService 依赖于一个单独的 Executor 来实际执行任务,在这种情况下,
  CompletionService 只管理一个内部完成队列。ExecutorCompletionService 类提供了此
  方法的一个实现。
- 内存一致性效果:线程中向 CompletionService 提交任务之前的操作happen-before 该
  任务执行的操作,后者依次 happen-before 紧跟在从对应take() 成功返回的操作。

废话少说,直接看代码:

package com.lucky.concurrent;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionServiceDemo {

    public static class Task implements Callable<Integer> {
        private int i;

        Task(int i) {
            this.i = i;
        }

        @Override
        public Integer call() throws Exception {
            Thread.sleep(new Random().nextInt(5000));
            System.out.println(Thread.currentThread().getName() + "   " + i);
            return i;
        }
    }

    public void run() {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        CompletionService<Integer> completionServcie = new ExecutorCompletionService<Integer>(
            pool);
        try {
            for (int i = 0; i < 10; i++) {
                completionServcie.submit(new CompletionServiceDemo.Task(i));
            }
            for (int i = 0; i < 10; i++) {
                // take 方法等待下一个结果并返回 Future 对象。
                // poll 不等待,有结果就返回一个 Future 对象,否则返回 null。
                System.out.println(completionServcie.take().get());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        new CompletionServiceDemo().run();
    }
}

从结果中不难看出。只要有一个线程执行完毕后,主程序就立马获取结果。

其它文章