java——》CompletableFuture 忘是亡心i 2023-12-27 00:40 68阅读 0赞 版权声明:本文为博主原创文章,无需授权即可转载,甚至无需保留以上版权声明,转载时请务必注明作者。 [https://blog.csdn.net/weixin\_43453386/article/details/88913809][https_blog.csdn.net_weixin_43453386_article_details_88913809] #### java——》CompletableFuture #### * 一、CompletableFuture使用实践 * * 1、直接创建CompletableFuture * * 1)创建默认的CompletableFuture * 2)创建一个带result的CompletableFuture * 3)给CompletableFuture填充一个result * 4)给CompletableFuture填充一个异常 * 2、通过方法runAsync创建CompletableFuture(没有返回值) * 3、通过方法supplyAsync创建CompletableFuture(有返回值) * 4、CompletableFuture的完成动作 * * 1)whenComplete * 2)handle * 3)thenApply * 4)thenAccept * 5)thenAppeptBoth * 6)thenCombine * 7)acceptEither * 二、CompletableFuture实现机制 * * 1、执行任务:AsyncSupply.run * 2、添加回调:thenApply * 3、执行回调:CompletableFuture.postComplete > * 实现了CompletionStage 和 Future接口 > * 简化了Java异步编程能力 > * 任务执行完成之后执行“回调”(处理结果) ## 一、CompletableFuture使用实践 ## ### 1、直接创建CompletableFuture ### #### 1)创建默认的CompletableFuture #### > 默认的CompletableFuture是没有result的, > 这时调用 **future.get()** 会一直 **阻塞** ,直到有result或者出现异常 CompletableFuture<String> future = future = new CompletableFuture<>(); try { future.get(1, TimeUnit.SECONDS); } catch (Exception e) { // todo } #### 2)创建一个带result的CompletableFuture #### CompletableFuture<String> future = CompletableFuture.completedFuture("result"); future.get(); #### 3)给CompletableFuture填充一个result #### CompletableFuture<String> future = future = new CompletableFuture<>(); future.complete("result"); #### 4)给CompletableFuture填充一个异常 #### CompletableFuture<String> future = future = new CompletableFuture<>(); future.completeExceptionally(new RuntimeException("exception")); try { future.get(); } catch (Exception e) { assert "exception".equals(e.getCause().getMessage()); } ### 2、通过方法runAsync创建CompletableFuture(没有返回值) ### > * 没有返回值 > * Runnable任务 > * 如果入参有executor,则使用executor来执行异步任务 > * 如果入参没有executor,则默认使用ForkJoinPool.commonPool()作为执行异步任务的线程池 `public static CompletableFuture<Void> runAsync(Runnable runnable)` `public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)` CompletableFuture.runAsync(() -> { System.out.println("hello world"); }, executor); ### 3、通过方法supplyAsync创建CompletableFuture(有返回值) ### > * 有返回值 > * 异步任务 > * 如果入参有executor,则使用executor来执行异步任务 > * 如果入参没有executor,则默认使用ForkJoinPool.commonPool()作为执行异步任务的线程池 `public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)` `public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)` CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; }); ### 4、CompletableFuture的完成动作 ### #### 1)whenComplete #### > * action是Action类型,既可以处理正常返回值也可以处理异常 > * whenComplete:在任务执行完成后直接在当前线程内执行action动作 > * whenCompleteAsync:交给其他线程执行action(如果是线程池,执行action的可能和之前执行异步任务的是同一个线程) > * 入参带executor的交给executor线程池来执行action动作 > * 发生异常时,会在当前线程内执行exceptionally方法 `public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)` `public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)` `public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)` `public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)` CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; }).whenCompleteAsync((result, e) -> { System.out.println(result + " " + e); }).exceptionally((e) -> { System.out.println("exception " + e); return "exception"; }); #### 2)handle #### > * 可以使用handle方法来执行CompletableFuture返回类型转换 > * 处理正常返回值 和 异常(可以屏蔽异常,避免继续抛出) > * 返回新的CompletableFuture类型 > * 对上一次CompletableFuture执行完的结果进行某些操作 `public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)` `public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)` `public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)` CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; }); CompletableFuture<Integer> f2 = f1.handle((r, e) -> { System.out.println("handle"); return 1; }); #### 3)thenApply #### > * 可以使用thenApply方法来执行CompletableFuture返回类型转换 > * 只有处理正常返回值,一旦有异常就会抛出 > * 返回新的CompletableFuture类型 > * 对上一次CompletableFuture执行完的结果进行某些操作 `public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)` `public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)` `public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)` CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; }).thenApply((r) -> { System.out.println(r); return "aaa"; }).thenApply((r) -> { System.out.println(r); return 1; }); #### 4)thenAccept #### > * 在CompletableFuture完成之后执行某些消费动作,而不返回新的CompletableFuture类型 > * 只有处理正常返回值,一旦有异常就会抛出 > * 对上一次CompletableFuture执行完的结果进行某些操作 `public CompletableFuture<Void> thenAccept(Consumer<? super T> action)` `public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)` `public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)` CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; }).thenAccept(r -> { System.out.println(r); }).thenAccept(r -> { // 这里的r为Void(null)了 System.out.println(r); }) #### 5)thenAppeptBoth #### > * 同时对2个CompletableFuture执行结果执行某些操作 > * 和handle/thenApply/thenAppep的流程是一样的 > * 只不过thenAppeptBoth中包含了另一个CompletableFuture对象(注意,这里另一个CompletableFuture对象的执行可并不是上一个CompletableFuture执行结束才开始执行的) > * 无返回值 > * thenAcceptBoth 和 runAfterBoth ,是2个CompletableFuture都计算完成 `public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)` `public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)` `public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)` `public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)` CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; }).thenAcceptBoth(CompletableFuture.completedFuture("result2"), (r1, r2) -> { System.out.println(r1 + "-" + r2); }); #### 6)thenCombine #### > * 同时对2个CompletableFuture执行结果执行某些操作 > * 和handle/thenApply/thenAppep的流程是一样的 > * 只不过thenAppeptBoth中包含了另一个CompletableFuture对象(注意,这里另一个CompletableFuture对象的执行可并不是上一个CompletableFuture执行结束才开始执行的) > * 有返回值 `public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)` `public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)` `public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)` CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; }).thenCombine(CompletableFuture.completedFuture("result2"), (r1, r2) -> { System.out.println(r1 + "-" + r2); return r1 + "-" + r2; }); #### 7)acceptEither #### > * 当任意一个CompletableFuture计算完成的时候就会执行 `public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)` `public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)` `public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)` `public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)` `public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)` `public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)` > * allOf :在多个CompletableFuture都计算完成后执行某个动作 > * anyOf:在多个CompletableFuture中的一个计算完成后执行某个动作 `public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)` `public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)` ## 二、CompletableFuture实现机制 ## > CompletableFuture实现流程: > > 1. 执行任务 > 2. 添加任务完成之后的动作(回调方法) > 3. 执行回调 CompletableFuture.supplyAsync(() -> { // callable任务 System.out.println("hello world"); return "result"; }).thenApply(r -> { // 任务完成之后的动作(回调方法),类似于ThreadPoolExecutor.afterExecute方法 System.out.println(r); return r; }); ### 1、执行任务:AsyncSupply.run ### public void run() { CompletableFuture<T> d; Supplier<T> f; // dep是当前CompletableFuture,fn是任务执行逻辑 if ((d = dep) != null && (f = fn) != null) { dep = null; fn = null; if (d.result == null) { try { // 1 任务执行 & result cas设置 d.completeValue(f.get()); } catch (Throwable ex) { // 1.1 result cas异常设置 d.completeThrowable(ex); } } // 2 任务完成,可能涉及到回调的执行 d.postComplete(); } } ### 2、添加回调:thenApply ### public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn) { return uniApplyStage(null, fn); } private <V> CompletableFuture<V> uniApplyStage( Executor e, Function<? super T,? extends V> f) { if (f == null) throw new NullPointerException(); CompletableFuture<V> d = new CompletableFuture<V>(); if (e != null || !d.uniApply(this, f, null)) { // 当上一个CompletableFuture未完成时,将该CompletableFuture添加 // 到上一个CompletableFuture的statck中 UniApply<T,V> c = new UniApply<T,V>(e, d, this, f); push(c); c.tryFire(SYNC); } return d; } ### 3、执行回调:CompletableFuture.postComplete ### final void postComplete() { /* * On each step, variable f holds current dependents to pop * and run. It is extended along only one path at a time, * pushing others to avoid unbounded recursion. */ CompletableFuture<?> f = this; Completion h; while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) { CompletableFuture<?> d; Completion t; // cas设置h.next到当前CompletableFuture.statck if (f.casStack(h, t = h.next)) { if (t != null) { if (f != this) { pushStack(h); continue; } h.next = null; // detach } f = (d = h.tryFire(NESTED)) == null ? this : d; } } } // UniAccept final CompletableFuture<Void> tryFire(int mode) { CompletableFuture<Void> d; CompletableFuture<T> a; if ((d = dep) == null || !d.uniAccept(a = src, fn, mode > 0 ? null : this)) // 执行回调 return null; dep = null; src = null; fn = null; // 返回当前CompletableFuture 或者 递归调用postComplete return d.postFire(a, mode); } [https_blog.csdn.net_weixin_43453386_article_details_88913809]: https://blog.csdn.net/weixin_43453386/article/details/88913809
还没有评论,来说两句吧...