简介
JDK 8 中 CompletableFuture 没有超时中断任务的能力。现有做法强依赖任务自身的超时实现。本文提出一种异步超时实现方案,解决上述问题。
前言
JDK 8 是一个非常重要的版本,新增了非常多的特性,其中之一便是 CompletableFuture
。自此从 JDK 层面真正意义上的支持了基于事件的异步编程范式,弥补了 Future
的缺陷。
在我们的日常优化中,最常用手段便是多线程并行执行。这时候就会涉及到 CompletableFuture
的使用。
常见使用方式
下面举例一个常见场景。
假如我们有两个 RPC 远程调用服务,我们需要获取两个 RPC 的结果后,再进行后续逻辑处理。
public static void main(String[] args) {
// 任务 A,耗时 2 秒
int resultA = compute(1);
// 任务 B,耗时 2 秒
int resultB = compute(2);
// 后续业务逻辑处理
System.out.println(resultA + resultB);
}
可以预估到,串行执行最少耗时 4 秒,并且 B 任务并不依赖 A 任务结果。
对于这种场景,我们通常会选择并行的方式优化,Demo 代码如下:
public static void main(String[] args) {
// 仅简单举例,在生产代码中可别这么写!
// 统计耗时的函数
time(() -> {
CompletableFuture<Integer> result = Stream.of(1, 2)
// 创建异步任务
.map(x -> CompletableFuture.supplyAsync(() -> compute(x), executor))
// 聚合
.reduce(CompletableFuture.completedFuture(0), (x, y) -> x.thenCombineAsync(y, Integer::sum, executor));
// 等待结果
try {
System.out.println("结果:" + result.get());
} catch (ExecutionException | InterruptedException e) {
System.err.println("任务执行异常");
}
});
}
输出:
[async-1]: 任务执行开始:1
[async-2]: 任务执行开始:2
[async-1]: 任务执行完成:1
[async-2]: 任务执行完成:2
结果:3
耗时:2 秒
存在的问题
分析
看上去 CompletableFuture
现有功能可以满足我们诉求。但当我们引入一些现实常见情况时,一些潜在的不足便暴露出来了。
compute(x)
如果是一个根据入参查询用户某类型优惠券列表的任务,我们需要查询两种优惠券并组合在一起返回给上游。假如上游要求我们 2 秒内处理完毕并返回结果,但 compute(x)
耗时却在 0.5 秒 ~ 无穷大波动。这时候我们就需要把耗时过长的 compute(x)
任务结果放弃,仅处理在指定时间内完成的任务,尽可能保证服务可用。
那么以上代码的耗时由耗时最长的服务决定,无法满足现有诉求。通常我们会使用 get(long timeout, TimeUnit unit)
来指定获取结果的超时时间,并且我们会给 compute(x)
设置一个超时时间,达到后自动抛异常来中断任务。
public static void main(String[] args) {
// 仅简单举例,在生产代码中可别这么写!
// 统计耗时的函数
time(() -> {
List<CompletableFuture<Integer>> result = Stream.of(1, 2)
// 创建异步任务,compute(x) 超时抛出异常
.map(x -> CompletableFuture.supplyAsync(() -> compute(x), executor))
.toList();
// 等待结果
int res = 0;
for (CompletableFuture<Integer> future : result) {
try {
res += future.get(2, SECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
System.err.println("任务执行异常或超时");
}
}
System.out.println("结果:" + res);
});
}
输出:
[async-2]: 任务执行开始:2
[async-1]: 任务执行开始:1
[async-1]: 任务执行完成:1
任务执行异常或超时
结果:1
耗时:2 秒
可以看到,只要我们能够给 compute(x)
设置一个超时时间将任务中断,结合 get
、getNow
等获取结果的方式,就可以很好地管理整体耗时。
那么问题也就转变成了,如何给任务设置异步超时时间呢?
解决方式
JDK 9
这类问题非常常见,如大促场景,服务器 CPU 瞬间升高就会出现以上问题。
那么如何解决呢?其实 JDK 的开发大佬们早有研究。在 JDK 9,CompletableFuture
正式提供了 orTimeout
、completeOnTimeout
方法,来准确实现异步超时控制。
public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) {
if (unit == null)
throw new NullPointerException();
if (result == null)
whenComplete(new Canceller(Delayer.delay(new Timeout(this), timeout, unit)));
return this;
}
JDK 9 orTimeout
其实现原理是通过一个定时任务,在给定时间之后抛出异常。如果任务在指定时间内完成,则取消抛异常的操作。
以上代码我们按执行顺序来看下:
首先执行 new Timeout(this)
。
static final class Timeout implements Runnable {
final CompletableFuture<?> f;
Timeout(CompletableFuture<?> f) { this.f = f; }
public void run() {
if (f != null && !f.isDone())
// 抛出超时异常
f.completeExceptionally(new TimeoutException());
}
}
通过源码可以看到,Timeout
是一个实现 Runnable 的类,run()
方法负责给传入的异步任务通过 completeExceptionally
CAS 赋值异常,将任务标记为异常完成。
那么谁来触发这个 run()
方法呢?我们看下 Delayer
的实现。
static final class Delayer {
static ScheduledFuture<?> delay(Runnable command, long delay,
TimeUnit unit) {
// 到时间触发 command 任务
return delayer.schedule(command, delay, unit);
}
static final class DaemonThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("CompletableFutureDelayScheduler");
return t;
}
}
static final ScheduledThreadPoolExecutor delayer;
static {
(delayer = new ScheduledThreadPoolExecutor(
1, new DaemonThreadFactory())).
setRemoveOnCancelPolicy(true);
}
}
Delayer
其实就是一个单例定时调度器,Delayer.delay(new Timeout(this), timeout, unit)
通过 ScheduledThreadPoolExecutor
实现指定时间后触发 Timeout
的 run()
方法。
到这里就已经实现了超时抛出异常的操作。但当任务完成时,就没必要触发 Timeout
了。因此我们还需要实现一个取消逻辑。
static final class Canceller implements BiConsumer<Object, Throwable> {
final Future<?> f;
Canceller(Future<?> f) { this.f = f; }
public void accept(Object ignore, Throwable ex) {
if (ex == null && f != null && !f.isDone())
// 3 未触发抛异常任务则取消
f.cancel(false);
}
}
当任务执行完成,或者任务执行异常时,我们也就没必要抛出超时异常了。因此我们可以把 delayer.schedule(command, delay, unit)
返回的定时超时任务取消,不再触发 Timeout
。当我们的异步任务完成,并且定时超时任务未完成的时候,就是我们取消的时机。因此我们可以通过 whenComplete(BiConsumer<? super T, ? super Throwable> action)
来完成。
Canceller
就是一个 BiConsumer
的实现。其持有了 delayer.schedule(command, delay, unit)
返回的定时超时任务,accept(Object ignore, Throwable ex)
实现了定时超时任务未完成后,执行 cancel(boolean mayInterruptIfRunning)
取消任务的操作。
JDK 8
如果我们使用的是 JDK 9 或以上,我们可以直接用 JDK 的实现来完成异步超时操作。那么 JDK 8 怎么办呢?
其实我们也可以根据上述逻辑简单实现一个工具类来辅助。
import java.util.concurrent.*;
import java.util.function.BiConsumer;
/**
* @author suxingkang * @version 1.0 * @since 2024/12/26 8:46
* */
public class CompletableFutureExpandUtils {
public static <T> CompletableFuture<T> orTimeOut(CompletableFuture<T> future, long timeout, TimeUnit unit) {
if (unit == null) {
throw new RuntimeException("时间的给定力度不能为空");
}
if (future == null) {
throw new RuntimeException("异步任务不能为空");
}
if (future.isDone()) {
return future;
}
return future.whenComplete(new Canceller(Delayer.delay(new TimeOut(future), timeout, unit)));
}
/**
* 超时时异常完成的操作
*/
static final class TimeOut implements Runnable {
final CompletableFuture<?> completableFuture;
public TimeOut(CompletableFuture<?> completableFuture) {
this.completableFuture = completableFuture;
}
@Override
public void run() {
if (null != completableFuture && !completableFuture.isDone()) {
completableFuture.completeExceptionally(new TimeoutException());
}
}
}
/**
* 取消不需要的超时的操作
*/
static final class Canceller implements BiConsumer<Object, Throwable> {
final Future<?> future;
public Canceller(Future<?> future) {
this.future = future;
}
@Override
public void accept(Object ignore, Throwable ex) {
if (null != ex && null != future && !future.isDone()) {
future.cancel(false);
}
}
}
/**
* 单例延迟调度器,仅用于启动和取消任务,一个线程就足够
*/
static final class Delayer {
static final ScheduledThreadPoolExecutor delayer;
static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) {
return delayer.schedule(command, delay, unit);
}
static final class DaemonThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("CompletableFutureExpandUtilsDelayScheduler");
return thread;
}
}
static {
delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
delayer.setRemoveOnCancelPolicy(true);
}
}
}
总结
在 JDK 8 场景下,现有超时中断的做法依赖于任务本身的超时实现,当任务本身的超时失效,或者不够精确时,并没有很好的手段来中断任务。因此本文给出一种让 CompletableFuture 支持异步超时的实现方案实现思路,仅供大家参考。