从go-micro的broker开始学习go-micro

go-micro简介

我们在之前的文章中提到的Micro其实是Micro这个微服务框架的工具集,其核心还是go-micro.
Go Micro是一个分布式系统开发框架。Micro 哲学是具有可插拔架构的合理默认值。 框架提供默认设置以帮助我们快速入门这个框架,但一切都可以轻松更换,在真实的项目中,一般是可以根据自己的需求来更换组件,比如注册中心可以使用etcd,consul.而Boker发布订阅可以使用Kafka,nsq等工具。

go-micro的主要功能

Go Micro 抽象了分布式系统的细节。 以下是主要功能。

  • 认证:Auth 内置为一等公民。 身份验证和授权通过为每项服务提供身份和证书来实现安全的零信任网络。 这还包括基于规则的访问控制。

  • 动态配置:从任何地方加载和热重载动态配置。 配置接口提供了一种从任何来源(例如 env vars、文件、etcd)加载应用程序级别配置的方法。 您可以合并源,甚至定义回退。

  • 数据存储:一个简单的数据存储接口,用于读取、写入和删除记录。它默认支持内存、文件和CockroachDB。状态和持久性成为原型之外的核心需求,Micro希望将其构建到框架中。

  • 服务发现:自动服务注册和名称解析。 服务发现是微服务开发的核心。 当服务 A 需要与服务 B 通话时,它需要该服务的位置。 默认发现机制是多播 DNS (mdns),一个 zeroconf 系统。

  • 负载均衡:基于服务发现的客户端负载平衡。 一旦我们获得了任意数量的服务实例的地址,我们现在需要一种方法来决定路由到哪个节点。 我们使用随机散列负载平衡来提供跨服务的均匀分布,并在出现问题时重试不同的节点。

  • 消息编码:基于内容类型的动态消息编码。 客户端和服务器将使用编解码器和内容类型为您无缝编码和解码 Go 类型。 任何种类的消息都可以被编码并从不同的客户端发送。 客户端和服务器默认处理这个。 默认情况下,这包括 protobuf 和 json。

  • RPC 客户端/服务器:基于 RPC 的请求/响应,支持双向流。 我们为同步通信提供了一个抽象。 对服务提出的请求将被自动解析、负载平衡、拨号和流式传输。

  • 异步消息:PubSub 内置为异步通信和事件驱动架构的一等公民。 事件通知是微服务开发的核心模式。 默认消息系统是 HTTP 事件消息代理。

  • 同步:分布式系统通常以最终一致的方式构建。 对分布式锁定和领导的支持作为同步接口内置。 使用最终一致的数据库或调度时,请使用 Sync 接口。

  • 可插拔接口:Go Micro 为每个分布式系统抽象使用 Go 接口。 因此,这些接口是可插拔的,并允许 Go Micro 与运行时无关。 您可以插入任何底层技术

broker组件

下面我们就从broker组件开始,一起学习一下如何使用go-micro,我们知道,broker是go-micro用来实现异步消息的一种模式,其本质是PubSub。你可以自定义使用不同的工具,比如kafka等。而go-micro也实现了一个默认的消息队列系统,方便我们学习go-micro。我们一起看一个examples中的实例,一起学习一下,如何使用默认的消息系统来实现异步消息。

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/asim/go-micro/v3/broker"
	"github.com/asim/go-micro/v3/cmd"
)

var (
	topic = "go.micro.topic.foo"
)

func pub() {
	tick := time.NewTicker(time.Second)
	i := 0
	for _ = range tick.C {
		msg := &broker.Message{
			Header: map[string]string{
				"id": fmt.Sprintf("%d", i),
			},
			Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())),
		}
		if err := broker.Publish(topic, msg); err != nil {
			log.Printf("[pub] failed: %v", err)
		} else {
			fmt.Println("[pub] pubbed message:", string(msg.Body))
		}
		i++
	}
}

func sub() {
	_, err := broker.Subscribe(topic, func(p broker.Event) error {
		fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)
		return nil
	})
	if err != nil {
		fmt.Println(err)
	}
}

func main() {
	cmd.Init()

	if err := broker.Init(); err != nil {
		log.Fatalf("Broker Init error: %v", err)
	}
	if err := broker.Connect(); err != nil {
		log.Fatalf("Broker Connect error: %v", err)
	}

	go pub()
	go sub()

	<-time.After(time.Second * 10)
}

复制代码

例子其实很简单,实现一个发布者函数pub,该函数在每个tick中,向指定topic发布一条。

同时,实现了一个订阅者sub,订阅者从指定topic中获取消息,并执行处理函数。处理函数的参数是一个Event接口,通过它,我们可以获取消息.该消息包含Header和Body信息。

这些就是一个建议的消息系统,那如何让这个消息系统跑起来的呢,其主要逻辑就是main函数中的逻辑,我们一个来看看上面的main函数。首先调用cmd.init方法,该方法的作用是

func Init(opts ...Option) error {
	return DefaultCmd.Init(opts...)
}
复制代码

DefaultCmd是一个所有组件都是默认值的Cmd实例。

然后初始化一个broker,在这里,默认的broker就是一个http broker.

func newHttpBroker(opts ...Option) Broker {
	options := Options{
		Codec:    json.Marshaler{},
		Context:  context.TODO(),
		Registry: registry.DefaultRegistry,
	}

	for _, o := range opts {
		o(&options)
	}

	// set address
	addr := DefaultAddress

	if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 {
		addr = options.Addrs[0]
	}

	h := &httpBroker{
		id:          uuid.New().String(),
		address:     addr,
		opts:        options,
		r:           options.Registry,
		c:           &http.Client{Transport: newTransport(options.TLSConfig)},
		subscribers: make(map[string][]*httpSubscriber),
		exit:        make(chan chan error),
		mux:         http.NewServeMux(),
		inbox:       make(map[string][][]byte),
	}

	// specify the message handler
	h.mux.Handle(DefaultPath, h)

	// get optional handlers
	if h.opts.Context != nil {
		handlers, ok := h.opts.Context.Value("http_handlers").(map[string]http.Handler)
		if ok {
			for pattern, handler := range handlers {
				h.mux.Handle(pattern, handler)
			}
		}
	}

	return h
}
复制代码

如果addr option没有指定,就是默认值"127.0.0.1:0"

然后调用http broker的Connect方法,来连接服务器


func (h *httpBroker) Connect() error {
	h.RLock()
	if h.running {
		h.RUnlock()
		return nil
	}
	h.RUnlock()

	h.Lock()
	defer h.Unlock()

	var l net.Listener
	var err error

	if h.opts.Secure || h.opts.TLSConfig != nil {
		config := h.opts.TLSConfig

		fn := func(addr string) (net.Listener, error) {
			if config == nil {
				hosts := []string{addr}

				// check if its a valid host:port
				if host, _, err := net.SplitHostPort(addr); err == nil {
					if len(host) == 0 {
						hosts = maddr.IPs()
					} else {
						hosts = []string{host}
					}
				}

				// generate a certificate
				cert, err := mls.Certificate(hosts...)
				if err != nil {
					return nil, err
				}
				config = &tls.Config{Certificates: []tls.Certificate{cert}}
			}
			return tls.Listen("tcp", addr, config)
		}

		l, err = mnet.Listen(h.address, fn)
	} else {
		fn := func(addr string) (net.Listener, error) {
			return net.Listen("tcp", addr)
		}

		l, err = mnet.Listen(h.address, fn)
	}

	if err != nil {
		return err
	}

	addr := h.address
	h.address = l.Addr().String()

	go http.Serve(l, h.mux)
	go func() {
		h.run(l)
		h.Lock()
		h.opts.Addrs = []string{addr}
		h.address = addr
		h.Unlock()
	}()

	// get registry
	reg := h.opts.Registry
	if reg == nil {
		reg = registry.DefaultRegistry
	}
	// set cache
	h.r = cache.New(reg)

	// set running
	h.running = true
	return nil
}
复制代码

在连接了服务器之后,我们就可以建立连接进行监听处理消息了。有没有发现,其实就是一个http server,监听服务请求,然后进行事件处理。

而在发布和订阅相应的代码逻辑了,我们可以看到,在发布的逻辑里,我们会首先将msg,saveMessage到指定的topic中,然后异步处理消息,那么如何进行saveMessage,以及如何进行异步消息处理的呢,首先,


func (h *httpBroker) saveMessage(topic string, msg []byte) {
	h.mtx.Lock()
	defer h.mtx.Unlock()

	// get messages
	c := h.inbox[topic]

	// save message
	c = append(c, msg)

	// max length 64
	if len(c) > 64 {
		c = c[:64]
	}

	// save inbox
	h.inbox[topic] = c
}
复制代码

保存消息的逻辑足够简单,仅仅是将数据保存到http broker的某一个Map中。至于消息处理

// do the rest async
	go func() {
		// get a third of the backlog
		messages := h.getMessage(topic, 8)
		delay := (len(messages) > 1)

		// publish all the messages
		for _, msg := range messages {
			// serialize here
			srv(s, msg)

			// sending a backlog of messages
			if delay {
				time.Sleep(time.Millisecond * 100)
			}
		}
	}()
复制代码

在一个goroutine里,从topic中取出message,然后从注册中心中根据服务名获取到服务的列表,然后再列表中找到符合条件的节点,找到节点后就可以将消息通过bttp broker结构体中的client的Post方法,将消息发送给服务器,并根据返回结果进行错误处理。


func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
	// create the message first
	m := &Message{
		Header: make(map[string]string),
		Body:   msg.Body,
	}

	for k, v := range msg.Header {
		m.Header[k] = v
	}

	m.Header["Micro-Topic"] = topic

	// encode the message
	b, err := h.opts.Codec.Marshal(m)
	if err != nil {
		return err
	}

	// save the message
	h.saveMessage(topic, b)

	// now attempt to get the service
	h.RLock()
	s, err := h.r.GetService(serviceName)
	if err != nil {
		h.RUnlock()
		return err
	}
	h.RUnlock()

	pub := func(node *registry.Node, t string, b []byte) error {
		scheme := "http"

		// check if secure is added in metadata
		if node.Metadata["secure"] == "true" {
			scheme = "https"
		}

		vals := url.Values{}
		vals.Add("id", node.Id)

		uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultPath, vals.Encode())
		r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
		if err != nil {
			return err
		}

		// discard response body
		io.Copy(ioutil.Discard, r.Body)
		r.Body.Close()
		return nil
	}

	srv := func(s []*registry.Service, b []byte) {
		for _, service := range s {
			var nodes []*registry.Node

			for _, node := range service.Nodes {
				// only use nodes tagged with broker http
				if node.Metadata["broker"] != "http" {
					continue
				}

				// look for nodes for the topic
				if node.Metadata["topic"] != topic {
					continue
				}

				nodes = append(nodes, node)
			}

			// only process if we have nodes
			if len(nodes) == 0 {
				continue
			}

			switch service.Version {
			// broadcast version means broadcast to all nodes
			case broadcastVersion:
				var success bool

				// publish to all nodes
				for _, node := range nodes {
					// publish async
					if err := pub(node, topic, b); err == nil {
						success = true
					}
				}

				// save if it failed to publish at least once
				if !success {
					h.saveMessage(topic, b)
				}
			default:
				// select node to publish to
				node := nodes[rand.Int()%len(nodes)]

				// publish async to one node
				if err := pub(node, topic, b); err != nil {
					// if failed save it
					h.saveMessage(topic, b)
				}
			}
		}
	}

	// do the rest async
	go func() {
		// get a third of the backlog
		messages := h.getMessage(topic, 8)
		delay := (len(messages) > 1)

		// publish all the messages
		for _, msg := range messages {
			// serialize here
			srv(s, msg)

			// sending a backlog of messages
			if delay {
				time.Sleep(time.Millisecond * 100)
			}
		}
	}()

	return nil
}

复制代码

而订阅的逻辑就是生产一个订阅者

// generate subscriber
	subscriber := &httpSubscriber{
		opts:  options,
		hb:    h,
		id:    node.Id,
		topic: topic,
		fn:    handler,
		svc:   service,
	}
复制代码

然后调用http broker的subscribe方法,真正的进行订阅操作。

func (h *httpBroker) subscribe(s *httpSubscriber) error {
	h.Lock()
	defer h.Unlock()

	if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil {
		return err
	}

	h.subscribers[s.topic] = append(h.subscribers[s.topic], s)
	return nil
}
复制代码

这个逻辑就很简单了,将服务注册到注册中心,然后把这个订阅着追加到http broker的订阅者的map里,方便消息的通信。

看到这里,你可能有一个疑惑,我并没有看到,我的消息处理函数是如何在订阅着接受到消息进行数据处理的呢,我们都知道,当我们使用net/http起一个http serve的时候,会传入一个handler.

// Serve always returns a non-nil error.
func Serve(l net.Listener, handler Handler) error {
	srv := &Server{Handler: handler}
	return srv.Serve(l)
}

复制代码

而我们在http broker Connect的时候,启动的server,传入的的正式h.mux
而在指定handler的时候,我们是


	// specify the message handler
	h.mux.Handle(DefaultPath, h)
复制代码

DefaultPath是/,h都是我们的http broker.之所以可以将它作为handler传入到Handle方法中,是因为,http broker实现了ServeHTTP方法,在这里就是路由的处理函数。


func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if req.Method != "POST" {
		err := merr.BadRequest("go.micro.broker", "Method not allowed")
		http.Error(w, err.Error(), http.StatusMethodNotAllowed)
		return
	}
	defer req.Body.Close()

	req.ParseForm()

	b, err := ioutil.ReadAll(req.Body)
	if err != nil {
		errr := merr.InternalServerError("go.micro.broker", "Error reading request body: %v", err)
		w.WriteHeader(500)
		w.Write([]byte(errr.Error()))
		return
	}

	var m *Message
	if err = h.opts.Codec.Unmarshal(b, &m); err != nil {
		errr := merr.InternalServerError("go.micro.broker", "Error parsing request body: %v", err)
		w.WriteHeader(500)
		w.Write([]byte(errr.Error()))
		return
	}

	topic := m.Header["Micro-Topic"]
	//delete(m.Header, ":topic")

	if len(topic) == 0 {
		errr := merr.InternalServerError("go.micro.broker", "Topic not found")
		w.WriteHeader(500)
		w.Write([]byte(errr.Error()))
		return
	}

	p := &httpEvent{m: m, t: topic}
	id := req.Form.Get("id")

	//nolint:prealloc
	var subs []Handler

	h.RLock()
	for _, subscriber := range h.subscribers[topic] {
		if id != subscriber.id {
			continue
		}
		subs = append(subs, subscriber.fn)
	}
	h.RUnlock()

	// execute the handler
	for _, fn := range subs {
		p.err = fn(p)
	}
}
复制代码

在这里就可以解码消息,获取消息体,并进行消息处理函数的调用。

//nolint:prealloc
	var subs []Handler

	h.RLock()
	for _, subscriber := range h.subscribers[topic] {
		if id != subscriber.id {
			continue
		}
		subs = append(subs, subscriber.fn)
	}
	h.RUnlock()

	// execute the handler
	for _, fn := range subs {
		p.err = fn(p)
	}
复制代码

这就是整个http broker的处理流程,相信你现在就可以明白了go-micro的异步消息系统的默认实现了,如果你想实现自己的broker,只要在实现中实现了Broker接口,就可以在项目中使用自定义的broker了。无缝切换。

// Broker is an interface used for asynchronous messaging.
type Broker interface {
	Init(...Option) error
	Options() Options
	Address() string
	Connect() error
	Disconnect() error
	Publish(topic string, m *Message, opts ...PublishOption) error
	Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
	String() string
}

复制代码

至此整个broker部分就结束了,下一篇文章,我们就来看看如何实现一个自定义的broker.

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享