VMInsert源代码分析

流程图

源代码结构

Main函数介绍

func main{
    envflag.Parse()   //环境变量初始化,包括cmd line和env环境变量
    netstorage.InitStorageNodes(*storageNodes)  //初始化storageNode
    relabel.Init()  //Init label
    storage.SetMaxLabelsPerTimeseries(*maxLabelsPerTimeseries)
    common.StartUnmarshalWorkers()  //开启Marshal的线程
    writeconcurrencylimiter.Init()  //初始化写入的线程数的限制,默认为CPU*4
    if len(*clusternativeListenAddr) > 0 {  //vminsert发送过来的数据进行转发
		clusternativeServer = clusternativeserver.MustStart(*clusternativeListenAddr, func(c net.Conn) error {
			return clusternative.InsertHandler(c)
		})
	}
	if len(*graphiteListenAddr) > 0 {  //graph类型数据转发,tcp和udp长连接
		graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, func(r io.Reader) error {
			var at auth.Token // TODO: properly initialize auth token
			return graphite.InsertHandler(&at, r)
		})
	}
	if len(*influxListenAddr) > 0 {  //influxdb数据转发,tcp和udp长连接
		influxServer = influxserver.MustStart(*influxListenAddr, func(r io.Reader) error {
			var at auth.Token // TODO: properly initialize auth token
			return influx.InsertHandlerForReader(&at, r)
		})
	}
	if len(*opentsdbListenAddr) > 0 {  //opentsdb数据转发,tcp和udp长连接
		opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, func(r io.Reader) error {
			var at auth.Token // TODO: properly initialize auth token
			return opentsdb.InsertHandler(&at, r)
		}, opentsdbhttp.InsertHandler)
	}
	if len(*opentsdbHTTPListenAddr) > 0 {
		opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, opentsdbhttp.InsertHandler)
	}

	go func() {
		httpserver.Serve(*httpListenAddr, requestHandler)
	}()

	sig := procutil.WaitForSigterm()
	logger.Infof("service received signal %s", sig)

	logger.Infof("gracefully shutting down http service at %q", *httpListenAddr)
	startTime = time.Now()
	if err := httpserver.Stop(*httpListenAddr); err != nil {
		logger.Fatalf("cannot stop http service: %s", err)
	}
	logger.Infof("successfully shut down http service in %.3f seconds", time.Since(startTime).Seconds())

	startTime = time.Now()
	netstorage.Stop()
	logger.Infof("successfully stopped netstorage in %.3f seconds", time.Since(startTime).Seconds())

	fs.MustStopDirRemover()

	logger.Infof("the vminsert has been stopped")
}
复制代码

通过Main的源代码可以看出Main主要分为三部分:

  1. 初始化部分:比如envflag的初始化。然后对netstorage进行初始化,最后对UnmarshalWorkerswriterLimiter进行初始化。具体每块是怎么工作的后面会一一进行介绍。
  2. 启动server:主要包括vminsert数据接受server,graphopentsdb等的数据接收端的server开启,以及httpserver的启动。
  3. 退出处理:vminsert对程序退出做了优雅退出的处理,主要是是清除一些状态信息, 然后把vminsert里面缓存的数据flush到vmstorage里面。

初始化

Env & Flag解析

func Parse() {
	flag.Parse()
	if !*enable {   //是否开启环境变量的初始化
		return
	}

	// Remember explicitly set command-line flags.
	flagsSet := make(map[string]bool)
	flag.Visit(func(f *flag.Flag) {
		flagsSet[f.Name] = true
	})

	// Obtain the remaining flag values from environment vars.
	flag.VisitAll(func(f *flag.Flag) {    //对每个需要设置的参数查找环境变量的值
		if flagsSet[f.Name] {     //如果cmd line设置了,以cmdline设置的为准
			// The flag is explicitly set via command-line.
			return
		}
		// Get flag value from environment var.
		fname := getEnvFlagName(f.Name)     //获取环境变量的值作为值,如果环境变量也没有则设置为默认值
		if v, ok := os.LookupEnv(fname); ok {
			if err := f.Value.Set(v); err != nil {
				// Do not use lib/logger here, since it is uninitialized yet.
				log.Fatalf("cannot set flag %s to %q, which is read from environment variable %q: %s", f.Name, v, fname, err)
			}
		}
	})
}
复制代码

envflag.Parse()主要是对flag的参数进行设置,如果开启了环境变量的参数,会从环境变量中获取cmd没有指定的参数进行设值。

NetStorage Init

func InitStorageNodes(addrs []string) {
	if len(addrs) > 255 {   //超过255个节点报错
		logger.Panicf("BUG: too much addresses: %d; max supported %d addresses", len(addrs), 255)
	}
    storageNodes = storageNodes[:0]  //初始化storageNode列表
	for _, addr := range addrs { //对每个addrs进行sn的创建
		sn := &storageNode{
			dialer: netutil.NewTCPDialer("vminsert", addr),

			dialErrors:           metrics.NewCounter(fmt.Sprintf(`vm_rpc_dial_errors_total{name="vminsert", addr=%q}`, addr)),
			handshakeErrors:      metrics.NewCounter(fmt.Sprintf(`vm_rpc_handshake_errors_total{name="vminsert", addr=%q}`, addr)),
			connectionErrors:     metrics.NewCounter(fmt.Sprintf(`vm_rpc_connection_errors_total{name="vminsert", addr=%q}`, addr)),
			rowsPushed:           metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_pushed_total{name="vminsert", addr=%q}`, addr)),
			rowsSent:             metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_sent_total{name="vminsert", addr=%q}`, addr)),
			rowsReroutedFromHere: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_from_here_total{name="vminsert", addr=%q}`, addr)),
			rowsReroutedToHere:   metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_to_here_total{name="vminsert", addr=%q}`, addr)),
		}
		.... 
		storageNodes = append(storageNodes, sn)  //添加创建的SN至SN列表中
	}
    //设置insert的缓存值
    ....
    
    //对每个SN进行执行
    for idx, sn := range storageNodes {
		storageNodesWG.Add(1)
		go func(sn *storageNode, idx int) {
			sn.run(storageNodesStopCh, idx)
			storageNodesWG.Done()
		}(sn, idx)
	}

	rerouteWorkerWG.Add(1)
	go func() {
		rerouteWorker(rerouteWorkerStopCh)
		rerouteWorkerWG.Done()
	}()
}

func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) {
    //对每个副本进行判断,最大的副本数不超过SN列表的长度
	replicas := *replicationFactor
	if replicas <= 0 {
		replicas = 1
	}
	if replicas > len(storageNodes) {
		replicas = len(storageNodes)
	}
	ticker := time.NewTicker(200 * time.Millisecond)  //200ms进行一次刷新
    defer ticker.Stop()
	var br bufRows
	brLastResetTime := fasttime.UnixTimestamp()
	var waitCh <-chan struct{}
	mustStop := false
	for !mustStop {  //一直循环至程序停止
		sn.brLock.Lock()
		bufLen := len(sn.br.buf)  //加锁判断buf的长度
		sn.brLock.Unlock()
		waitCh = nil
		if bufLen > 0 {
			// Do not sleep if sn.br.buf isn't empty.
			waitCh = closedCh  //如果buf不为空则直接进行flush
		}
		select {
		case <-stopCh:
			mustStop = true   //停止程序
			// Make sure the sn.buf is flushed last time before returning
			// in order to send the remaining bits of data.
		case <-ticker.C:  //200ms进行flush
		case <-waitCh:  //直接进行数据flush
		}
		sn.brLock.Lock()
		sn.br, br = br, sn.br  //加锁获取数据并复制一个新的br进去
		sn.brLock.Unlock()
		currentTime := fasttime.UnixTimestamp()
		if len(br.buf) < cap(br.buf)/4 && currentTime-brLastResetTime > 10 {  //对于数据量较小的情况该次不进行传输数据,保存至下一次数据传递
			// Free up capacity space occupied by br.buf in order to reduce memory usage after spikes.
			br.buf = append(br.buf[:0:0], br.buf...)
			brLastResetTime = currentTime
		}
		sn.checkHealth()
		if len(br.buf) == 0 {
			// Nothing to send.
			continue
		}
		// Send br to replicas storageNodes starting from snIdx.
		for !sendBufToReplicasNonblocking(&br, snIdx, replicas) {  //发送数据至SN里面
			t := timerpool.Get(200 * time.Millisecond) //重新设置timer
			select {
			case <-stopCh:
				timerpool.Put(t)
				return
			case <-t.C:
				timerpool.Put(t)
				sn.checkHealth()
			}
		}
		br.reset()  //清空buf
	}
   
   
  func sendBufToReplicasNonblocking(br *bufRows, snIdx, replicas int) bool {
	usedStorageNodes := make(map[*storageNode]bool, replicas)  //使用map来标记该SN是否已经传输过数据,保证一个SN只会存储一个副本
	for i := 0; i < replicas; i++ { //对每个副本找到一个SN来进行数据存储
		idx := snIdx + i //初始使用该SN的下i个SN作为数据存储节点
		attempts := 0
		for {
			attempts++
			if attempts > len(storageNodes) {  //遍历完所有的SN,还没有找到可以适应的SN报错
				if i == 0 { //如果一个副本都没有存储成功报错,如果有一个副本成功了则输出Warn
					// The data wasn't replicated at all.
					logger.Warnf("cannot push %d bytes with %d rows to storage nodes, since all the nodes are temporarily unavailable; "+
						"re-trying to send the data soon", len(br.buf), br.rows)
					return false
				}
				// The data is partially replicated, so just emit a warning and return true.
				// We could retry sending the data again, but this may result in uncontrolled duplicate data.
				// So it is better returning true.
				rowsIncompletelyReplicatedTotal.Add(br.rows)
				logger.Warnf("cannot make a copy #%d out of %d copies according to -replicationFactor=%d for %d bytes with %d rows, "+
					"since a part of storage nodes is temporarily unavailable", i+1, replicas, *replicationFactor, len(br.buf), br.rows)
				return true
			}
			if idx >= len(storageNodes) {
				idx %= len(storageNodes)  //如果idx大于SN,则设置
			}
			sn := storageNodes[idx]
			idx++
			if usedStorageNodes[sn] { //该SN已经存储过,查找下一个
				// The br has been already replicated to sn. Skip it.
				continue
			}
			if !sn.sendBufRowsNonblocking(br) { //无法发送数据,下一个
				// Cannot send data to sn. Go to the next sn.
				continue
			}
			// Successfully sent data to sn.
			usedStorageNodes[sn] = true  //发送成功标记SN已经发送了
			break
		}
	}
	return true
}
    
}
复制代码

netstorage的初始化主要是通过解析--storageNode的参数,获取storageNode的地址。对每个address启动一个协程,运行run函数。run函数中主要是通过遍历channel获取数据,然后通过sendBufToReplicasNonblocking函数进行数据发送。sendBufToReplicasNonblocking中会对副本等策略进行判断,目前VM使用的策略是设置一个HashMap,然后每设置一个副本,就把该节点的HashValue设置为True,这样可以保证一个机器只存储一个副本的数据。

UnMarshalWorker Init

func StartUnmarshalWorkers() {
	if unmarshalWorkCh != nil {
		logger.Panicf("BUG: it looks like startUnmarshalWorkers() has been alread called without stopUnmarshalWorkers()")
	}
	gomaxprocs := cgroup.AvailableCPUs() //获取可以使用的CPU数
	unmarshalWorkCh = make(chan UnmarshalWork, gomaxprocs)
	unmarshalWorkersWG.Add(gomaxprocs)
	for i := 0; i < gomaxprocs; i++ { //每个CPU开启一个worker去进行Sink
		go func() {
			defer unmarshalWorkersWG.Done()
			for uw := range unmarshalWorkCh {
				uw.Unmarshal()
			}
		}()
	}
}

func ScheduleUnmarshalWork(uw UnmarshalWork) {  //将uw放入chan中等待调度
	unmarshalWorkCh <- uw
}
复制代码

StartUnmarshalWorkers的功能主要是根据cgroup的参数获取可以使用的CPU数量,然后每个CPU开启一个协程去进行Unmarshal的操作。Vminsert的主要工作就是对数据进行转化,把收集到的数据转换为VmStorage可以使用的数据。
ScheduleUnmarshalWork是获取数据之后会调用的函数,主要功能就是把uw放置在调度队列里面等待uw被调度并执行。

Server部分

该部分逻辑基本类似,主要的区别在于对于每种数据的解析方式和流程不同,大致使用的函数名都是类似的,该部分以VmInsert的作为一个示例来进行解释。

func InsertHandler(c net.Conn) error {
	bc, err := handshake.VMInsertServer(c, 0)
	if err != nil {
		return fmt.Errorf("cannot perform vminsert handshake with client %q: %w", c.RemoteAddr(), err)
	}
	return writeconcurrencylimiter.Do(func() error {
		return parser.ParseStream(bc, func(rows []storage.MetricRow) error {
			return insertRows(rows)   //插入rows数据
		})
	})
}

func getUnmarshalWork() *unmarshalWork {
	v := unmarshalWorkPool.Get()
	if v == nil {
		return &unmarshalWork{}
	}
	return v.(*unmarshalWork)
}

func putUnmarshalWork(uw *unmarshalWork) {
	uw.reset()
	unmarshalWorkPool.Put(uw)
}

var unmarshalWorkPool sync.Pool
func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.MetricRow) error) error {
	var wg sync.WaitGroup
	var (
		callbackErrLock sync.Mutex
		callbackErr     error
	)
	for {
		uw := getUnmarshalWork()  //从 sync.Pool中获取uw
		uw.callback = func(rows []storage.MetricRow) {  //设置uw的回调函数
			if err := callback(rows); err != nil {
				processErrors.Inc()
				callbackErrLock.Lock()
				if callbackErr == nil {
					callbackErr = fmt.Errorf("error when processing native block: %w", err)
				}
				callbackErrLock.Unlock()
			}
		}
		uw.wg = &wg
		var err error
		uw.reqBuf, err = readBlock(uw.reqBuf[:0], bc)   //从网络连接中获取一个block的数据
		if err != nil {
			wg.Wait()
			if err == io.EOF {
				// Remote end gracefully closed the connection.
				putUnmarshalWork(uw)    //如果读取失败,将uw放在sync.Pool中供下次使用
				return nil
			}
			return fmt.Errorf("cannot read packet size: %w", err)
		}
		blocksRead.Inc()
		wg.Add(1)
		common.ScheduleUnmarshalWork(uw)   //将uw送至uw的调度中,进行调度
	}
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
	defer uw.wg.Done()
	if err := uw.unmarshal(); err != nil {
		parseErrors.Inc()
		logger.Errorf("error when unmarshaling clusternative block: %s", err)
		putUnmarshalWork(uw)
		return
	}
	mrs := uw.mrs
	for len(mrs) > maxRowsPerCallback {
		// Limit the number of rows passed to callback in order to reduce memory usage
		// when processing big packets of rows.
		uw.callback(mrs[:maxRowsPerCallback])
		mrs = mrs[maxRowsPerCallback:]
	}
	uw.callback(mrs)
	putUnmarshalWork(uw)
}

func insertRows(rows []storage.MetricRow) error {
	ctx := netstorage.GetInsertCtx()   //获取InsertCtx,有个pool存储Ctx.用完会通过Put插入到pool里面
	defer netstorage.PutInsertCtx(ctx)

	ctx.Reset() // This line is required for initializing ctx internals.  初始化Ctx,把上一次的Ctx的信息清除掉
	hasRelabeling := relabel.HasRelabeling()
	var at auth.Token
	var rowsPerTenant *metrics.Counter
	var mn storage.MetricName
	for i := range rows {
		mr := &rows[i]
		if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {  //通过编码后的数据得到MetricName的信息
			return fmt.Errorf("cannot unmarshal MetricNameRaw: %w", err)
		}
		if rowsPerTenant == nil || mn.AccountID != at.AccountID || mn.ProjectID != at.ProjectID {  //如果at的信息变换了,重新填充
			at.AccountID = mn.AccountID
			at.ProjectID = mn.ProjectID
			rowsPerTenant = rowsTenantInserted.Get(&at)
		}
		ctx.Labels = ctx.Labels[:0]
		ctx.AddLabelBytes(nil, mn.MetricGroup)
		for j := range mn.Tags {
			tag := &mn.Tags[j]
			ctx.AddLabelBytes(tag.Key, tag.Value)
		}
		if hasRelabeling {
			ctx.ApplyRelabeling()  //没有太明白这块的操作是为什么
		}
		if len(ctx.Labels) == 0 {
			// Skip metric without labels.
			continue
		}
		ctx.SortLabelsIfNeeded()
		if err := ctx.WriteDataPoint(&at, ctx.Labels, mr.Timestamp, mr.Value); err != nil {   //写入数据
			return err
		}
		rowsPerTenant.Inc()
	}
	rowsInserted.Add(len(rows))
	rowsPerInsert.Update(float64(len(rows)))
	return ctx.FlushBufs()  //把剩余的数据推送到SN中
}

func (ctx *InsertCtx) WriteDataPoint(at *auth.Token, labels []prompb.Label, timestamp int64, value float64) error {
	ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, labels)  //把Label,at相关的信息编码为[]byte,并打包在一起
	storageNodeIdx := ctx.GetStorageNodeIdx(at, labels)    //根据label的信息进行hash,找到对应的SN进行存储
	return ctx.WriteDataPointExt(at, storageNodeIdx, ctx.MetricNameBuf, timestamp, value)
}

func (ctx *InsertCtx) GetStorageNodeIdx(at *auth.Token, labels []prompb.Label) int {
	if len(storageNodes) == 1 {
		// Fast path - only a single storage node.
		return 0
	}

	buf := ctx.labelsBuf[:0]
	buf = encoding.MarshalUint32(buf, at.AccountID)
	buf = encoding.MarshalUint32(buf, at.ProjectID)
	for i := range labels {
		label := &labels[i]
		buf = marshalBytesFast(buf, label.Name)
		buf = marshalBytesFast(buf, label.Value)
	}
	h := xxhash.Sum64(buf)  //生成Hash值
	ctx.labelsBuf = buf

	idx := int(jump.Hash(h, int32(len(storageNodes)))) //生成对应的SN的index
	return idx
}

func (ctx *InsertCtx) WriteDataPointExt(at *auth.Token, storageNodeIdx int, metricNameRaw []byte, timestamp int64, value float64) error {
	br := &ctx.bufRowss[storageNodeIdx]
	sn := storageNodes[storageNodeIdx]
	bufNew := storage.MarshalMetricRow(br.buf, metricNameRaw, timestamp, value)   //把ts和value的信息打包进buf里面
	if len(bufNew) >= maxBufSizePerStorageNode {  //如果br中的现存数据过大,直接发送到SN中
		// Send buf to storageNode, since it is too big.
		if err := br.pushTo(sn); err != nil {
			return err
		}
		br.buf = storage.MarshalMetricRow(bufNew[:0], metricNameRaw, timestamp, value)  //清楚之前的数据,并把最近的一条数据设置到br中
	} else {
		br.buf = bufNew  //数据没有达到上限,直接把数据赋值进去
	}
	br.rows++
	return nil
}
复制代码
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享