Administrator
Published on 2021-12-15 / 199 Visits
0
0

JUC之CompletableFuture

JUC之CompletableFuture

Future 与 Callable

  • Future 接口定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等

  • Callable接口中定义了需要有返回的任务需要实现的方法

  • 比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,过了一会才去获取子任务的执行结果。

FutureTask

future-task

  • 一旦调用 get() 方法,不管是否计算完成都会导致阻塞
  • isDone() 可以获取是否执行完毕,但是轮询的方式会耗费无谓的CPU资源,而且也不见得能及时地得到计算结果

CompletionStage

  • completionstage 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
  • 一个阶段的计算执行可以是一个 Function,Consumer 或者 Runnable,比如:stage.thenApply(x->square(x)).thenAccept
    (x -> System.out.print(x)).thenRun(() -> System.out.printIn())
  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

CompletableFuture

在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复茶性, 并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法

它可能代表—个明确完成的 Future,也有可能代表一个完成阶段(Completionstage),它支持在计算完成以后触发一些西数或执行某些动作

它实现了 Future 和 CompletionStage 接口

创建异步任务

无返回值

// 创建无返回值的异步任务
public static CompletableFuture<Void> runAsync(Runnable runnable);
// 无返回值,可指定线程池(默认使用ForkJoinPool.commonPool)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

有返回值

// 创建有返回值的异步任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
// 有返回值,可指定线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

常用方法

获得结果和触发计算

public T get()
public T get(long timeout, TimeUnit unit)
// 没有计算完成的情况下,给我一个替代结果
public T getNow(T valueIfAbsent)
public T join()
// 主动触发计算,是否打断get方法立即返回括号值
public boolean complete(T value)

对计算结果进行处理

// 计算结果存在依赖关系,这两个线程串行化,由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)

// 有异常也可以往下一步走,根据带的异常参数可以进一步处理    
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)

对计算结果进行消费

// 任务 A 执行完执行 B,并且 B 不需要 A 的结果
public CompletableFuture<Void> thenRun(Runnable action) 
// 任务 A 执行完执行 B,B 需要 A 的结果,但是任务 B 无返回值 
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
// 任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)

对计算速度进行选用

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)

对计算结果进行合并

public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn)

优点

  • 异步任务结束时,会自动回调某个对象的方法
  • 异步任务出错时,会自动回调某个对象的方法
  • 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行

注意事项

whenComplete 与 whenCompleteAsync

  • Whencomplete:是执行当前任务的线程执行继续执行 whencomplete 的任务
  • whencompleteAsync:是执行把 WhenCompleteAsync 这个任务继续提交给线程池来进行执行

CompletableFuture默认线程池是否满足使用

前面提到创建CompletableFuture异步任务的静态方法runAsync和supplyAsync等,可以指定使用的线程池,不指定则用CompletableFuture的默认线程池:

private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

可以看到,CompletableFuture默认线程池是调用ForkJoinPool的commonPool()方法创建,这个默认线程池的核心线程数量根据CPU核数而定,公式为Runtime.getRuntime().availableProcessors() - 1,以4核双槽CPU为例,核心线程数量就是4*2-1=7个。这样的设置满足CPU密集型的应用,但对于业务都是IO密集型的应用来说,是有风险的,当qps较高时,线程数量可能就设的太少了,会导致线上故障。所以可以根据业务情况自定义线程池使用。

get设置超时时间不能串行get,不然会导致接口延时线程数量\*超时时间


Comment