spring异步编程原理

一、背景

在很多场景中,业务操作完成后会完成一些收尾操作,并不希望实时等待其适时返回结果,甚至不关心执行成功与否,比如:

  • 下单完成后给用户发送短信
  • 流程审批完成后发送邮件通知

或者一些查询操作需要调用多个二方或者三方服务组装返回结果,并且这些调用之前没有依赖关系,比如某电商平台退货详情需要展示订单信息、商品信息、用户详细信息等.
这些场景都可以考虑使用异步编程,所谓异步编程,就是不使用业务主线程,利用线程池或者其他套件开启新的线程完成后续操作,针对不关心执行结果的场景直接使用新线程完成后续业务,主线程直接返回调用,对于关心执行结果的场景,调用后返回多线程句柄,等多线程执行完成后由业务主线程统一组装结果并返回.

二、spring异步编程介绍

spring3.1版本开始提供了开箱即用的异步编程套件,相关实现都放在spring-context模块,不需要引入其他额外的包,在配置类或者应用启动门面类上添加@EnableAsync即可开启异步化能力.
spring异步编程的实现依赖于Aop和动态代理,其具体实现此处不做赘述,加单描述一下spring异步编程用到的几个核心概念:

  • 切入点(Pointcut):用白话来说,spring要对哪些功能做增强处理,要么是表达式,要么是注解,他们所代表的地位置就是切入点,就本篇而言就是做异步化的位置.
  • 通知(Advice):对于满足切入点的程序做个性化增强处理的动作,spring异步编程中就是用线程池处理@EnableAsync注解的方法.
  • 增强器(Advisor): 切入点和通知一起组成了增强器,也就是知道了在哪切入,也知道怎么切入,还需要一个角色去做这件事.
  • 动态代理: 基于被代理的类,在程序启动时生成代理对象并将增强逻辑添加进去,常用的有jdk动态代理和cglib动态代理.

基于前边几个概念,spring异步即是在应用启动时扫描@Async注解的类或者方法,生成代理类,然后将多线程处理能力嵌入进去.

三、异步编程使用

1.开启异步能力

在应用启动类添加@EnableAsync注解:

@SpringBootApplication
@EnableAsync
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
复制代码

2.添加异步注解

在需要实现异步化的方法上添加@Async注解:

@Slf4j
@Service
public class TestBuzz {
    @Async
    public void testAsync() {
        log.info("TestBuzz.testAsync thread={}",Thread.currentThread().getName());
    }
}
复制代码

3.模拟异步调用

@GetMapping("/test_async")
public IResp testAsync() {
    log.info("TestController.testAsync thread={}",Thread.currentThread().getName());
    //异步化调用
    this.testBuzz.testAsync();
    return IResp.getSuccessResult();
}
复制代码

启动并模拟请求:

curl http://localhost:8088/test_async
复制代码

image.png
就这么简单,我们通过两个注解就完成了异步编程.

四、源码&原理解析

从前两节的介绍中我们知道,spring利用Aop和动态代理在@Async标注的类生成代理并织入了多线程处理能力,那么接下来我们从源码层面分析一下其实现原理.
开启异步化能力时序图:
image.png
按照时序图从头到尾分析一下,并重点介绍其中涉及的几个类的实现.

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

	Class<? extends Annotation> annotation() default Annotation.class;

	boolean proxyTargetClass() default false;

	AdviceMode mode() default AdviceMode.PROXY;

	int order() default Ordered.LOWEST_PRECEDENCE;
}
复制代码

annotation表示使用异步的注解,默认是@Async和EJB 3.1的@javax.ejb.Asynchronou,当然用户也可以提供自定义注解.
proxyTargetClass表示是否基于需要代理的类创建子类,仅在模式设置为AdviceMode.PROXY时适用,默认是false,需要注意的是将此属性设置为true将影响所有需要代理的Spring托管bean,而不仅仅是标记有@Async的bean。例如,其他标有Spring的@Transactional批注的bean将同时升级为子类代理.
mode表示使用哪种通知模式,默认是AdviceMode.PROXY,需要注意代理模式仅允许通过代理拦截调用,同一类中的本地调用无法以这种方式被拦截;在本地调用中,此类方法上的Async注释将被忽略,因为Spring的拦截器甚至不会在这种运行时场景中起作用.如果需要拦截本地调用或者其他更高级的拦截诉求,可以考虑考虑将其切换为AdviceMode.ASPECTJ.
order代表AsyncAnnotationBeanPostProcessor的顺序,默认值是最低,以便生成代理的时候最贴近代理目标.
最重要的是该注解导入了AsyncConfigurationSelector类,毫无疑问AsyncConfigurationSelector是开启异步能力配置的入口.

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

	private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
			"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
	@Override
	@Nullable
	public String[] selectImports(AdviceMode adviceMode) {
		switch (adviceMode) {
			case PROXY:
				return new String[] {ProxyAsyncConfiguration.class.getName()};
			case ASPECTJ:
				return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
			default:
				return null;
		}
	}
}
复制代码

AsyncConfigurationSelector继承自AdviceModeImportSelector,根据代理模式选择不同的配置,默认我们使用AdviceMode.PROXY,直接看ProxyAsyncConfiguration实现.

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
	@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
		Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
		AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
		bpp.configure(this.executor, this.exceptionHandler);
		Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
		if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
			bpp.setAsyncAnnotationType(customAsyncAnnotation);
		}
		bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
		bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
		return bpp;
	}
}
复制代码

ProxyAsyncConfiguration继承自AbstractAsyncConfiguration,其将@EnableAsync注解属性解析出来备用,并将异步化配置注入进来.

@Autowired(required = false)
void setConfigurers(Collection<AsyncConfigurer> configurers) {
	if (CollectionUtils.isEmpty(configurers)) {
		return;
	}
	if (configurers.size() > 1) {
		throw new IllegalStateException("Only one AsyncConfigurer may exist");
	}
	AsyncConfigurer configurer = configurers.iterator().next();
	this.executor = configurer::getAsyncExecutor;
	this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
}
复制代码

这里可以看出,用户可以实现AsyncConfigurer接口来使用自定义线程池和异常处理器,回到AbstractAsyncConfiguration,创建了一个AsyncAnnotationBeanPostProcessor类型的bean并注入容器,并且把角色定义成基础设施,不向外提供服务,看一下AsyncAnnotationBeanPostProcessor继承关系:
AsyncAnnotationBeanPostProcessor.png
从继承关系来看,这个类有很多身份信息并且拥有很多能力,实现了BeanPostProcessor接口我们暂且将其定义成一个后置处理器,实现了AopInfrastructBean接口将不会被Aop处理,继承了ProxyProcessorSuppor又拥有了代理处理相关能力,实现了BeanFactoryAware拥有了bean管理能力,看一下其代码实现:

public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
	public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME =
			AnnotationAsyncExecutionInterceptor.DEFAULT_TASK_EXECUTOR_BEAN_NAME;
	@Nullable
	private Supplier<Executor> executor;
	@Nullable
	private Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;
	@Nullable
	private Class<? extends Annotation> asyncAnnotationType;
	public AsyncAnnotationBeanPostProcessor() {
		setBeforeExistingAdvisors(true);
	}
	public void configure(
			@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
		this.executor = executor;
		this.exceptionHandler = exceptionHandler;
	}
	public void setExecutor(Executor executor) {
		this.executor = SingletonSupplier.of(executor);
	}
	public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {
		this.exceptionHandler = SingletonSupplier.of(exceptionHandler);
	}
	public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
		Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
		this.asyncAnnotationType = asyncAnnotationType;
	}
	@Override
	public void setBeanFactory(BeanFactory beanFactory) {
		super.setBeanFactory(beanFactory);
		AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
		if (this.asyncAnnotationType != null) {
			advisor.setAsyncAnnotationType(this.asyncAnnotationType);
		}
		advisor.setBeanFactory(beanFactory);
		this.advisor = advisor;
	}
}
复制代码

spring管理的bean初始化过程执行顺序BeanFactoryAware是在后置处理器BeanPostProcessor之前,我们先分析setBeanFactory方法,该方法调用父类实现先把BeanFactory注入进来,然后创建了一个增强器AsyncAnnotationAdvisor(给后置处理器postProcessAfterInitialization方法备用),看一下继承关系:
AsyncAnnotationAdvisor.png
接着看AsyncAnnotationAdvisor构造器:

public AsyncAnnotationAdvisor(
		@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

	Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
	asyncAnnotationTypes.add(Async.class);
	try {
		asyncAnnotationTypes.add((Class<? extends Annotation>)
				ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
	}
	catch (ClassNotFoundException ex) {
		// If EJB 3.1 API not present, simply ignore.
	}
	this.advice = buildAdvice(executor, exceptionHandler);
	this.pointcut = buildPointcut(asyncAnnotationTypes);
}
复制代码

如同我们前边所说,增强器由advice和pointcut组成,这里分别构建了通知和切入点,先看构造通知:

protected Advice buildAdvice(
		@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
	AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
	interceptor.configure(executor, exceptionHandler);
	return interceptor;
}
复制代码

构建通知用的是AnnotationAsyncExecutionInterceptor,看一下继承关系:
AnnotationAsyncExecutionInterceptor.png
本质上是一个MethodInterceptor,执行拦截操作的时候调用invoke方法:

public Object invoke(final MethodInvocation invocation) throws Throwable {
	Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
	Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
	final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

	AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
	if (executor == null) {
		throw new IllegalStateException(
				"No executor specified and no default executor set on AsyncExecutionInterceptor either");
	}
	Callable<Object> task = () -> {
		try {
			Object result = invocation.proceed();
			if (result instanceof Future) {
				return ((Future<?>) result).get();
			}
		}
		catch (ExecutionException ex) {
			handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
		}
		catch (Throwable ex) {
			handleError(ex, userDeclaredMethod, invocation.getArguments());
		}
		return null;
	};
	return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
复制代码

该方法先获取AsyncTaskExecutor异步任务执行器,简单理解为线程池,然后在线程池中执行异步逻辑,继续看determineAsyncExecutor获取线程池:

protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
	Executor targetExecutor;
	String qualifier = getExecutorQualifier(method);
	if (StringUtils.hasLength(qualifier)) {
		targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
	}
	else {
		targetExecutor = this.defaultExecutor.get();
	}
	if (targetExecutor == null) {
		return null;
	}
	executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
			(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
	this.executors.put(method, executor);
}
return executor;
}
复制代码

先从缓存中获取,如果获取到直接返回,否则如果@Async注解有指定线程池就根据名字获取,否则获取默认线程池.
接着看线程池提交异步操作doSubmit:

protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
	if (CompletableFuture.class.isAssignableFrom(returnType)) {
		return CompletableFuture.supplyAsync(() -> {
			try {
				return task.call();
			}
			catch (Throwable ex) {
				throw new CompletionException(ex);
			}
		}, executor);
	}
	else if (ListenableFuture.class.isAssignableFrom(returnType)) {
		return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
	}
	else if (Future.class.isAssignableFrom(returnType)) {
		return executor.submit(task);
	}
	else {
		executor.submit(task);
		return null;
	}
}
复制代码

可以看出支持异步方法返回结果为CompletableFuture、ListenableFuture和Future的有返回值的操作,其他返回类型或者返回类型为void都当做无返回值异步提交.
回到前边构造切入点操作:

protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
	ComposablePointcut result = null;
	for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
		Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
		Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
		if (result == null) {
			result = new ComposablePointcut(cpc);
		}
		else {
			result.union(cpc);
		}
		result = result.union(mpc);
	}
	return (result != null ? result : Pointcut.TRUE);
}
复制代码

方法中构造了两个AnnotationMatchingPointcut,一个匹配方法切入点,另一个是匹配类切入点,然后做了union操作构造了一个ComposablePointcut混合切入点,只要满足类或者方法上带有@Async注解都符合切入规则,这个切入点在AsyncAnnotationBeanPostProcessor后置处理器构造代理类会用到.

前边分析了setBeanFactory构造增强器的操作,我们继续分析后置处理器的postProcessAfterInitialization操作,先看代码实现:

public Object postProcessAfterInitialization(Object bean, String beanName) {
	if (this.advisor == null || bean instanceof AopInfrastructureBean) {
		// Ignore AOP infrastructure such as scoped proxies.
		return bean;
	}
	if (bean instanceof Advised) {
		Advised advised = (Advised) bean;
		if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
			// Add our local Advisor to the existing proxy's Advisor chain...
			if (this.beforeExistingAdvisors) {
				advised.addAdvisor(0, this.advisor);
			}
			else {
				advised.addAdvisor(this.advisor);
			}
			return bean;
		}
	}
	if (isEligible(bean, beanName)) {
		ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
		if (!proxyFactory.isProxyTargetClass()) {
			evaluateProxyInterfaces(bean.getClass(), proxyFactory);
		}
		proxyFactory.addAdvisor(this.advisor);
		customizeProxyFactory(proxyFactory);
		return proxyFactory.getProxy(getProxyClassLoader());
	}
	// No proxy needed.
	return bean;
}
复制代码

如果增强器为null或者目标bean是AopInfrastructureBean基础组件类型直接放过,如果bean是待通知对象切满足该Advisor的通知条件,直接将该增强器添加到待通知对象的增强器列表中,否则如果目标bean满足该增强器的切入条件,利用动态代理生成代理类并将该Advisor添加到其增强器列表返回.
这段代码是动态代理生成代理类并织入通知逻辑的核心点,我们主要分析isEligible和生成代理的逻辑,先分析是否满足切入逻辑的方法isEligible:

protected boolean isEligible(Class<?> targetClass) {
	Boolean eligible = this.eligibleBeans.get(targetClass);
	if (eligible != null) {
		return eligible;
	}
	if (this.advisor == null) {
		return false;
	}
	eligible = AopUtils.canApply(this.advisor, targetClass);
	this.eligibleBeans.put(targetClass, eligible);
	return eligible;
}
复制代码

先从缓存中获取改bean是否有被增强的资格,如果已被缓存直接返回缓存结果,否则如果增强器为null,则返回无资格,最后调用AopUtils.canApply检查目标类是否满足Advisor切入的规则,继续看AopUtils.canApply实现:

public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {
	if (advisor instanceof IntroductionAdvisor) {
		return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);
	}
	else if (advisor instanceof PointcutAdvisor) {
		PointcutAdvisor pca = (PointcutAdvisor) advisor;
		return canApply(pca.getPointcut(), targetClass, hasIntroductions);
	}
	else {
		// It doesn't have a pointcut so we assume it applies.
		return true;
	}
}
复制代码

根据Advisor的类型检查目标类是否满足切入资格,和明显前边AsyncAnnotationBeanPostProcessor构造的是PointcutAdvisor类型的增强器,继续看canApply实现:

public static boolean canApply(Pointcut pc, Class<?> targetClass, boolean hasIntroductions) {
	Assert.notNull(pc, "Pointcut must not be null");
	if (!pc.getClassFilter().matches(targetClass)) {
		return false;
	}
	MethodMatcher methodMatcher = pc.getMethodMatcher();
	if (methodMatcher == MethodMatcher.TRUE) {
		// No need to iterate the methods if we're matching any method anyway...
		return true;
	}
	IntroductionAwareMethodMatcher introductionAwareMethodMatcher = null;
	if (methodMatcher instanceof IntroductionAwareMethodMatcher) {
		introductionAwareMethodMatcher = (IntroductionAwareMethodMatcher) methodMatcher;
	}
	Set<Class<?>> classes = new LinkedHashSet<>();
	if (!Proxy.isProxyClass(targetClass)) {
		classes.add(ClassUtils.getUserClass(targetClass));
	}
	classes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetClass));
	for (Class<?> clazz : classes) {
		Method[] methods = ReflectionUtils.getAllDeclaredMethods(clazz);
		for (Method method : methods) {
			if (introductionAwareMethodMatcher != null ?
					introductionAwareMethodMatcher.matches(method, targetClass, hasIntroductions) :
					methodMatcher.matches(method, targetClass)) {
				return true;
			}
		}
	}
	return false;
}
复制代码

其实说的直白点,就是检查目标类上或者方法上是否有@Async注解,如果有就返回满足切入规则,否则返回不符合切入规则.
回到前边后置处理器postProcessAfterInitialization方法,如果目标bean满足切入规则,则使用代理工厂ProxyFactory生成代理对象并返回:

if (isEligible(bean, beanName)) {
	ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
	if (!proxyFactory.isProxyTargetClass()) {
		evaluateProxyInterfaces(bean.getClass(), proxyFactory);
	}
	proxyFactory.addAdvisor(this.advisor);
	customizeProxyFactory(proxyFactory);
	return proxyFactory.getProxy(getProxyClassLoader());
}
复制代码

先生成代理工厂,然后检查给定bean类上的接口,并将它们应用于ProxyFactory(如果适用,否则退化成直接代理目标类),将增强器添加到代理工厂中,最后由代理工厂生成代理对象,接着看生成代理类的实现:

public AopProxy createAopProxy(AdvisedSupport config) throws AopConfigException {
	if (config.isOptimize() || config.isProxyTargetClass() || hasNoUserSuppliedProxyInterfaces(config)) {
		Class<?> targetClass = config.getTargetClass();
		if (targetClass == null) {
			throw new AopConfigException("TargetSource cannot determine target class: " +
					"Either an interface or a target is required for proxy creation.");
		}
		if (targetClass.isInterface() || Proxy.isProxyClass(targetClass)) {
			return new JdkDynamicAopProxy(config);
		}
		return new ObjenesisCglibAopProxy(config);
	}
	else {
		return new JdkDynamicAopProxy(config);
	}
}
复制代码

先创建Aop代理,如果目标类是接口或者目标类是代理类,使用jdk动态代理,否则使用cglib动态代理,两种代理这里不展开细讲,简单分析一下其构造代理类的原理,先看JdkDynamicAopProxy:

public Object getProxy(@Nullable ClassLoader classLoader) {
	if (logger.isTraceEnabled()) {
		logger.trace("Creating JDK dynamic proxy: " + this.advised.getTargetSource());
	}
	Class<?>[] proxiedInterfaces = AopProxyUtils.completeProxiedInterfaces(this.advised, true);
	findDefinedEqualsAndHashCodeMethods(proxiedInterfaces);
	return Proxy.newProxyInstance(classLoader, proxiedInterfaces, this);
}
复制代码

到这里我们看到了熟悉的jdk动态代理实现Proxy.newProxyInstance,寻找需要代理的接口,然后生成接口的动态代理对象,这里需要注意一下,JdkDynamicAopProxy实现了InvocationHandler接口,JDK动态代理会在内存中生成一个类名为Proxy0形式的代理类,调用Proxy0形式的代理类,调用Proxy0方法,内部调用父类Proxy.InvocationHandler.invoke方法,也就是JdkDynamicAopProxy实现InvocationHandler接口的invoke方法:

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
	MethodInvocation invocation;
	Object oldProxy = null;
	boolean setProxyContext = false;
	TargetSource targetSource = this.advised.targetSource;
	Object target = null;
	try {
		if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) {
			return equals(args[0]);
		}
		else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) {
			return hashCode();
		}
		else if (method.getDeclaringClass() == DecoratingProxy.class) {
			return AopProxyUtils.ultimateTargetClass(this.advised);
		}
		else if (!this.advised.opaque && method.getDeclaringClass().isInterface() &&
				method.getDeclaringClass().isAssignableFrom(Advised.class)) {
			return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args);
		}
		Object retVal;
		if (this.advised.exposeProxy) {
			oldProxy = AopContext.setCurrentProxy(proxy);
			setProxyContext = true;
		}
		target = targetSource.getTarget();
		Class<?> targetClass = (target != null ? target.getClass() : null);
		List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
		if (chain.isEmpty()) {
			Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
			retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
		}
		else {
			invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
			retVal = invocation.proceed();
		}
		Class<?> returnType = method.getReturnType();
		if (retVal != null && retVal == target &&
				returnType != Object.class && returnType.isInstance(proxy) &&
				!RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) {
			retVal = proxy;
		}
		else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) {
			throw new AopInvocationException(
					"Null return value from advice does not match primitive return type for: " + method);
		}
		return retVal;
	}
	finally {
		if (target != null && !targetSource.isStatic()) {
			targetSource.releaseTarget(target);
		}
		if (setProxyContext) {
			AopContext.setCurrentProxy(oldProxy);
		}
	}
}

复制代码

先取出被织入的拦截逻辑,本篇中就是AnnotationAsyncExecutionInterceptor,然后指定方法调用,也就是代理类的调用,本质上就是先调用增强逻辑和最原始被代理类的方法的调用.
然后我们再看一下cglib动态代理实现CglibAopProxy:

public Object getProxy(@Nullable ClassLoader classLoader) {
	try {
		Class<?> rootClass = this.advised.getTargetClass();
		Assert.state(rootClass != null, "Target class must be available for creating a CGLIB proxy");
		Class<?> proxySuperClass = rootClass;
		if (ClassUtils.isCglibProxyClass(rootClass)) {
			proxySuperClass = rootClass.getSuperclass();
			Class<?>[] additionalInterfaces = rootClass.getInterfaces();
			for (Class<?> additionalInterface : additionalInterfaces) {
				this.advised.addInterface(additionalInterface);
			}
		}
		validateClassIfNecessary(proxySuperClass, classLoader);
		Enhancer enhancer = createEnhancer();
		if (classLoader != null) {
			enhancer.setClassLoader(classLoader);
			if (classLoader instanceof SmartClassLoader &&
					((SmartClassLoader) classLoader).isClassReloadable(proxySuperClass)) {
				enhancer.setUseCache(false);
			}
		}
		enhancer.setSuperclass(proxySuperClass);
		enhancer.setInterfaces(AopProxyUtils.completeProxiedInterfaces(this.advised));
		enhancer.setNamingPolicy(SpringNamingPolicy.INSTANCE);
		enhancer.setStrategy(new ClassLoaderAwareUndeclaredThrowableStrategy(classLoader));
		Callback[] callbacks = getCallbacks(rootClass);
		Class<?>[] types = new Class<?>[callbacks.length];
		for (int x = 0; x < types.length; x++) {
			types[x] = callbacks[x].getClass();
		}
		enhancer.setCallbackFilter(new ProxyCallbackFilter(
				this.advised.getConfigurationOnlyCopy(), this.fixedInterceptorMap, this.fixedInterceptorOffset));
		enhancer.setCallbackTypes(types);
		return createProxyClassAndInstance(enhancer, callbacks);
	}
	catch (CodeGenerationException | IllegalArgumentException ex) {
		throw new AopConfigException("Could not generate CGLIB subclass of " + this.advised.getTargetClass() +
				": Common causes of this problem include using a final class or a non-visible class",
				ex);
	}
	catch (Throwable ex) {
		// TargetSource.getTarget() failed
		throw new AopConfigException("Unexpected AOP exception", ex);
	}
}
复制代码

我们也看到了熟悉的cglib动态代理实现Enhancer,CGLB动态代理会在内存生成一个类名为?EnhancerByCGLIB?b3361405形式的代理类,调用xxx?EnhancerByCGLIB?b3361405代理类方法,内部调用MethodInterceptor.intercept(),看一下getCallbacks方法,也即是将被代理类的拦截调用装配成MethodInterceptor的逻辑:

private Callback[] getCallbacks(Class<?> rootClass) throws Exception {
	boolean exposeProxy = this.advised.isExposeProxy();
	boolean isFrozen = this.advised.isFrozen();
	boolean isStatic = this.advised.getTargetSource().isStatic();
	Callback aopInterceptor = new DynamicAdvisedInterceptor(this.advised);
	Callback targetInterceptor;
	if (exposeProxy) {
		targetInterceptor = (isStatic ?
				new StaticUnadvisedExposedInterceptor(this.advised.getTargetSource().getTarget()) :
				new DynamicUnadvisedExposedInterceptor(this.advised.getTargetSource()));
	}
	else {
		targetInterceptor = (isStatic ?
				new StaticUnadvisedInterceptor(this.advised.getTargetSource().getTarget()) :
				new DynamicUnadvisedInterceptor(this.advised.getTargetSource()));
	}
	Callback targetDispatcher = (isStatic ?
			new StaticDispatcher(this.advised.getTargetSource().getTarget()) : new SerializableNoOp());
	Callback[] mainCallbacks = new Callback[] {
			aopInterceptor,  // for normal advice
			targetInterceptor,  // invoke target without considering advice, if optimized
			new SerializableNoOp(),  // no override for methods mapped to this
			targetDispatcher, this.advisedDispatcher,
			new EqualsInterceptor(this.advised),
			new HashCodeInterceptor(this.advised)
	};
	Callback[] callbacks;
	if (isStatic && isFrozen) {
		Method[] methods = rootClass.getMethods();
		Callback[] fixedCallbacks = new Callback[methods.length];
		this.fixedInterceptorMap = new HashMap<>(methods.length);

		// TODO: small memory optimization here (can skip creation for methods with no advice)
		for (int x = 0; x < methods.length; x++) {
			List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(methods[x], rootClass);
			fixedCallbacks[x] = new FixedChainStaticTargetInterceptor(
					chain, this.advised.getTargetSource().getTarget(), this.advised.getTargetClass());
			this.fixedInterceptorMap.put(methods[x].toString(), x);
		}
		callbacks = new Callback[mainCallbacks.length + fixedCallbacks.length];
		System.arraycopy(mainCallbacks, 0, callbacks, 0, mainCallbacks.length);
		System.arraycopy(fixedCallbacks, 0, callbacks, mainCallbacks.length, fixedCallbacks.length);
		this.fixedInterceptorOffset = mainCallbacks.length;
	}
	else {
		callbacks = mainCallbacks;
	}
	return callbacks;
}
复制代码

在此篇幅异步编程场景,调用代理类方法会直接调用到DynamicAdvisedInterceptor的intercept:

public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
	Object oldProxy = null;
	boolean setProxyContext = false;
	Object target = null;
	TargetSource targetSource = this.advised.getTargetSource();
	try {
		if (this.advised.exposeProxy) {
			// Make invocation available if necessary.
			oldProxy = AopContext.setCurrentProxy(proxy);
			setProxyContext = true;
		}
		target = targetSource.getTarget();
		Class<?> targetClass = (target != null ? target.getClass() : null);
		List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
		Object retVal;
		if (chain.isEmpty() && Modifier.isPublic(method.getModifiers())) {
			Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
			retVal = methodProxy.invoke(target, argsToUse);
		}
		else {
			// We need to create a method invocation...
			retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();
		}
		retVal = processReturnType(proxy, target, method, retVal);
		return retVal;
	}
	finally {
		if (target != null && !targetSource.isStatic()) {
			targetSource.releaseTarget(target);
		}
		if (setProxyContext) {
			// Restore old proxy.
			AopContext.setCurrentProxy(oldProxy);
		}
	}
}
复制代码

先获取代理类对应方法的拦截器链,如果没有拦截器链且方法是public类型,直接调用代理方法返回,否则将方法连同拦截器链构造成CglibMethodInvocation并执行.
在JdkDynamicAopProxy和CglibAopProxy生成的代理类执行的过程成都会调用到前边所说的AnnotationAsyncExecutionInterceptor类的invoke方法,也即是异步执行的逻辑.
jdk动态代理异步执行时序图:
image.png
Cglib代理异步执行时序图:
image.png

五、总结

从本篇第三节异步编程使用方式来看,spring异步编程接入特别简单,但是从第四节的原理和源码解析来看,其实现也挺复杂的,这就是spring的强大之处,把困难留给自己,把便利留给使用者,把一些复杂的实现对用户做到透明化.
从spring异步编程的源码来看,其使用了很多技术和功能点:

  • 导入配置:AsyncConfigurationSelector
  • 后置处理器:AsyncAnnotationBeanPostProcessor
  • Aop编程:AsyncAnnotationAdvisor
  • 线程池:AsyncTaskExecutor
  • 拦截器: AnnotationAsyncExecutionInterceptor
  • 切入点: ComposablePointcut/AnnotationMatchingPointcut
  • 工厂模式: BeanFactory和ProxyFactory
  • 动态代理: JdkDynamicAopProxy和CglibAopProxy
  • 代理类调用委托处理: jdk动态代理委托给JdkDynamicAopProxy.invoke,cglib动态代理类委托给DynamicAdvisedInterceptor.intercept

由于篇幅问题,中间还有很多细节没覆盖到,比如说获取线程池的逻辑设计也比较巧妙,感兴趣的也可以深入研究一下:

protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
	if (beanFactory != null) {
		try {
			return beanFactory.getBean(TaskExecutor.class);
		}
		catch (NoUniqueBeanDefinitionException ex) {
			logger.debug("Could not find unique TaskExecutor bean", ex);
			try {
				return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
			}
			catch (NoSuchBeanDefinitionException ex2) {
			}
		}
		catch (NoSuchBeanDefinitionException ex) {
			logger.debug("Could not find default TaskExecutor bean", ex);
			try {
				return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
			}
			catch (NoSuchBeanDefinitionException ex2) {
			}
		}
	}
	return null;
}
复制代码
spring异步的使用主要记住两个点,2个注解和一个返回值,在启动类或者配置使用@EnableAsync开启异步,在需要异步调用的方法上添加@Async注解,异步支持的返回类型有CompletableFuture、ListenableFuture和Future和void.
复制代码
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享