前文已经介绍过了应用级发现模型、元数据信息。来到 Dubbo-go 应用级服务发现的最后一节,本节以围绕应用信息与元数据信息为核心,通过源码分析它们如何组合成 Invoker 。
在使用 Consumer 内建元数据中心时,元数据中心相对静态。而目前使用的是在 RegistryURL 上的信息,用于构建 Invoker。那究竟是怎么构建、在哪里构建呢?关于构建 Invoker ,还是在之前文章中提到的 Refer 方法(样例 1),前面已经分析了两部分,接下来把第三部分进行分析。
`func (proto *registryProtocol) Refer(url *common.URL) protocol.Invoker {`
`....`
`// 获取用于调用的实例及方法列表。`
`directory, err := extension.GetDefaultRegistryDirectory(registryUrl, reg)`
`if err != nil {`
`logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!",`
`serviceUrl.String(), err.Error())`
`return nil`
`}`
`....`
`}`
复制代码
样例 1
在分析第三部分做什么之前,我们先要知道其返回值 RegistryDirectory 是什么。RegistryDirectory 是继承自 Directory 目录接口(样例 2),对于 Consumer 用于展示注册中心中存在且可被调用的远程服务。RegistryDirectory 代表多个Invoker,可以把它看成List,但与List不同的是,它的值可能是动态变化的,比如注册中心推送变更。而对于 Provider 则用于把 Invoker 信息结构化注册到注册中心。
`type Directory interface {`
`common.Node`
`List(invocation protocol.Invocation) []protocol.Invoker`
`}`
复制代码
样例 2
而 RegistryDirectory(样例 3) 包含以下两个功能,并用获得的应用/元数据信息,更新 Directory 中 Invoker 信息。
-
监听远程配置中心
-
监听远程注册中心
同时客户端订阅的一个服务,可能有多个服务端提供,所以需要一个目录服务。当有新 Provider 提供服务时,则通知当前目录服务,将从注册中心获得的 Provider URL 转成 invoker,放入缓存。
`type RegistryDirectory struct {`
`directory.BaseDirectory`
`cacheInvokers []protocol.Invoker`
`listenerLock sync.Mutex`
`serviceType string`
`registry registry.Registry`
`cacheInvokersMap *sync.Map // use sync.map`
`consumerURL *common.URL`
`cacheOriginUrl *common.URL`
`configurators []config_center.Configurator`
`consumerConfigurationListener *consumerConfigurationListener <<< consumer 配置监听`
`referenceConfigurationListener *referenceConfigurationListener <<< provider 配置监听`
`serviceKey string`
`forbidden atomic.Bool`
`}`
`// 注册中心更新事件`
`func (dir *RegistryDirectory) Notify(events ...*registry.ServiceEvent) {`
`go dir.refreshInvokers(events...)`
`}`
复制代码
样例 3
那第三部分是做什么的呢?其最主要的功能是:通过订阅配置中的注册中心,获取调用的实例及方法列表,并构造可调用的 Invoker。接下来就深入 extension.GetDefaultRegistryDirectory(registryUrl, reg)
内部(样例 4)看看究竟其做了什么操作。首先,需要先找到方法的入口。在样例 4 中,得知,是使用 defaultRegistry() 来进行后续的操作。而 defaultRegistry 是一个私有方法类型的变量,所以,直接找调用设置值的地方,得知 defaultRegistry=NewRegistryDirectory
。本质上来说,后续的处理过程都在:NewRegistryDirectory
。
`func GetDefaultRegistryDirectory(config *common.URL, registry registry.Registry) (cluster.Directory, error) {`
`if defaultRegistry == nil {`
`panic("registry directory is not existing, make sure you have import the package.")`
`}`
`return defaultRegistry(config, registry)`
`}`
复制代码
样例 4
在 NewRegistryDirectory
中,会做以下最主要的几件事:
-
通过配置信息(URL)获取需要注册的路由链:chain.NewRouterChain(dir.consumerURL);
-
构造 Consumer 远程配置中心的监听器:newConsumerConfigurationListener(dir)
-
订阅注册中心,并注册更新监听器:go dir.subscribe(url.SubURL)
在注册中心被订阅后,会获得注册中心的应用信息,并将其与本地的元数据信息。结合之后,则会异步写回 RegistryDirectory.cacheInvokers
中,用于后续 RPC。
对于上面 NewRegistryDirectory
所做的几件事,在这里就不详细一一分析。在接下来的文章中,会进行更深入的分析。
欢迎加入社区
在公众号【部长技术之路】后台回复关键字【dubbogo】加入社区。