Spring 之 @Async 的简单使用与源码解析

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

@Async 案例演示

有时候需我们需要让一个方法异步执行但是又不想自己去写异步代码毕竟和业务不相关。所以 Spring 就非常友好提供 @Async 注解让方法异步执行。

想要让 @Async 注解生效必须通过 @EnableAsync 注解来激活这个功能。代码如下

@Component
public class Person {

	@Async
	public void show(String name) {
		System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName());
		System.out.println("======>show...");
	}
}

@ComponentScan
@EnableAsync
public class AopProxyTest {
	public static void main(String[] args) {
		System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName());
		AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(AopProxyTest.class);

		Person bean = context.getBean(Person.class);

		bean.show("abc");
	}
}

测试结果如下

Thread.currentThread().getName() = main
Thread.currentThread().getName() = SimpleAsyncTaskExecutor-1
======>show...

从打印结果中可看出两个 show() 方法时被在 SimpleAsyncTaskExecutor-1 线程中执行的并不是在 main() 线程中。说明 @Async 已经生效。

@Async 源码分析

首先想要让 @Async 功能生效必须通过 @EnableAsync 注解才能激活因为在 @EnableAsync 注解中会通过 @Import 注解导入该功能相关类源码如下

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

}

AsyncConfigurationSelector 类源码如下

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

	@Override
	@Nullable
	public String[] selectImports(AdviceMode adviceMode) {
		switch (adviceMode) {
			case PROXY:
				return new String[] {ProxyAsyncConfiguration.class.getName()};
			case ASPECTJ:
				return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
			default:
				return null;
		}
	}
}

看到 selectImports() 方法中导入了 ProxyAsyncConfiguration 类重点关注这个类就行所有的核心类都是通过这配置类导入的 ProxyAsyncConfiguration 类源码如下

@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

	@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public AsyncAnnotationBeanPostProcessor asyncAdvisor() {

		AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();

		Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
		if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
			bpp.setAsyncAnnotationType(customAsyncAnnotation);
		}
		bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
		bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
		return bpp;
	}
}

然后重点看到这个 AsyncAnnotationBeanPostProcessor 类很明显又是 Spring BeanPostProcessor 接口的应用但是这个类比较特殊因为它并没有像事务、@Aspect 等传统型切面一样去实现 AbstractAutoProxyzCreator 抽象类。这个类自己搞特殊借助 BeanPostProcessor 接口做扩展。所以你只需要关注 BeanPostProcessor 接口中的方法即可。

这里还要注意的是该类还是实现了 BeanFactoryAware 接口那么必然要关注该接口的方法 setBeanFactory()。

进入 AsyncAnnotationBeanPostProcessor 类核心源码如下

public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {

	@Nullable
	private Supplier<Executor> executor;

	public AsyncAnnotationBeanPostProcessor() {
		setBeforeExistingAdvisors(true);
	}

	public void setExecutor(Executor executor) {
		this.executor = SingletonSupplier.of(executor);
	}

	@Override
	public void setBeanFactory(BeanFactory beanFactory) {
		super.setBeanFactory(beanFactory);
		/**
		 * 在这里 new 一个 Advisor AsyncAnnotationAdvisor 专门用来处理 @Async 注解的增强器
		 */
		AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
		if (this.asyncAnnotationType != null) {
			advisor.setAsyncAnnotationType(this.asyncAnnotationType);
		}
		advisor.setBeanFactory(beanFactory);
		this.advisor = advisor;
	}

}

AsyncAnnotationBeanPostProcessor 类构造方法中会设置 beforeExistingAdvisors 属性为 true然后在看到 setBeanFactory() 方法。再解释这个方法之前先回忆下要定义切面肯定要有 Advisor、而 Advisor 必然包含 Advice 和 Pointcut而 Pointcut 必然包含 ClassFilter、MethodMatcher 匹配器。这是 Spring 切面耳熟能详的一套框架。所以在看到 setBeanFactory() 方法中提供了 AsyncAnnotationAdvisor并且注意该类中有个 executor 线程执行器。

进入 AsyncAnnotationAdvisor 类核心源码如下

public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {

	private Advice advice;

	private Pointcut pointcut;

	public AsyncAnnotationAdvisor(
			@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

		Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
		asyncAnnotationTypes.add(Async.class);
		try {
			asyncAnnotationTypes.add((Class<? extends Annotation>)
					ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
		}
		catch (ClassNotFoundException ex) {
			// If EJB 3.1 API not present, simply ignore.
		}
		this.advice = buildAdvice(executor, exceptionHandler);
		this.pointcut = buildPointcut(asyncAnnotationTypes);
	}
}

从上述代码中可以看到就是要去构建 Advice + Pointcut进入 buildAdvice() 源码如下

	protected Advice buildAdvice(
			@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

		/**
		 * 在在这里生成 Advice AnnotationAsyncExecutionInterceptor 专门用来处理 @Async 注解
		 */
		AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
		interceptor.configure(executor, exceptionHandler);
		return interceptor;
	}

这里就是在创建一个 Adivce类型是 AnnotationAsyncExecutionInterceptor而且这个 Advice 持有 executor 线程执行器。这里创建的拦截器在目标方法调用时就会进入此 Advice 执行 @Async 逻辑。这个是 Spring 代理的基本调用流程。这里不过多赘述。

然后进入 buildPointcut() 方法核心源码如下

	protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
		ComposablePointcut result = null;
		for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
			/**
			 * new 了一个 AnnotationMatchingPointcut 对象 专门用来处理 @Async 注解的
			 */
			Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
			Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
			if (result == null) {
				result = new ComposablePointcut(cpc);
			}
			else {
				result.union(cpc);
			}
			result = result.union(mpc);
		}
		return (result != null ? result : Pointcut.TRUE);
	}

这里定义了两个 Pointcut 联合之后会对类或者方法进行匹配匹配是否有 @Async 注解修饰有的话就会生成代理对象。

看到第一个 AnnotationMatchingPointcut然后查看它的 matches() 匹配逻辑源码如下

	public AnnotationMatchingPointcut(Class<? extends Annotation> classAnnotationType, boolean checkInherited) {
		this.classFilter = new AnnotationClassFilter(classAnnotationType, checkInherited);
		this.methodMatcher = MethodMatcher.TRUE;
	}

首先看到 this.methodMatcher = MethodMatcher.TRUE 代码表示该 Pointcut 不对方法做检验进入 AnnotationClassFilter 类的 matches() 方法核心源码如下

	@Override
	public boolean matches(Class<?> clazz) {
		return (this.checkInherited ? AnnotatedElementUtils.hasAnnotation(clazz, this.annotationType) :
				clazz.isAnnotationPresent(this.annotationType));
	}

这个方法表示类上面是否有 @Async 注解修饰。

进入第二个 AnnotationMatchingPointcut然后查看它的 matches() 匹配逻辑核心源码如下

	@Override
	public boolean matches(Method method, Class<?> targetClass) {
		if (matchesMethod(method)) {
			return true;
		}
		// Proxy classes never have annotations on their redeclared methods.
		if (Proxy.isProxyClass(targetClass)) {
			return false;
		}
		// The method may be on an interface, so let's check on the target class as well.
		Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
		return (specificMethod != method && matchesMethod(specificMethod));
	}

	private boolean matchesMethod(Method method) {
		return (this.checkInherited ? AnnotatedElementUtils.hasAnnotation(method, this.annotationType) :
				method.isAnnotationPresent(this.annotationType));
	}

可以看出就是判断这个方法上是否有 @Async 注解修饰如果有就会创建代理对象。okay现在 Advisor 创建好了那么就要去看看在哪里创建代理对象的。

在回到 AsyncAnnotationBeanPostProcessor 类它实现了 BeanPostProcessor 接口所以关注这个接口提供的方法源码如下

	public Object postProcessBeforeInitialization(Object bean, String beanName) {
		return bean;
	}

postProcessBeforeInitialization() 方法并没有做任何事情略过。继续看到另外的方法源码如下

	@Override
	public Object postProcessAfterInitialization(Object bean, String beanName) {
		if (this.advisor == null || bean instanceof AopInfrastructureBean) {
			return bean;
		}

		if (bean instanceof Advised) {
			Advised advised = (Advised) bean;
			if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
				if (this.beforeExistingAdvisors) {
					advised.addAdvisor(0, this.advisor);
				}
				else {
					advised.addAdvisor(this.advisor);
				}
				return bean;
			}
		}

		if (isEligible(bean, beanName)) {
			ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
			if (!proxyFactory.isProxyTargetClass()) {
				evaluateProxyInterfaces(bean.getClass(), proxyFactory);
			}
			proxyFactory.addAdvisor(this.advisor);

			return proxyFactory.getProxy(classLoader);
		}

从上述代码一眼就可以看到 getProxy() 方法明显就是在这里去创建代理对象的。现在对上述代码拆分讲解

先看到下面这段逻辑判断代码如下

	if (this.advisor == null || bean instanceof AopInfrastructureBean) {
		return bean;
	}

因为刚才在 setBeanFactory() 方法中就已经设置了 AsyncAnnotationAdvisor所以这里的 advisor 不为 null现在只有 Person 类中的 show() 方法加了 @Async 注解也就是说只有 Person 才会创建代理对象。假设现在的 bean 是 Person 实例是个普通类型并不是 AopInfrastructureBean所以就不会走 return。只有实现了 AopInfrastructureBean 接口的 bean 才会走 return 表示不需要创建代理就比如 AsyncAnnotationBeanPostProcessor 类就实现了 AopInfrastructureBean 接口表示此类不需要创建代理很明显肯定不用创建代理如果这个类都要创建代理岂不是乱套了。所以 Spring 这个判断拦截非常细节。

然后再看到这段逻辑代码如下

	if (bean instanceof Advised) {
		Advised advised = (Advised) bean;
		if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
			if (this.beforeExistingAdvisors) {
				advised.addAdvisor(0, this.advisor);
			}
			else {
				advised.addAdvisor(this.advisor);
			}
			return bean;
		}
	}

什么时候的 bean 才会和 Advised 属于同一个类型呢那就是已经经过了代理的对象。这种情况那就是存在很多切面比如 @Aspect、@Async 注解同时修饰在 show() 方法上@Aspect 注解生效会通过 @EnableAspectJAutoProxy 注解激活该功能会导入 AnnotationAwareAspectJAutoProxyCreator而该类也是 BeanPostProcessor 接口应用前面已经知道 @Async 激活也是通过 AsyncAnnotationBeanPostProcessor 类也是一个 BeanPostProcessor 接口应用那么现在问题来了这两个类同时存在并且都是 BeanPostProcessor 接口类型而且都具备创建代理对象的功能。在 show() 方法上也标注 @Around、@Async 两个注解你认为会不会创建两个代理对象呢

答案是不会的AnnotationAwareAspectJAutoProxyCreatorAsyncAnnotationBeanPostProcessor 的优先级决定用哪个类去创建代理对象。而 AnnotationAwareAspectJAutoProxyCreator 类的优先级是高于后者的。所以这两个类同时存在时使用 AnnotationAwareAspectJAutoProxyCreator 类创建代理对象。然后再看到上面那段判断逻辑。bean假设是 Person 实例 此时被 AnnotationAwareAspectJAutoProxyCreator 类创建代理对象属于 Advised 对象。

然后再调用 isEligible() 方法取判断当前的 bean 是否能使用 AsyncAnnotationAdvisor 增强。答案是可以的因为在 show() 方法上有 @Async 注解前面的创建的 ClassFilter、MethodMatcher 匹配器会匹配到 @Async 注解。

然后再看到 this.beforeExistingAdvisors() 方法在前面就已经说过再 AsyncAnnotationBeanPostProcessor 类构造方法中就已设置为 true 所以这成立。就把 AsyncAnnotationAdvisor 切面加入到 Advised 集合中而且还是在第一位置也就表示在调用目标方法时首先会给你开启线程异步执行 show() 方法如果存在事务传播就要格外格外注意这一点不做过多扩展。此时代理对象中就有 AnnotationAwareAspectJAutoProxyCreator 类 Advice也有 AsyncAnnotationBeanPostProcessor 类 Advice所以在调用目标方法时就会执行所有 Advice。但是代理对象始终只有一个也只被创建过一次。

然后再看到最后一段逻辑代码如下

	if (isEligible(bean, beanName)) {
		ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
		if (!proxyFactory.isProxyTargetClass()) {
			evaluateProxyInterfaces(bean.getClass(), proxyFactory);
		}
		proxyFactory.addAdvisor(this.advisor);

		return proxyFactory.getProxy(classLoader);
	}

isEligible() 方法成立因为 Person#show() 方法有 @Async 注解修饰然后就开始调用 proxyFactory.getProxy() 创建代理对象这里需要注意 evaluateProxyInterfaces() 方法会去计算使用 cglib、还是 jdk 动态代理。ProxyFactory 是动态代理过程中的一个上下文对象封装着调用过程中需要的内容比如所有的 Advisors目标对象等等。

至此 AsyncAnnotationBeanPostProcessor#postProcessAfterInitialization() 就完成了代理对象也创建成功接下来就是调用的过程。

在调用目标方法 show() 时会触发到切面逻辑这里使用的是 cglib因为没有接口核心源码如下

	private static class DynamicAdvisedInterceptor implements MethodInterceptor, Serializable {

		private final AdvisedSupport advised;

		public DynamicAdvisedInterceptor(AdvisedSupport advised) {
			this.advised = advised;
		}

		@Override
		@Nullable
		public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
			Object oldProxy = null;
			boolean setProxyContext = false;
			Object target = null;
			TargetSource targetSource = this.advised.getTargetSource();
			try {
				if (this.advised.exposeProxy) {
					// 王者: 是否有需要暴露这个代理对象,使用 ThreadLocal 让线程之间共享这个代理对象(异步事务)
					oldProxy = AopContext.setCurrentProxy(proxy);
					setProxyContext = true;
				}
				// Get as late as possible to minimize the time we "own" the target, in case it comes from a pool...
				target = targetSource.getTarget();
				Class<?> targetClass = (target != null ? target.getClass() : null);
				List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
				Object retVal;

				if (chain.isEmpty() && Modifier.isPublic(method.getModifiers())) {
					Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
					retVal = methodProxy.invoke(target, argsToUse);
				}
				else {
					// We need to create a method invocation...
					retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();
				}
				retVal = processReturnType(proxy, target, method, retVal);
				return retVal;
			}
			finally {
				if (target != null && !targetSource.isStatic()) {
					targetSource.releaseTarget(target);
				}
				if (setProxyContext) {
					// Restore old proxy.
					AopContext.setCurrentProxy(oldProxy);
				}
			}
		}

getInterceptorsAndDynamicInterceptionAdvice() 方法会获取到所有的 Advisor这里假设就只有一个 @Async 注解那么会有 AsyncAnnotationAdvisor 增强器。

然后进入到 AsyncAnnotationAdvisor 中封装的 Advice也就是 AnnotationAsyncExecutionInterceptor 类核心源码如下

	@Override
	@Nullable
	public Object invoke(final MethodInvocation invocation) throws Throwable {
		System.out.println(invocation.getMethod().getName()+"------>async 拦截器被调用.....");
		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
		
		Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
		final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

		AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);

		Callable<Object> task = () -> {
			try {
				Object result = invocation.proceed();
				if (result instanceof Future) {
					return ((Future<?>) result).get();
				}
			}
			catch (ExecutionException ex) {
				handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
			}
			catch (Throwable ex) {
				handleError(ex, userDeclaredMethod, invocation.getArguments());
			}
			return null;
		};

		return doSubmit(task, executor, invocation.getMethod().getReturnType());
	}

通过 determineAsyncExecutor() 方法获取到线程执行器然后进入 doSubmit() 方法代码如下

	@Nullable
	protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
		if (CompletableFuture.class.isAssignableFrom(returnType)) {
			return CompletableFuture.supplyAsync(() -> {
				try {
					return task.call();
				}
				catch (Throwable ex) {
					throw new CompletionException(ex);
				}
			}, executor);
		}
		else if (ListenableFuture.class.isAssignableFrom(returnType)) {
			return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
		}
		else if (Future.class.isAssignableFrom(returnType)) {
			return executor.submit(task);
		}
		else {
			executor.submit(task);
			return null;
		}
	}

show() 方法返回值不属于 CompletableFutureListenableFutureFuture 所以直接就提交给线程池执行task 对象中通过 invocation.proceed() 调用到目标方法逻辑。所以 Spring 就是这样执行 @Async 流程的。

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: Spring