Fegin的底层调用链~

什么是Fegin

Feign是一个声明式WebService客户端。使用Feign能让编写Web Service客户端更加简单, 它的使用方法是定义一个接口,然后在上面添加注解,同时也支持JAX-RS标准的注解。Feign也支持可拔插式的编码器和解码器。Spring Cloud对Feign进行了封装,使其支持了Spring MVC标准注解和HttpMessageConverters。Feign可以与Eureka和Ribbon组合使用以支持负载均衡。

本文主要讲述了SpringCloud中Fegin的具体调用链路,以及底层实现。

Feign的初始化

@EnableFeignClients 注解

如果想要fegin生效就需要添加@EnableFeignClients注解,首先看一下EnableFeignClients:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(FeignClientsRegistrar.class)
public @interface EnableFeignClients {
    //可配置省略
    ...
}
复制代码

看到了熟悉的注解@Import,查看一下FeignClientsRegistrar类

FeignClientsRegistrar.java 注册beanDefinition

主要的方法入口:

FeignClientsRegistrar#registerBeanDefinitions

@Override
public void registerBeanDefinitions(AnnotationMetadata metadata,
                BeanDefinitionRegistry registry) {
    //通过EnableFeginClients的配置构建Configuration
    registerDefaultConfiguration(metadata, registry);
    //注册FeignClient
    registerFeignClients(metadata, registry);
}
复制代码

FeignClientsRegistrar#registerDefaultConfiguration

private void registerDefaultConfiguration(AnnotationMetadata metadata,
			BeanDefinitionRegistry registry) {
    //获取注解配置                
    Map<String, Object> defaultAttrs = metadata
				.getAnnotationAttributes(EnableFeignClients.class.getName(), true);
                               
    if (defaultAttrs != null && defaultAttrs.containsKey("defaultConfiguration")) {
        String name;
        //获取是否有外部类
        if (metadata.hasEnclosingClass()) {
            name = "default." + metadata.getEnclosingClassName();
        }
        else {
            name = "default." + metadata.getClassName();
        }
        //注册Configuration对象
        registerClientConfiguration(registry, name,
                        defaultAttrs.get("defaultConfiguration"));
    }
}
复制代码

FeignClientsRegistrar#registerFeignClients

主要看怎么注册FeignClient的

public void registerFeignClients(AnnotationMetadata metadata,
			BeanDefinitionRegistry registry) {
    //scanner见名知意,获取一个扫描器
    ClassPathScanningCandidateComponentProvider scanner = getScanner();
    //设置资源加载器
    scanner.setResourceLoader(this.resourceLoader);
    
    //要扫描的包
    Set<String> basePackages;
    //注解的配置
    Map<String, Object> attrs = metadata
                    .getAnnotationAttributes(EnableFeignClients.class.getName());
    //注解类型过滤器,匹配设置的类型
    AnnotationTypeFilter annotationTypeFilter = new AnnotationTypeFilter(
                    FeignClient.class);
    final Class<?>[] clients = attrs == null ? null
                    : (Class<?>[]) attrs.get("clients");
    //没配置clients属性,设置过滤器,以及扫包路径 
    if (clients == null || clients.length == 0) {
        scanner.addIncludeFilter(annotationTypeFilter);
        basePackages = getBasePackages(metadata);
    }
    //配置了clients属性
    else {
        ...
    }
    //包路径
    for (String basePackage : basePackages) {
        //扫描所有的符合过滤器的BeanDefinition对象
        Set<BeanDefinition> candidateComponents = scanner
                        .findCandidateComponents(basePackage);
        for (BeanDefinition candidateComponent : candidateComponents) {
            //是否属于注解bean定义
            if (candidateComponent instanceof AnnotatedBeanDefinition) {
                // 验证是一个接口类型
                AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
                AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
                Assert.isTrue(annotationMetadata.isInterface(),
                                "@FeignClient can only be specified on an interface");
                //获取@FeignClient的配置
                Map<String, Object> attributes = annotationMetadata
                                .getAnnotationAttributes(
                                                FeignClient.class.getCanonicalName());

                String name = getClientName(attributes);
                //对单独FeignClient的configuration注册
                registerClientConfiguration(registry, name,
                                attributes.get("configuration"));
                //注册FeiginClien,其实是注册的FactoryBean
                registerFeignClient(registry, annotationMetadata, attributes);
            }
        }
    }
}
复制代码
FeignClientsRegistrar#registerFeignClient

这里是注册FeignClientFactoryBean到IOC容器中的步骤,可看到如下如下解析了FeignClient的配置设置到BeanDefinitionBuilder中,然后生成AbstractBeanDefinition,最后生成BeanDefinitionHolder,进行BeanDefinition的注册。BeanDefinitionHolder实则是一个钩子类,用来记录类名对应的BeanDefinition等信息

private void registerFeignClient(BeanDefinitionRegistry registry,
			AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
    String className = annotationMetadata.getClassName();
    BeanDefinitionBuilder definition = BeanDefinitionBuilder
                    .genericBeanDefinition(FeignClientFactoryBean.class);
    validate(attributes);
    definition.addPropertyValue("url", getUrl(attributes));
    definition.addPropertyValue("path", getPath(attributes));
    String name = getName(attributes);
    definition.addPropertyValue("name", name);
    definition.addPropertyValue("type", className);
    definition.addPropertyValue("decode404", attributes.get("decode404"));
    definition.addPropertyValue("fallback", attributes.get("fallback"));
    definition.addPropertyValue("fallbackFactory", attributes.get("fallbackFactory"));
    definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);

    String alias = name + "FeignClient";
    AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();

    boolean primary = (Boolean)attributes.get("primary"); // has a default, won't be null

    beanDefinition.setPrimary(primary);

    String qualifier = getQualifier(attributes);
    if (StringUtils.hasText(qualifier)) {
            alias = qualifier;
    }

    BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className,
                    new String[] { alias });
    BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
}
复制代码

到这里EnableFeignClients注解做的事情就完成,简单的说就是扫描所有包,然后把所有@FeignClient的类进行FactoryBean的注册,然后在创建bean的时候进行getObject方法获得对应的类。

FeignClientFactoryBean.java 生成bean对象

接下来看FeignClientFactoryBean的getObject方法,调用了getTarget方法

FeignClientFactoryBean#getObject

@Override
public Object getObject() throws Exception {
    return getTarget();
}
复制代码

FeignClientFactoryBean#getTarget

进入到getTarget方法,只看未配置URL的情况

<T> T getTarget() {
    //获取feign上下文
    FeignContext context = applicationContext.getBean(FeignContext.class);
    //创建Builder对象,主要包含了一些协议操作,以及拦截器
    Feign.Builder builder = feign(context);
    //如果FeignClient没有配置指定url则走这边
    if (!StringUtils.hasText(this.url)) {
        String url;
        //如果没有配置,且name不是http开头 则拼上协议
        if (!this.name.startsWith("http")) {
            url = "http://" + this.name;
        }
        else {
            url = this.name;
        }
        url += cleanPath();
        //这边是负载均衡之后的对象
        return (T) loadBalance(builder, context, new HardCodedTarget<>(this.type,
                        this.name, url));
    }
    ...
}
复制代码

FeignClientFactoryBean#loadBalance

接下来看loadBalance方法做了什么事

protected <T> T loadBalance(Feign.Builder builder, FeignContext context,
			HardCodedTarget<T> target) {
    //这边得到的是一个LoadBalanceFeignClient对象                    
    Client client = getOptional(context, Client.class);
    if (client != null) {
        //包装client
        builder.client(client);
        Targeter targeter = get(context, Targeter.class);
        //主要看这个方法
        return targeter.target(this, builder, context, target);
    }
}
复制代码

HystrixTargeter#target

看到target方法,主要是feign.target方法

public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign, FeignContext context,
						Target.HardCodedTarget<T> target) {
    if (!(feign instanceof feign.hystrix.HystrixFeign.Builder)) {
        return feign.target(target);
    }	
    ...
}
复制代码

feign.target可以看到实例化了对象,所以重点看build()干了什么

public <T> T target(Target<T> target) {
  return build().newInstance(target);
}
复制代码

Feign.build主要是初始化SynchronousMethodHandler.Factory以及生成了ReflectiveFeign对象,进入到ReflectiveFeign的newInstance方法。

public Feign build() {
  SynchronousMethodHandler.Factory synchronousMethodHandlerFactory =
      new SynchronousMethodHandler.Factory(client, retryer, requestInterceptors, logger,
                                           logLevel, decode404, closeAfterDecode);
  ParseHandlersByName handlersByName =
      new ParseHandlersByName(contract, options, encoder, decoder, queryMapEncoder,
                              errorDecoder, synchronousMethodHandlerFactory);
  return new ReflectiveFeign(handlersByName, invocationHandlerFactory, queryMapEncoder);
}
复制代码

ReflectiveFeign#newInstance

public <T> T newInstance(Target<T> target) {
    //方法-对应的增强处理 具体看下图
    Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
    Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
    List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();
    //记录
    for (Method method : target.type().getMethods()) {
      ...
    }
    //生成拦截器 很熟悉 JDK动态代理必备!
    InvocationHandler handler = factory.create(target, methodToHandler);
    //生成jdk代理对象
    T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(), new Class<?>[]{target.type()}, handler);

    for(DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
      defaultMethodHandler.bindTo(proxy);
    }
    return proxy;
  }
复制代码

如下图,nameToHandler的存储
image.png
在看生成的invocationHandler,接下来只需要关心invocationHandler中如何处理的就行。
image.png

Fegin的调用

ReflectiveFeign#FeignInvocationHandler#invoke方法

此处的dispatch其实是刚刚传进来的methodToHandler,而这methodToHandler记录了method以及对应的处理类,也就是SynchronousMethodHandler

 @Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  ...
  return dispatch.get(method).invoke(args);
}
复制代码

所以接下来进入SynchronousMethodHandler的invoke方法

SynchronousMethodHandler#invoke

可以看到这边用while做重试,然后看到try catch,正常业务代码都是写在try中,所以直接进入executeAndDecode方法

 public Object invoke(Object[] argv) throws Throwable {
    RequestTemplate template = buildTemplateFromArgs.create(argv);
    Retryer retryer = this.retryer.clone();
    while (true) {
      try {
        return executeAndDecode(template);
      } catch (RetryableException e) {
        retryer.continueOrPropagate(e);
        if (logLevel != Logger.Level.NONE) {
          logger.logRetry(metadata.configKey(), logLevel);
        }
        continue;
      }
    }
  }
复制代码

SynchronousMethodHandler#executeAndDecode

直接看重点try中的execute方法,这里的client是LoadBalanceFeignClient,所以进入LoadBalanceFeignClient

 Object executeAndDecode(RequestTemplate template) throws Throwable {
    Request request = targetRequest(template);
    ...
    Response response;
    long start = System.nanoTime();
    try {
      response = client.execute(request, options);
      // ensure the request is set. TODO: remove in Feign 10
      response.toBuilder().request(request).build();
    } catch (IOException e) {
      ...
    }
    ...
  }
复制代码

LoadBalancerFeignClient#execute

这边可以看到熟悉的lbClient以及ribbon相关的,就是在做负载均衡

@Override
public Response execute(Request request, Request.Options options) throws IOException {
    try {
        URI asUri = URI.create(request.url());
        String clientName = asUri.getHost();
        URI uriWithoutHost = cleanUrl(request.url(), clientName);
        FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
                        this.delegate, request, uriWithoutHost);

        IClientConfig requestConfig = getClientConfig(options, clientName);
        return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
                        requestConfig).toResponse();
    }
    catch (ClientException e) {
        ...
    }
}
复制代码

接下来看AbstractLoadBalancerAwareClient的executeWithLoadBalancer方法

AbstractLoadBalancerAwareClient#executeWithLoadBalancer

LoadBalancerCommand的submit方法传入了一个ServerOperation对象,直接进入submit方法

 public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

    try {
        return command.submit(
            new ServerOperation<T>() {
                @Override
                public Observable<T> call(Server server) {
                    URI finalUri = reconstructURIWithServer(server, request.getUri());
                    S requestForServer = (S) request.replaceUri(finalUri);
                    try {
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    } 
                    catch (Exception e) {
                        return Observable.error(e);
                    }
                }
            })
            .toBlocking()
            .single();
    } catch (Exception e) {
       ...
    }
复制代码

LoadBalancerCommand#submit

我们只需要关注关键步骤即可

final ExecutionInfoContext context = new ExecutionInfoContext();
    ...

    // Use the load balancer
    Observable<T> o = 
            (server == null ? selectServer() : Observable.just(server))
            ...
        return operation.call(server).doOnEach(new Observer<T>() {
            private T entity;
            @Override
            public void onCompleted() {
                recordStats(tracer, stats, entity, null);
                // TODO: What to do if onNext or onError are never called?
            }

            @Override
            public void onError(Throwable e) {
                recordStats(tracer, stats, null, e);
                logger.debug("Got error {} when executed on server {}", e, server);
                if (listenerInvoker != null) {
                    listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                }
            }

            @Override
            public void onNext(T entity) {
                this.entity = entity;
                if (listenerInvoker != null) {
                    listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                }
            }                            

            private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                tracer.stop();
                loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
            }
        });
    }
});

    ...
    return ...
}
复制代码

1.先看如何选择server的,如下代码,进入selectServer()方法

server == null ? selectServer() : Observable.just(server)
复制代码

LoadBalancerCommand#selectServer

直接定位获取server的方法getServerFromLoadBalancer

 private Observable<Server> selectServer() {
    return Observable.create(new OnSubscribe<Server>() {
        @Override
        public void call(Subscriber<? super Server> next) {
            try {
                Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                next.onNext(server);
                next.onCompleted();
            } catch (Exception e) {
                next.onError(e);
            }
        }
    });
}
复制代码

LoadBalancerContext#getServerFromLoadBalancer

此处看到了熟悉的getLoadBalancer方法以及chooseServer方法

 public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
        //获取负载均衡器
        ILoadBalancer lb = getLoadBalancer();
        if (host == null) { 
            //如果负载均衡器不为空则选择服务器
            if (lb != null){
                Server svc = lb.chooseServer(loadBalancerKey);
        ...
        return new Server(host, port);
    }
复制代码

具体的getLoadBalancer以及chooseServer这里就不解析了,因为之前Ribbon已经解析过了,有兴趣可以去看一下。

得到Server之后就会执行operation.call方法,也就是刚刚传进来的ServerOperation对象的call方法。这就是请求了服务。

 try {
    return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
} 
复制代码

实际上是调用了Client.java的execute方法,Client.java有一个默认实现Deafult

public static class Default implements Client

可以看到其实是HttpUrlConnection发送了请求

@Override
public Response execute(Request request, Options options) throws IOException {
  HttpURLConnection connection = convertAndSend(request, options);
  return convertResponse(connection).toBuilder().request(request).build();
}
复制代码

以上就是Feign的初始化以及调用流程啦!如果有什么问题请大佬们指正!

Ribbon传送门

[什么是Ribbon现在就带你研究!] juejin.cn/post/696426…

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享