Flowchart
Source code structure
The main function
func main() {
    envflag.Parse() // parse flags from the command line and, optionally, from environment variables
    netstorage.InitStorageNodes(*storageNodes) // initialize the vmstorage node list
    relabel.Init() // initialize relabeling configs
    storage.SetMaxLabelsPerTimeseries(*maxLabelsPerTimeseries)
    common.StartUnmarshalWorkers() // start the unmarshal worker goroutines
    writeconcurrencylimiter.Init() // initialize the write-concurrency limit (default: CPU*4)
    if len(*clusternativeListenAddr) > 0 { // accept data forwarded from other vminsert nodes
        clusternativeServer = clusternativeserver.MustStart(*clusternativeListenAddr, func(c net.Conn) error {
            return clusternative.InsertHandler(c)
        })
    }
    if len(*graphiteListenAddr) > 0 { // Graphite ingestion over TCP and UDP
        graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, func(r io.Reader) error {
            var at auth.Token // TODO: properly initialize auth token
            return graphite.InsertHandler(&at, r)
        })
    }
    if len(*influxListenAddr) > 0 { // InfluxDB line-protocol ingestion over TCP and UDP
        influxServer = influxserver.MustStart(*influxListenAddr, func(r io.Reader) error {
            var at auth.Token // TODO: properly initialize auth token
            return influx.InsertHandlerForReader(&at, r)
        })
    }
    if len(*opentsdbListenAddr) > 0 { // OpenTSDB telnet ingestion over TCP and UDP
        opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, func(r io.Reader) error {
            var at auth.Token // TODO: properly initialize auth token
            return opentsdb.InsertHandler(&at, r)
        }, opentsdbhttp.InsertHandler)
    }
    if len(*opentsdbHTTPListenAddr) > 0 {
        opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, opentsdbhttp.InsertHandler)
    }
    go func() {
        httpserver.Serve(*httpListenAddr, requestHandler)
    }()
    sig := procutil.WaitForSigterm()
    logger.Infof("service received signal %s", sig)
    logger.Infof("gracefully shutting down http service at %q", *httpListenAddr)
    startTime := time.Now()
    if err := httpserver.Stop(*httpListenAddr); err != nil {
        logger.Fatalf("cannot stop http service: %s", err)
    }
    logger.Infof("successfully shut down http service in %.3f seconds", time.Since(startTime).Seconds())
    startTime = time.Now()
    netstorage.Stop()
    logger.Infof("successfully stopped netstorage in %.3f seconds", time.Since(startTime).Seconds())
    fs.MustStopDirRemover()
    logger.Infof("the vminsert has been stopped")
}
From the source of main, it breaks down into three parts:
- Initialization: parsing env vars and flags, initializing netstorage, and finally initializing the UnmarshalWorkers and the write-concurrency limiter. How each piece works is covered one by one below.
- Starting the servers: the server that receives data from other vminsert nodes, the ingestion servers for Graphite, OpenTSDB and the other protocols, and the HTTP server.
- Shutdown handling: vminsert exits gracefully, clearing internal state and flushing the data buffered in vminsert to vmstorage.
Initialization
Env & flag parsing
func Parse() {
    flag.Parse()
    if !*enable { // reading flags from environment variables is opt-in
        return
    }
    // Remember explicitly set command-line flags.
    flagsSet := make(map[string]bool)
    flag.Visit(func(f *flag.Flag) {
        flagsSet[f.Name] = true
    })
    // Obtain the remaining flag values from environment vars.
    flag.VisitAll(func(f *flag.Flag) { // look up an environment variable for every flag
        if flagsSet[f.Name] { // the command-line value wins if the flag was set explicitly
            // The flag is explicitly set via command-line.
            return
        }
        // Get flag value from environment var.
        fname := getEnvFlagName(f.Name) // derive the env var name; if the env var is absent the default value is kept
        if v, ok := os.LookupEnv(fname); ok {
            if err := f.Value.Set(v); err != nil {
                // Do not use lib/logger here, since it is uninitialized yet.
                log.Fatalf("cannot set flag %s to %q, which is read from environment variable %q: %s", f.Name, v, fname, err)
            }
        }
    })
}
envflag.Parse() sets up the flag values. When environment-variable support is enabled, every flag that was not specified on the command line is looked up in the environment; if the environment does not define it either, the default value is kept.
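The fallback behavior is easy to reproduce outside of VictoriaMetrics. The standalone sketch below shows the same Visit/VisitAll pattern; the HTTPLISTENADDR env-var naming is an assumption for the demo only, since the real envflag package derives names via getEnvFlagName and supports more options.

```go
package main

import (
    "flag"
    "fmt"
    "log"
    "os"
    "strings"
)

var httpListenAddr = flag.String("httpListenAddr", ":8480", "TCP address to listen to for http connections")

func main() {
    flag.Parse()

    // Remember flags that were set explicitly on the command line.
    explicit := make(map[string]bool)
    flag.Visit(func(f *flag.Flag) { explicit[f.Name] = true })

    // For every other flag, fall back to an environment variable,
    // e.g. -httpListenAddr -> HTTPLISTENADDR (naming is simplified here).
    flag.VisitAll(func(f *flag.Flag) {
        if explicit[f.Name] {
            return // the command-line value wins
        }
        if v, ok := os.LookupEnv(strings.ToUpper(f.Name)); ok {
            if err := f.Value.Set(v); err != nil {
                log.Fatalf("cannot set flag %s from env: %s", f.Name, err)
            }
        }
    })

    fmt.Println("httpListenAddr =", *httpListenAddr)
}
```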
NetStorage Init
func InitStorageNodes(addrs []string) {
    if len(addrs) > 255 { // more than 255 storage nodes is not supported
        logger.Panicf("BUG: too much addresses: %d; max supported %d addresses", len(addrs), 255)
    }
    storageNodes = storageNodes[:0] // reset the storageNode list
    for _, addr := range addrs { // create a storageNode (SN) for every address
        sn := &storageNode{
            dialer: netutil.NewTCPDialer("vminsert", addr),
            dialErrors: metrics.NewCounter(fmt.Sprintf(`vm_rpc_dial_errors_total{name="vminsert", addr=%q}`, addr)),
            handshakeErrors: metrics.NewCounter(fmt.Sprintf(`vm_rpc_handshake_errors_total{name="vminsert", addr=%q}`, addr)),
            connectionErrors: metrics.NewCounter(fmt.Sprintf(`vm_rpc_connection_errors_total{name="vminsert", addr=%q}`, addr)),
            rowsPushed: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_pushed_total{name="vminsert", addr=%q}`, addr)),
            rowsSent: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_sent_total{name="vminsert", addr=%q}`, addr)),
            rowsReroutedFromHere: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_from_here_total{name="vminsert", addr=%q}`, addr)),
            rowsReroutedToHere: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_to_here_total{name="vminsert", addr=%q}`, addr)),
        }
        ....
        storageNodes = append(storageNodes, sn) // append the new SN to the list
    }
    // set the insert buffer sizes
    ....
    // start run() in a goroutine for every SN
    for idx, sn := range storageNodes {
        storageNodesWG.Add(1)
        go func(sn *storageNode, idx int) {
            sn.run(storageNodesStopCh, idx)
            storageNodesWG.Done()
        }(sn, idx)
    }
    rerouteWorkerWG.Add(1)
    go func() {
        rerouteWorker(rerouteWorkerStopCh)
        rerouteWorkerWG.Done()
    }()
}

func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) {
    // clamp the replication factor: at least 1, at most the number of storage nodes
    replicas := *replicationFactor
    if replicas <= 0 {
        replicas = 1
    }
    if replicas > len(storageNodes) {
        replicas = len(storageNodes)
    }
    ticker := time.NewTicker(200 * time.Millisecond) // flush every 200ms
    defer ticker.Stop()
    var br bufRows
    brLastResetTime := fasttime.UnixTimestamp()
    var waitCh <-chan struct{}
    mustStop := false
    for !mustStop { // loop until the process stops
        sn.brLock.Lock()
        bufLen := len(sn.br.buf) // check the buffer length under the lock
        sn.brLock.Unlock()
        waitCh = nil
        if bufLen > 0 {
            // Do not sleep if sn.br.buf isn't empty.
            waitCh = closedCh // flush immediately if the buffer is non-empty
        }
        select {
        case <-stopCh:
            mustStop = true // stop the loop
            // Make sure the sn.buf is flushed last time before returning
            // in order to send the remaining bits of data.
        case <-ticker.C: // flush every 200ms
        case <-waitCh: // flush right away
        }
        sn.brLock.Lock()
        sn.br, br = br, sn.br // swap in an empty buffer under the lock and take the filled one
        sn.brLock.Unlock()
        currentTime := fasttime.UnixTimestamp()
        if len(br.buf) < cap(br.buf)/4 && currentTime-brLastResetTime > 10 { // the buffer is mostly unused: shrink its capacity
            // Free up capacity space occupied by br.buf in order to reduce memory usage after spikes.
            br.buf = append(br.buf[:0:0], br.buf...)
            brLastResetTime = currentTime
        }
        sn.checkHealth()
        if len(br.buf) == 0 {
            // Nothing to send.
            continue
        }
        // Send br to replicas storageNodes starting from snIdx.
        for !sendBufToReplicasNonblocking(&br, snIdx, replicas) { // push the data to the SNs
            t := timerpool.Get(200 * time.Millisecond) // re-arm the timer
            select {
            case <-stopCh:
                timerpool.Put(t)
                return
            case <-t.C:
                timerpool.Put(t)
                sn.checkHealth()
            }
        }
        br.reset() // clear the buffer
    }
}

func sendBufToReplicasNonblocking(br *bufRows, snIdx, replicas int) bool {
    usedStorageNodes := make(map[*storageNode]bool, replicas) // mark SNs that already received this buffer, so each SN stores at most one replica
    for i := 0; i < replicas; i++ { // find an SN for every replica
        idx := snIdx + i // start from the i-th SN after this one
        attempts := 0
        for {
            attempts++
            if attempts > len(storageNodes) { // every SN has been tried without success
                if i == 0 { // no replica was written at all: warn and signal failure; a partial write only warns
                    // The data wasn't replicated at all.
                    logger.Warnf("cannot push %d bytes with %d rows to storage nodes, since all the nodes are temporarily unavailable; "+
                        "re-trying to send the data soon", len(br.buf), br.rows)
                    return false
                }
                // The data is partially replicated, so just emit a warning and return true.
                // We could retry sending the data again, but this may result in uncontrolled duplicate data.
                // So it is better returning true.
                rowsIncompletelyReplicatedTotal.Add(br.rows)
                logger.Warnf("cannot make a copy #%d out of %d copies according to -replicationFactor=%d for %d bytes with %d rows, "+
                    "since a part of storage nodes is temporarily unavailable", i+1, replicas, *replicationFactor, len(br.buf), br.rows)
                return true
            }
            if idx >= len(storageNodes) {
                idx %= len(storageNodes) // wrap around the SN list
            }
            sn := storageNodes[idx]
            idx++
            if usedStorageNodes[sn] { // this SN already holds a replica: try the next one
                // The br has been already replicated to sn. Skip it.
                continue
            }
            if !sn.sendBufRowsNonblocking(br) { // cannot send to this SN: try the next one
                // Cannot send data to sn. Go to the next sn.
                continue
            }
            // Successfully sent data to sn.
            usedStorageNodes[sn] = true // mark this SN as used
            break
        }
    }
    return true
}
netstorage initialization parses the -storageNode flag to get the vmstorage addresses and starts one goroutine per address running the run function. run wakes up on a 200ms ticker (or immediately once its buffer is non-empty), swaps the node's buffer out under the lock, and sends the data with sendBufToReplicasNonblocking. That function implements the replication policy: it keeps a map of nodes that have already received the current buffer and marks each node after a successful send, which guarantees that a single node never stores more than one replica of the same data.
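The replica-placement idea can be isolated into a small sketch. The node type and send method below are simplified stand-ins invented for the example, not the real storageNode API; only the selection loop mirrors sendBufToReplicasNonblocking.

```go
package main

import "fmt"

// node is a simplified stand-in for storageNode: send reports whether the push succeeded.
type node struct {
    name    string
    healthy bool
}

func (n *node) send(buf []byte) bool { return n.healthy }

// sendToReplicas tries to place the buffer on `replicas` distinct nodes,
// starting from startIdx and walking the node list as a ring.
func sendToReplicas(nodes []*node, buf []byte, startIdx, replicas int) bool {
    used := make(map[*node]bool, replicas) // each node stores at most one replica
    for i := 0; i < replicas; i++ {
        idx := startIdx + i
        attempts := 0
        for {
            attempts++
            if attempts > len(nodes) {
                return i > 0 // partial replication counts as success, zero copies is a failure
            }
            n := nodes[idx%len(nodes)]
            idx++
            if used[n] || !n.send(buf) {
                continue // already holds a replica, or unreachable: try the next node
            }
            used[n] = true
            break
        }
    }
    return true
}

func main() {
    nodes := []*node{{"sn-0", true}, {"sn-1", false}, {"sn-2", true}}
    fmt.Println(sendToReplicas(nodes, []byte("rows"), 0, 2)) // true: sn-0 and sn-2 take the two copies
}
```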
UnMarshalWorker Init
func StartUnmarshalWorkers() {
    if unmarshalWorkCh != nil {
        logger.Panicf("BUG: it looks like startUnmarshalWorkers() has been already called without stopUnmarshalWorkers()")
    }
    gomaxprocs := cgroup.AvailableCPUs() // number of CPUs available to the process
    unmarshalWorkCh = make(chan UnmarshalWork, gomaxprocs)
    unmarshalWorkersWG.Add(gomaxprocs)
    for i := 0; i < gomaxprocs; i++ { // start one worker goroutine per CPU
        go func() {
            defer unmarshalWorkersWG.Done()
            for uw := range unmarshalWorkCh {
                uw.Unmarshal()
            }
        }()
    }
}

func ScheduleUnmarshalWork(uw UnmarshalWork) { // queue uw on the channel until a worker picks it up
    unmarshalWorkCh <- uw
}
StartUnmarshalWorkers reads the number of usable CPUs from the cgroup limits and starts one goroutine per CPU to run the Unmarshal work. This conversion is the core job of vminsert: turning the ingested data into the format vmstorage can consume.
ScheduleUnmarshalWork is called after data has been read; it simply places the UnmarshalWork item on the channel, where it waits to be picked up and executed by one of the workers.
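The producer/worker contract can be shown end to end with a self-contained toy. The UnmarshalWork interface mirrors the real one (a single Unmarshal method); the demoWork type, channel names, and use of runtime.NumCPU instead of cgroup.AvailableCPUs are simplifications for the example.

```go
package main

import (
    "fmt"
    "runtime"
    "sync"
)

// UnmarshalWork mirrors the interface used by vminsert: anything with an Unmarshal method.
type UnmarshalWork interface {
    Unmarshal()
}

var (
    workCh    chan UnmarshalWork
    workersWG sync.WaitGroup
)

// startUnmarshalWorkers starts one worker goroutine per CPU, as StartUnmarshalWorkers does.
func startUnmarshalWorkers() {
    n := runtime.NumCPU() // stand-in for cgroup.AvailableCPUs()
    workCh = make(chan UnmarshalWork, n)
    workersWG.Add(n)
    for i := 0; i < n; i++ {
        go func() {
            defer workersWG.Done()
            for uw := range workCh {
                uw.Unmarshal()
            }
        }()
    }
}

// scheduleUnmarshalWork queues the work item for a worker.
func scheduleUnmarshalWork(uw UnmarshalWork) { workCh <- uw }

// demoWork is a hypothetical work item: it just "parses" a payload string.
type demoWork struct {
    payload string
    done    *sync.WaitGroup
}

func (w *demoWork) Unmarshal() {
    defer w.done.Done()
    fmt.Println("unmarshaled", w.payload)
}

func main() {
    startUnmarshalWorkers()
    var done sync.WaitGroup
    for i := 0; i < 4; i++ {
        done.Add(1)
        scheduleUnmarshalWork(&demoWork{payload: fmt.Sprintf("block-%d", i), done: &done})
    }
    done.Wait()
    close(workCh) // stop the workers
    workersWG.Wait()
}
```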
Servers
The logic in this part is almost identical for every protocol; the differences lie in how each data format is parsed, and the function names follow the same pattern throughout. The handler for data forwarded from other vminsert nodes (clusternative) is used as the example below.
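The various *server.MustStart helpers are never shown in this article, but they all boil down to the same accept loop. The sketch below is a simplified stand-in for that shape (listener handling, names, and the demo handler are assumptions, not the real graphiteserver/influxserver/opentsdbserver code):

```go
package main

import (
    "io"
    "log"
    "net"
    "time"
)

// Server is a simplified stand-in for the protocol servers started in main():
// it accepts TCP connections and feeds every connection to an insert handler.
type Server struct {
    ln net.Listener
}

// MustStart listens on addr and serves each connection with insertHandler.
func MustStart(addr string, insertHandler func(r io.Reader) error) *Server {
    ln, err := net.Listen("tcp", addr)
    if err != nil {
        log.Fatalf("cannot listen on %s: %s", addr, err)
    }
    s := &Server{ln: ln}
    go func() {
        for {
            c, err := ln.Accept()
            if err != nil {
                return // listener closed, stop accepting
            }
            go func(c net.Conn) { // one goroutine per connection
                defer c.Close()
                if err := insertHandler(c); err != nil {
                    log.Printf("error processing connection: %s", err)
                }
            }(c)
        }
    }()
    return s
}

// MustStop closes the listener.
func (s *Server) MustStop() { s.ln.Close() }

func main() {
    srv := MustStart("127.0.0.1:12345", func(r io.Reader) error {
        n, err := io.Copy(io.Discard, r) // a real handler would parse the wire format here
        log.Printf("read %d bytes", n)
        return err
    })
    defer srv.MustStop()

    // Push a few bytes through to show the flow end to end.
    c, err := net.Dial("tcp", "127.0.0.1:12345")
    if err != nil {
        log.Fatal(err)
    }
    c.Write([]byte("some metrics\n"))
    c.Close()
    time.Sleep(100 * time.Millisecond) // give the handler goroutine a moment to finish
}
```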
func InsertHandler(c net.Conn) error {
    bc, err := handshake.VMInsertServer(c, 0)
    if err != nil {
        return fmt.Errorf("cannot perform vminsert handshake with client %q: %w", c.RemoteAddr(), err)
    }
    return writeconcurrencylimiter.Do(func() error {
        return parser.ParseStream(bc, func(rows []storage.MetricRow) error {
            return insertRows(rows) // insert the parsed rows
        })
    })
}

func getUnmarshalWork() *unmarshalWork {
    v := unmarshalWorkPool.Get()
    if v == nil {
        return &unmarshalWork{}
    }
    return v.(*unmarshalWork)
}

func putUnmarshalWork(uw *unmarshalWork) {
    uw.reset()
    unmarshalWorkPool.Put(uw)
}

var unmarshalWorkPool sync.Pool

func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.MetricRow) error) error {
    var wg sync.WaitGroup
    var (
        callbackErrLock sync.Mutex
        callbackErr     error
    )
    for {
        uw := getUnmarshalWork() // take an unmarshalWork from the sync.Pool
        uw.callback = func(rows []storage.MetricRow) { // set the callback invoked after unmarshaling
            if err := callback(rows); err != nil {
                processErrors.Inc()
                callbackErrLock.Lock()
                if callbackErr == nil {
                    callbackErr = fmt.Errorf("error when processing native block: %w", err)
                }
                callbackErrLock.Unlock()
            }
        }
        uw.wg = &wg
        var err error
        uw.reqBuf, err = readBlock(uw.reqBuf[:0], bc) // read one block of data from the connection
        if err != nil {
            wg.Wait()
            if err == io.EOF {
                // Remote end gracefully closed the connection.
                putUnmarshalWork(uw) // return uw to the sync.Pool for reuse
                return nil
            }
            return fmt.Errorf("cannot read packet size: %w", err)
        }
        blocksRead.Inc()
        wg.Add(1)
        common.ScheduleUnmarshalWork(uw) // hand uw over to the unmarshal workers for scheduling
    }
}

// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
    defer uw.wg.Done()
    if err := uw.unmarshal(); err != nil {
        parseErrors.Inc()
        logger.Errorf("error when unmarshaling clusternative block: %s", err)
        putUnmarshalWork(uw)
        return
    }
    mrs := uw.mrs
    for len(mrs) > maxRowsPerCallback {
        // Limit the number of rows passed to callback in order to reduce memory usage
        // when processing big packets of rows.
        uw.callback(mrs[:maxRowsPerCallback])
        mrs = mrs[maxRowsPerCallback:]
    }
    uw.callback(mrs)
    putUnmarshalWork(uw)
}

func insertRows(rows []storage.MetricRow) error {
    ctx := netstorage.GetInsertCtx() // take an InsertCtx from the pool; it is returned with PutInsertCtx when done
    defer netstorage.PutInsertCtx(ctx)
    ctx.Reset() // This line is required for initializing ctx internals. It clears state left over from the previous use.
    hasRelabeling := relabel.HasRelabeling()
    var at auth.Token
    var rowsPerTenant *metrics.Counter
    var mn storage.MetricName
    for i := range rows {
        mr := &rows[i]
        if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil { // decode the MetricName from its raw encoding
            return fmt.Errorf("cannot unmarshal MetricNameRaw: %w", err)
        }
        if rowsPerTenant == nil || mn.AccountID != at.AccountID || mn.ProjectID != at.ProjectID { // the tenant changed: refresh the token and its counter
            at.AccountID = mn.AccountID
            at.ProjectID = mn.ProjectID
            rowsPerTenant = rowsTenantInserted.Get(&at)
        }
        ctx.Labels = ctx.Labels[:0]
        ctx.AddLabelBytes(nil, mn.MetricGroup)
        for j := range mn.Tags {
            tag := &mn.Tags[j]
            ctx.AddLabelBytes(tag.Key, tag.Value)
        }
        if hasRelabeling {
            ctx.ApplyRelabeling() // apply the configured relabeling rules to the labels
        }
        if len(ctx.Labels) == 0 {
            // Skip metric without labels.
            continue
        }
        ctx.SortLabelsIfNeeded()
        if err := ctx.WriteDataPoint(&at, ctx.Labels, mr.Timestamp, mr.Value); err != nil { // write the data point
            return err
        }
        rowsPerTenant.Inc()
    }
    rowsInserted.Add(len(rows))
    rowsPerInsert.Update(float64(len(rows)))
    return ctx.FlushBufs() // push the remaining buffered data to the SNs
}

func (ctx *InsertCtx) WriteDataPoint(at *auth.Token, labels []prompb.Label, timestamp int64, value float64) error {
    ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, labels) // encode the token and labels into a single []byte
    storageNodeIdx := ctx.GetStorageNodeIdx(at, labels) // hash the labels to pick the SN that stores this series
    return ctx.WriteDataPointExt(at, storageNodeIdx, ctx.MetricNameBuf, timestamp, value)
}

func (ctx *InsertCtx) GetStorageNodeIdx(at *auth.Token, labels []prompb.Label) int {
    if len(storageNodes) == 1 {
        // Fast path - only a single storage node.
        return 0
    }
    buf := ctx.labelsBuf[:0]
    buf = encoding.MarshalUint32(buf, at.AccountID)
    buf = encoding.MarshalUint32(buf, at.ProjectID)
    for i := range labels {
        label := &labels[i]
        buf = marshalBytesFast(buf, label.Name)
        buf = marshalBytesFast(buf, label.Value)
    }
    h := xxhash.Sum64(buf) // hash the tenant and labels
    ctx.labelsBuf = buf
    idx := int(jump.Hash(h, int32(len(storageNodes)))) // map the hash to an SN index
    return idx
}

func (ctx *InsertCtx) WriteDataPointExt(at *auth.Token, storageNodeIdx int, metricNameRaw []byte, timestamp int64, value float64) error {
    br := &ctx.bufRowss[storageNodeIdx]
    sn := storageNodes[storageNodeIdx]
    bufNew := storage.MarshalMetricRow(br.buf, metricNameRaw, timestamp, value) // append the encoded timestamp and value to the buffer
    if len(bufNew) >= maxBufSizePerStorageNode { // the buffer grew too big: push it to the SN right away
        // Send buf to storageNode, since it is too big.
        if err := br.pushTo(sn); err != nil {
            return err
        }
        br.buf = storage.MarshalMetricRow(bufNew[:0], metricNameRaw, timestamp, value) // drop the pushed data and keep only the latest row in the buffer
    } else {
        br.buf = bufNew // the limit is not reached yet: keep accumulating
    }
    br.rows++
    return nil
}
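To summarize the write path: insertRows decodes each MetricRow, rebuilds its labels, optionally applies relabeling, and routes the point to a storage node. GetStorageNodeIdx hashes the tenant plus the sorted labels with xxhash and maps the hash onto the node list with jump consistent hashing, so a given time series always lands on the same vmstorage node as long as the node list is unchanged. The sketch below isolates that routing step; it assumes the cespare/xxhash and lithammer jump-hash packages, and its byte layout and label type are simplified stand-ins for encoding.MarshalUint32, marshalBytesFast and prompb.Label.

```go
package main

import (
    "fmt"

    "github.com/cespare/xxhash/v2"
    jump "github.com/lithammer/go-jump-consistent-hash"
)

// label is a simplified stand-in for prompb.Label.
type label struct {
    Name, Value string
}

// storageNodeIdx mirrors the idea of GetStorageNodeIdx: hash the tenant and labels,
// then map the hash onto one of nodeCount buckets with jump consistent hashing.
func storageNodeIdx(accountID, projectID uint32, labels []label, nodeCount int32) int {
    buf := make([]byte, 0, 64)
    buf = append(buf, byte(accountID>>24), byte(accountID>>16), byte(accountID>>8), byte(accountID))
    buf = append(buf, byte(projectID>>24), byte(projectID>>16), byte(projectID>>8), byte(projectID))
    for _, l := range labels {
        buf = append(buf, l.Name...)
        buf = append(buf, l.Value...)
    }
    h := xxhash.Sum64(buf)
    return int(jump.Hash(h, nodeCount))
}

func main() {
    labels := []label{
        {"__name__", "http_requests_total"},
        {"job", "api"},
        {"instance", "10.0.0.1:8080"},
    }
    // The same series always maps to the same node index for a fixed node count.
    fmt.Println(storageNodeIdx(0, 0, labels, 3))
    fmt.Println(storageNodeIdx(0, 0, labels, 3))
}
```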