CompletionService
当向 Executor 提交批处理任务时,并且希望在它们完成后获得结果,如果用FutureTask, 你可以循环获取 task,并用 future.get() 去获取结果,但是如果这个 task 没有完成, 你就得阻塞在这里,这个实效性不高,其实在很多场合,其实你拿第一个任务结果时,此时 结果并没有生成并阻塞,其实在阻塞在第一个任务时,第二个 task 的任务已经早就完成了, 显然这种情况用 future task 不合适的,效率也不高。
自己维护 list 和 CompletionService 的区别:
- 从 list 中遍历的每个 Future 对象并不一定处于完成状态,这时调用 get() 方法就会 被阻塞住,如果系统是设计成每个线程完成后就能根据其结果继续做后面的事,这样对 于处于 list 后面的但是先完成的线程就会增加了额外的等待时间。
- 而 CompletionService 的实现是维护一个保存 Future 对象的 BlockingQueue。只有当 这个 Future 对象状态是结束的时候,才会加入到这个 Queue 中,take()方法其实就是 Producer-Consumer 中的 Consumer。它会从 Queue 中取出 Future对象,如果 Queue 是空的,就会阻塞在那里,直到有完成的 Future 对象加入到 Queue 中。
CompletionService 采取的是 BlockingQueue<Future<V>> 无界队列来管理 Future。则有 一个线程执行完毕把返回结果放到 BlockingQueue<Future<V>> 里面。就可以通过 completionServcie.take().get() 取出结果。
方法区别:
- take 方获取并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则 等待。<如果需要用到返回值建议用take>
- poll 获取并移除表示下一个已完成任务的 Future,如果不存在这样的任务,则返回 null。
以下是jdk关于CompletionService的简介:
public interface CompletionService<V>
- 将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者submit 执行的任
务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。例如,
CompletionService 可以用来管理异步 IO ,执行读操作的任务作为程序或系统的一部分
提交,然后,当完成读操作时,会在程序的不同部分执行其他操作,执行操作的顺序可能
与所请求的顺序不同。
- 通常,CompletionService 依赖于一个单独的 Executor 来实际执行任务,在这种情况下,
CompletionService 只管理一个内部完成队列。ExecutorCompletionService 类提供了此
方法的一个实现。
- 内存一致性效果:线程中向 CompletionService 提交任务之前的操作happen-before 该
任务执行的操作,后者依次 happen-before 紧跟在从对应take() 成功返回的操作。
废话少说,直接看代码:
package com.lucky.concurrent; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CompletionServiceDemo { public static class Task implements Callable<Integer> { private int i; Task(int i) { this.i = i; } @Override public Integer call() throws Exception { Thread.sleep(new Random().nextInt(5000)); System.out.println(Thread.currentThread().getName() + " " + i); return i; } } public void run() { ExecutorService pool = Executors.newFixedThreadPool(10); CompletionService<Integer> completionServcie = new ExecutorCompletionService<Integer>( pool); try { for (int i = 0; i < 10; i++) { completionServcie.submit(new CompletionServiceDemo.Task(i)); } for (int i = 0; i < 10; i++) { // take 方法等待下一个结果并返回 Future 对象。 // poll 不等待,有结果就返回一个 Future 对象,否则返回 null。 System.out.println(completionServcie.take().get()); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } finally { pool.shutdown(); } } public static void main(String[] args) { new CompletionServiceDemo().run(); } }
从结果中不难看出。只要有一个线程执行完毕后,主程序就立马获取结果。