ExecutorCompletionService

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6


当我们通过Executor提交一组并发执行的任务,并且希望在每一个任务完成后能立即得到结果,有两种方式可以采取:

 

方式一:

通过一个list来保存一组future,然后在循环中轮训这组future,直到每个future都已完成。如果我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,那么在调用get方式时,需要将超时时间设置为0 


1. public class
2.   
3. static class Task implements
4. private int
5.           
6. public Task(int
7. this.i = i;  
8.         }  
9.   
10. @Override
11. public String call() throws
12. 10000);  
13. return Thread.currentThread().getName() + "执行完任务:"
14.         }     
15.     }  
16.       
17. public static void
18.         testUseFuture();  
19.     }  
20.       
21. private static void
22. int numThread = 5;  
23.         ExecutorService executor = Executors.newFixedThreadPool(numThread);  
24. new
25. for(int i = 0;i<numThread;i++ ){  
26. new
27.             futureList.add(future);  
28.         }  
29.                   
30. while(numThread > 0){  
31. for(Future<String> future : futureList){  
32. null;  
33. try
34. 0, TimeUnit.SECONDS);  
35. catch
36.                     e.printStackTrace();  
37. catch
38.                     e.printStackTrace();  
39. catch
40. //超时异常直接忽略
41.                 }  
42. if(null
43.                     futureList.remove(future);  
44.                     numThread--;  
45.                     System.out.println(result);  
46. //此处必须break,否则会抛出并发修改异常。(也可以通过将futureList声明为CopyOnWriteArrayList类型解决)
47. break;  
48.                 }  
49.             }  
50.         }  
51.     }  
52. }

 方式二:

第一种方式显得比较繁琐,通过使用ExecutorCompletionService,则可以达到代码最简化的效果。


1. public class
2.   
3. static class Task implements
4. private int
5.           
6. public Task(int
7. this.i = i;  
8.         }  
9.   
10. @Override
11. public String call() throws
12. 10000);  
13. return Thread.currentThread().getName() + "执行完任务:"
14.         }     
15.     }  
16.       
17. public static void main(String[] args) throws
18.         testExecutorCompletionService();  
19.     }  
20.       
21. private static void testExecutorCompletionService() throws
22. int numThread = 5;  
23.         ExecutorService executor = Executors.newFixedThreadPool(numThread);  
24. new
25. for(int i = 0;i<numThread;i++ ){  
26. new
27.         }  
28. }  
29.           
30. for(int i = 0;i<numThread;i++ ){       
31.             System.out.println(completionService.take().get());  
32.         }  
33.           
34.     }


 

ExecutorCompletionService分析:

 CompletionService是Executor和BlockingQueue的结合体。


1. public
2. if (executor == null)  
3. throw new
4. this.executor = executor;  
5. this.aes = (executor instanceof
6. null;  
7. this.completionQueue = new
8.     }


 任务的提交和执行都是委托给Executor来完成。当提交某个任务时,该任务首先将被包装为一个QueueingFuture,


1. public
2. if (task == null) throw new
3.         RunnableFuture<V> f = newTaskFor(task);  
4. new
5. return
6.     }



QueueingFuture是FutureTask的一个子类,通过改写该子类的done方法,可以实现当任务完成时,将结果放入到BlockingQueue中。

 

1. private class QueueingFuture extends
2.         QueueingFuture(RunnableFuture<V> task) {  
3. super(task, null);  
4. this.task = task;  
5.         }  
6. protected void
7. private final
8.     }


 而通过使用BlockingQueue的take或poll方法,则可以得到结果。在BlockingQueue不存在元素时,这两个操作会阻塞,一旦有结果加入,则立即返回。

1. public Future<V> take() throws
2. return
3. }  
4.   
5. public
6. return
7. }
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6