怎么使用CompletableFuture
这篇文章主要讲解了“怎么使用CompletableFuture”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“怎么使用CompletableFuture”吧!
10年积累的成都做网站、成都网站制作、成都外贸网站建设经验,可以快速应对客户对网站的新想法和需求。提供各种问题对应的解决方案。让选择我们的客户得到更好、更有力的网络服务。我虽然不认识你,你也不认识我。但先网站设计后付款的网站建设流程,更有安宁免费网站建设让你可以放心的选择与我们合作。
但Future机制,还不那么灵活,比如怎么去利用Future机制描述两个任务串行执行,又或是两个任务并行执行,又或是只关心最先执行结束的任务结果。
Future机制在一定程度上都无法快速地满足以上需求,CompletableFuture便应运而生了。
1. 创建一个异步任务
public static CompletableFuture supplyAsync(Supplier supplier) public static CompletableFuture supplyAsync(Supplier supplier,Executor executor); public static CompletableFuturerunAsync(Runnable runnable); public static CompletableFuture runAsync(Runnable runnable,Executor executor);
supplyAsync与runAsync的区别在于:supplyAsync有返回值,而runAsync没有返回值
带Executor参数的构造函数,则使用线程池中的线程执行异步任务(线程池可以参考说说线程池)
不带Executor参数的构造函数,则使用ForkJoinPool.commonPool()中的线程执行异步任务(Fork/Join框架可以参考谈谈并行流parallelStream)
1.1 示例:使用supplyAsync创建一个有返回值的异步任务
public class Case1 { public static void main(String[] args) throws Exception { CompletableFuturecompletableFuture=CompletableFuture.supplyAsync(()->{ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return 1; }); //该方法会一直阻塞 Integer result = completableFuture.get(); System.out.println(result); } }
2. 异步任务的回调
public CompletableFuturewhenComplete(BiConsumer super T, ? super Throwable> action); public CompletableFuture whenCompleteAsync(BiConsumer super T, ? super Throwable> action); public CompletableFuture whenCompleteAsync(BiConsumer super T, ? super Throwable> action, Executor executor); public CompletableFuture exceptionally(Function fn);
whenComplete开头的方法在计算任务完成(包括正常完成与出现异常)之后会回调
而exceptionally则只会在计算任务出现异常时才会被回调
如何确定哪个线程去回调whenComplete,比较复杂,先略过。
而回调whenCompleteAsync的线程比较简单,随便拿一个空闲的线程即可,后缀是Async的方法同理。
2.1 示例:计算出现异常,使用whenComplete与exceptionally进行处理
package com.qcy.testCompleteableFuture; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.stream.IntStream; /** * @author qcy * @create 2020/09/07 17:40:44 */ public class Case2 { public static void main(String[] args) throws Exception { CompletableFuturecompletableFuture = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("执行supplyAsync的线程:" + Thread.currentThread().getName()); int i = 1 / 0; return 1; }); completableFuture.whenComplete(new BiConsumer () { @Override public void accept(Integer integer, Throwable throwable) { System.out.println("执行whenComplete的线程:" + Thread.currentThread().getName()); if (throwable == null) { System.out.println("计算未出现异常,结果:" + integer); } } }); completableFuture.exceptionally(new Function () { @Override public Integer apply(Throwable throwable) { //出现异常时,则返回一个默认值 System.out.println("计算出现异常,信息:" + throwable.getMessage()); return -1; } }); System.out.println(completableFuture.get()); } }
输出:
当然,CompletableFuture内的各种方法是支持链式调用与Lambda表达式的,我们进行如下改写:
public static void main(String[] args) throws Exception { CompletableFuturecompletableFuture = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("执行supplyAsync的线程:" + Thread.currentThread().getName()); int i = 1 / 0; return 1; }).whenComplete((integer, throwable) -> { System.out.println("执行whenComplete的线程:" + Thread.currentThread().getName()); if (throwable == null) { System.out.println("计算未出现异常,结果:" + integer); } }).exceptionally(throwable -> { //出现异常时,则返回一个默认值 System.out.println("计算出现异常,信息:" + throwable.getMessage()); return -1; }); System.out.println("计算结果:" + completableFuture.get()); }
3. 任务串行化执行
public CompletableFuture thenApply(Function super T,? extends U> fn); public CompletableFuturethenRun(Runnable action); public CompletableFuture thenAccept(Consumer super T> action); public CompletableFuture handle(BiFunction super T, Throwable, ? extends U> fn); public CompletableFuture thenCompose(Function super T, ? extends CompletionStage> fn);
thenApply,依赖上一次任务执行的结果,参数中的Function super T,? extends U>,T代表上一次任务返回值的类型,U代表当前任务返回值的类型,当上一个任务没有出现异常时,thenApply才会被调用
thenRun,不需要知道上一个任务的返回结果,只是在上一个任务执行完成之后开始执行Runnable
thenAccept,依赖上一次任务的执行结果,因为入参是Consumer,所以不返回任何值。
handle和thenApply相似,不过当上一个任务出现异常时,能够执行handle,却不会去执行thenApply
thenCompose,传入一次任务执行的结果,返回一个新的CompleteableFuture对象
3.1 示例:使用串行化任务分解两数相乘并输出
package com.qcy.testCompleteableFuture; import java.util.concurrent.CompletableFuture; /** * @author qcy * @create 2020/09/07 17:40:44 */ public class Case4 { public static void main(String[] args) { CompletableFuture.supplyAsync(() -> 2) .thenApply(num -> num * 3) .thenAccept(System.out::print); } }
很显然,输出为6
3.2 示例:使用串行化任务并且模拟出现异常
package com.qcy.testCompleteableFuture; import java.util.concurrent.CompletableFuture; import java.util.function.BiFunction; /** * @author qcy * @create 2020/09/07 17:40:44 */ public class Case4 { public static void main(String[] args) { CompletableFuture.supplyAsync(() -> 2) .thenApply(num -> num / 0) .thenApply(result -> result * 3) .handle((integer, throwable) -> { if (throwable == null) { return integer; } else { throwable.printStackTrace(); return -1; } }).thenAccept(System.out::print); } }
最终会输出-1
4. 任务同时执行,且都需要执行完成
public CompletableFuturethenCombine(CompletionStage extends U> other, Function super T,? super U,? extends V> fn); public CompletableFuture thenAcceptBoth(CompletionStage extends U> other, Consumer super T, ? super U> action); public CompletableFuture runAfterBoth(CompletionStage> other,Runnable action); public static CompletableFuture allOf(CompletableFuture>... cfs);
thenCombine,合并两个任务,两个任务可以同时执行,都执行成功后,执行最后的BiFunction操作。其中T代表第一个任务的执行结果类型,U代表第二个任务的执行结果类型,V代表合并的结果类型
thenAcceptBoth,和thenCombine特性用法都极其相似,唯一的区别在于thenAcceptBoth进行一个消费,没有返回值
runAfterBoth,两个任务都执行完成后,但不关心他们的返回结构,然后去执行一个Runnable。
allOf,当所有的任务都执行完成后,返回一个CompletableFuture
4.1 示例:使用thenCombine合并任务
package com.qcy.testCompleteableFuture; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; /** * @author qcy * @create 2020/09/07 17:40:44 */ public class Case5 { public static void main(String[] args) throws Exception { CompletableFuturecf1 = CompletableFuture.supplyAsync(() -> { System.out.println("任务1开始"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务1结束"); return 2; }); CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> { System.out.println("任务2开始"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务2结束"); return 3; }); CompletableFuture completableFuture = cf1.thenCombine(cf2, (result1, result2) -> result1 * result2); System.out.println("计算结果:" + completableFuture.get()); } }
输出:
可以看到两个任务确实是同时执行的
当然,熟练了之后,直接使用链式操作,代码如下:
package com.qcy.testCompleteableFuture; import java.util.concurrent.CompletableFuture; /** * @author qcy * @create 2020/09/07 17:40:44 */ public class Case6 { public static void main(String[] args) throws Exception { CompletableFuturecompletableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("任务1开始"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务1结束"); return 2; }).thenCombine(CompletableFuture.supplyAsync(() -> { System.out.println("任务2开始"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务2结束"); return 3; }), (result1, result2) -> result1 * result2); System.out.println("计算结果:" + completableFuture.get()); } }
5. 任务同时执行,且只取最先完成的那个任务
public CompletableFuture applyToEither(CompletionStage extends T> other, Function super T, U> fn); public CompletableFutureacceptEither(CompletionStage extends T> other, Consumer super T> action); public CompletableFuture runAfterEither(CompletionStage> other,Runnable action); public static CompletableFuture
applyToEither,最新执行完任务,将其结果执行Function操作,其中T是最先执行完的任务结果类型,U是最后输出的类型
acceptEither,最新执行完的任务,将其结果执行消费操作
runAfterEither,任意一个任务执行完成之后,执行Runnable操作
anyOf,多个任务中,返回最先执行完成的CompletableFuture
5.1 示例:两个任务同时执行,打印最先完成的任务的结果
package com.qcy.testCompleteableFuture; import java.util.concurrent.CompletableFuture; /** * @author qcy * @create 2020/09/07 17:40:44 */ public class Case7 { public static void main(String[] args) throws Exception { CompletableFuturecompletableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("任务1开始"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务1结束"); return 2; }).acceptEither(CompletableFuture.supplyAsync(() -> { System.out.println("任务2开始"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务2结束"); return 3; }), result -> System.out.println(result)); //等待CompletableFuture返回,防止主线程退出 completableFuture.join(); } }
输出:
可以看得到,任务2结束后,直接不再执行任务1的剩余代码
5.2 示例:多个任务同时执行,打印最先完成的任务的结果
package com.qcy.testCompleteableFuture; import java.util.concurrent.CompletableFuture; /** * @author qcy * @create 2020/09/07 17:40:44 */ public class Case8 { public static void main(String[] args) throws Exception { CompletableFuturecf1 = CompletableFuture.supplyAsync(() -> { System.out.println("任务1开始"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务1结束"); return 2; }); CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> { System.out.println("任务2开始"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务2结束"); return 3; }); CompletableFuture cf3 = CompletableFuture.supplyAsync(() -> { System.out.println("任务3开始"); try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务3结束"); return 4; }); CompletableFuture
输出:
感谢各位的阅读,以上就是“怎么使用CompletableFuture”的内容了,经过本文的学习后,相信大家对怎么使用CompletableFuture这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!
文章名称:怎么使用CompletableFuture
URL标题:http://ybzwz.com/article/jiehdp.html