浅析Spring中Async注解底层异步线程池原理

doMore 174 2024-06-20

前言

开发中我们经常会用到异步方法调用,具体到代码层面,异步方法调用的实现方式有很多种,比如最原始的通过实现Runnable接口或者继承Thread类创建异步线程,然后启动异步线程;再如,可以直接用java.util.concurrent包提供的线程池相关API实现异步方法调用。

如果说可以用一行代码快速实现异步方法调用,那是不是比上面方法香很多。

Spring提供了Async注解,就可以帮助我们一行代码搞定异步方法调用。Async注解用起来是很爽,但是如果不对其底层实现做深入研究,难免有时候也会心生疑虑,甚至会因使用不当,遇见一些让人摸不着头脑的问题。

本文首先将对Async注解做简单介绍,然后和大家分享一个我们项目中因Async注解使用不当的线上问题,接着再深扒Spring源码,对Async注解底层异步线程池的实现原理一探究竟。

Async注解简介

Async 注解定义源码


  
/**  
 * Annotation that marks a method as a candidate for <i>asynchronous</i> execution. * Can also be used at the type level, in which case all of the type's methods are * considered as asynchronous. Note, however, that {@code @Async} is not supported * on methods declared within a * {@link org.springframework.context.annotation.Configuration @Configuration} class.  
 * * <p>In terms of target method signatures, any parameter types are supported.  
 * However, the return type is constrained to either {@code void} or * {@link java.util.concurrent.Future}. In the latter case, you may declare the  
 * more specific {@link org.springframework.util.concurrent.ListenableFuture} or  
 * {@link java.util.concurrent.CompletableFuture} types which allow for richer  
 * interaction with the asynchronous task and for immediate composition with * further processing steps. * * <p>A {@code Future} handle returned from the proxy will be an actual asynchronous  
 * {@code Future} that can be used to track the result of the asynchronous method * execution. However, since the target method needs to implement the same signature, * it will have to return a temporary {@code Future} handle that just passes a value * through: e.g. Spring's {@link AsyncResult}, EJB 3.1's {@link javax.ejb.AsyncResult},  
 * or {@link java.util.concurrent.CompletableFuture#completedFuture(Object)}.  
 * * @author Juergen Hoeller * @author Chris Beams * @since 3.0 * @see AnnotationAsyncExecutionInterceptor  
 * @see AsyncAnnotationAdvisor  
 */  
@Target({ElementType.TYPE, ElementType.METHOD})  
@Retention(RetentionPolicy.RUNTIME)  
@Documented  
public @interface Async {  
  
    /**  
     * A qualifier value for the specified asynchronous operation(s).     * <p>May be used to determine the target executor to be used when executing  
     * the asynchronous operation(s), matching the qualifier value (or the bean     * name) of a specific {@link java.util.concurrent.Executor Executor} or  
     * {@link org.springframework.core.task.TaskExecutor TaskExecutor}  
     * bean definition.     * <p>When specified on a class-level {@code @Async} annotation, indicates that the  
     * given executor should be used for all methods within the class. Method-level use     * of {@code Async#value} always overrides any value set at the class level.     * @since 3.1.2     */    String value() default "";  
  
}

从源码可以看出@Async注解定义很简单,只需要关注两点:

  • Target({ElementType.TYPE, ElementType.METHOD}) 标志Async注解可以作用在方法和类上,作用在类上时,类的所有方法可以实现异步调用。
  • String value( ) default “” 是唯一字段属性,用来指定异步线程池,且该字段有缺省值。

Async注解异步调用实现原理概述

在Spring框架中,Async 注解的实现是通过AOP来实现的。具体来说,Async注解是由AsyncAnnotationAdvisor 这个切面类来实现的。

AsyncAnnotationAdvisor 类是Spring框架中用于处理Async注解的切面,它会在被Async注解标识的方法被调用时,创建一个异步代理对象来执行方法。这个异步代理对象会在一个新的线程中调用被@Async注解标识的方法,从而实现方法的异步执行。

在AsyncAnnotationAdvisor中,会使用AsyncExecutionInterceptor来处理Async注解。AsyncExecutionInterceptor是实现了MethodInterceptor接口的类,用于拦截被Async注解标识的方法的调用,并在一个新的线程中执行这个方法。

通过AOP的方式实现Async注解的异步执行,Spring框架可以在方法调用时动态地创建代理对象来实现异步执行,而不需要在业务代码中显式地创建新线程。

总的来说,Async注解的实现是通过AOP机制来实现的,具体的切面类是AsyncAnnotationAdvisor,它利用AsyncExecutionInterceptor来处理被Async注解标识的方法的调用,实现方法的异步执行。

Async注解底层异步线程池原理探究

获取 Async 注解线程池主流程解析

进入到Spring源码Async注解AOP切面实现部分,我们重点剖析异步调用实现中线程池是怎么处理的。下图是org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke方法的实现,可以看出是调用determineAsyncExecutor方法获取异步线程池。

// org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke

public Object invoke(final MethodInvocation invocation) throws Throwable {  
    Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;  
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);  
    Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);  
    // determineAsyncExecutor 获取线程池
    AsyncTaskExecutor executor = this.determineAsyncExecutor(userDeclaredMethod);  
    if (executor == null) {  
        throw new IllegalStateException("No executor specified and no default executor set on AsyncExecutionInterceptor either");  
    } else {  
        Callable<Object> task = () -> {  
            try {  
                Object result = invocation.proceed();  
                if (result instanceof Future) {  
                    return ((Future)result).get();  
                }  
            } catch (ExecutionException var4) {  
                this.handleError(var4.getCause(), userDeclaredMethod, invocation.getArguments());  
            } catch (Throwable var5) {  
                this.handleError(var5, userDeclaredMethod, invocation.getArguments());  
            }  
  
            return null;  
        };  
        return this.doSubmit(task, executor, invocation.getMethod().getReturnType());  
    }  
}


@Nullable  
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {  
// 先尝试从缓存中获取
    AsyncTaskExecutor executor = (AsyncTaskExecutor)this.executors.get(method);  
    if (executor == null) {  
        String qualifier = this.getExecutorQualifier(method);  
        Executor targetExecutor;  
        // 如果指定了线程池 用自定义线程池
        if (StringUtils.hasLength(qualifier)) {  
            targetExecutor = this.findQualifiedExecutor(this.beanFactory, qualifier);  
        } else {  
        // 没有自定义,使用 spring 默认线程池
            targetExecutor = (Executor)this.defaultExecutor.get();  
        }  
  
        if (targetExecutor == null) {  
            return null;  
        }  
  
        executor = targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor)targetExecutor : new TaskExecutorAdapter(targetExecutor);  
        // 放入缓存,下次可以直接使用
        this.executors.put(method, executor);  
    }  
  
    return (AsyncTaskExecutor)executor;  
}

// org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor#getExecutorQualifier
// 获取 Async value 属性值
@Override  
@Nullable  
protected String getExecutorQualifier(Method method) {  
    // Maintainer's note: changes made here should also be made in  
    // AnnotationAsyncExecutionAspect#getExecutorQualifier    Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);  
    if (async == null) {  
       async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);  
    }  
    return (async != null ? async.value() : null);  
}


Spring是怎么为Async注解提供默认线程池的

Async注解默认线程池有下面两个方法实现:

  • org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor
// 该类 继承  AsyncExecutionAspectSupport
@Nullable  
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {  
    Executor defaultExecutor = super.getDefaultExecutor(beanFactory);  
    return (Executor)(defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());  
}

可以看出AsyncExecutionInterceptor#getDefaultExecutor方法比较简单:先尝试调用父类AsyncExecutionAspectSupport#getDefaultExecutor方法获取线程池,如果父类方法获取不到线程池再用创建SimpleAsyncTaskExecutor对象作为Async的线程池返回

  • org.springframework.aop.interceptor.AsyncExecutionAspectSupport#getDefaultExecutor
@Nullable  
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {  
    if (beanFactory != null) {  
        try {  
            return (Executor)beanFactory.getBean(TaskExecutor.class);  
        } catch (NoUniqueBeanDefinitionException var6) {  
            this.logger.debug("Could not find unique TaskExecutor bean. Continuing search for an Executor bean named 'taskExecutor'", var6);  
  
            try {  
                return (Executor)beanFactory.getBean("taskExecutor", Executor.class);  
            } catch (NoSuchBeanDefinitionException var4) {  
                if (this.logger.isInfoEnabled()) {  
                    this.logger.info("More than one TaskExecutor bean found within the context, and none is named 'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly as an alias) in order to use it for async processing: " + var6.getBeanNamesFound());  
                }  
            }  
        } catch (NoSuchBeanDefinitionException var7) {  
            this.logger.debug("Could not find default TaskExecutor bean. Continuing search for an Executor bean named 'taskExecutor'", var7);  
  
            try {  
                return (Executor)beanFactory.getBean("taskExecutor", Executor.class);  
            } catch (NoSuchBeanDefinitionException var5) {  
                this.logger.info("No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either");  
            }  
        }  
    }  
  
    return null;  
}

再来看父类AsyncExecutionAspectSupport#getDefaultExecutor方法的实现,可以看到Spring根据类型从**S****pring容器中获取TaskExecutor类的实例。

Spring根据类型获取实例时,如果Spring容器中有且只有一个指定类型的实例对象,会直接返回,否则的话,会抛出NoUniqueBeanDefinitionException异常或者NoSuchBeanDefinitionException异常。

但是,对于Executor类型,Spring容器却 有一个特殊处理:当从Spring容器中获取Executor实例对象时,如果满足@ConditionalOnMissingBean(Executor.class)条件,Spring容器会自动装载一个ThreadPoolTaskExecutor实例对象,而ThreadPoolTaskExecutor是TaskExecutor的实现类


@Bean  
@ConditionalOnMissingBean  
public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties,  
       ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers,  
       ObjectProvider<TaskDecorator> taskDecorator) {  
    TaskExecutionProperties.Pool pool = properties.getPool();  
    TaskExecutorBuilder builder = new TaskExecutorBuilder();  
    builder = builder.queueCapacity(pool.getQueueCapacity());  
    builder = builder.corePoolSize(pool.getCoreSize());  
    builder = builder.maxPoolSize(pool.getMaxSize());  
    builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());  
    builder = builder.keepAlive(pool.getKeepAlive());  
    Shutdown shutdown = properties.getShutdown();  
    builder = builder.awaitTermination(shutdown.isAwaitTermination());  
    builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());  
    builder = builder.threadNamePrefix(properties.getThreadNamePrefix());  
    builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);  
    builder = builder.taskDecorator(taskDecorator.getIfUnique());  
    return builder;  
}

// APPLICATION_TASK_EXECUTOR_BEAN_NAME =  "applicationTaskExecutor"
// AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME =  "taskExecutor"
@Lazy  
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,  
       AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })  
@ConditionalOnMissingBean(Executor.class)  
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {  
    return builder.build();  
}

org.springframework.boot.autoconfigure.task.TaskExecutionProperties

从TaskExecutionProperties和TaskExecutionAutoConfiguration两个配置类可以看出 Spring 自动装载的ThreadPoolTaskExecutor线程池对象的参数:核心线程数=8;最大线程数=Integer.MAX_VALUE;队列大小=Integer.MAX_VALUE。

总结

如果在使用Async注解时没有指定自定义的线程池会出现以下几种情况:

  • 当Spring容器中有且仅有一个TaskExecutor实例时,Spring会用这个线程池来处理Async注解的异步任务,这可能会踩坑,如果这个TaskExecutor实例是第三方jar引入的,可能会出现很诡异的问题。

  • Spring创建一个核心线程数=8、最大线程数=Integer.MAX_VALUE、队列大小=Integer.MAX_VALUE的线程池来处理Async注解的异步任务,这时候也可能会踩坑,由于线程池参数设置不合理,核心线程数=8,队列大小过大,如果有大批量并发任务,可能会出现OOM

  • Spring创建SimpleAsyncTaskExecutor实例来处理Async注解的异步任务,SimpleAsyncTaskExecutor不是一个好的线程池实现类,SimpleAsyncTaskExecutor根据需要在当前线程或者新线程中执行异步任务。如果当前线程已经有空闲线程可用,任务将在当前线程中执行,否则将创建一个新线程来执行任务。由于这个线程池没有线程管理的能力,每次提交任务都实时创建新线程,所以如果任务量大,会导致性能下降