influxdb源码解析(tsdb模块)

相关结构体

  • Shard
// 按时间将数据分片。一个shard处理一个时间段所有的读取、写入
type Shard struct {
    // 一个Shard包含一个tsm engine、一个tsi index
   _engine Engine
   index   Index
复制代码
  • Engine
type Engine struct {
    // tsi index
    index tsdb.Index
    // seriesFile
    sfile *tsdb.SeriesFile
    // 记录写、删除操作,用于重启后恢复Cache
    WAL   *WAL
    // 快速执行写、删除操作;快速响应热数据的查询
    Cache *Cache
    // 一个engine有多个tsm文件,每个文件对应一个TSMReader
    FileStore []TSMReader
}
复制代码
  • Cache
type Cache struct {
    // 存储实现
    store   storer
    // snapshot,只读的Cache,用于compaction。类似于lsm的immutable memory
    snapshot *Cache
}

存储实现
type ring struct {
    // 由于读写需要加锁。通过分区可减小锁的范围
    partitions []*partition
}

type partition struct {
    // 只是一个简单的map,key是series key,value是(time, value)数组
    store map[string]*entry
}
复制代码
  • TSMReader
type TSMReader struct {
    // 通过mmap从tsm文件中读取points
    accessor blockAccessor
    // tsm index,用来优化accessor读取;保存tsm的tombstone
    index TSMIndex
    // 持久化tsm的删除操作
    tombstoner *Tombstoner
}

// tsm index的实现
type indirectIndex struct {
    // series key的索引
    offsets []byte
    // 存储tsm的tombstone
    tombstones map[string][]TimeRange
}

type Tombstoner struct {
     // 存储tombstone的文件,实际也是wal文件。influxdb启动的时候tsm index会解析这个文件,得到tombstone并保存到内存中,用于响应查询请求时过滤tsm文件被删除的数据
     pendingFile       *os.File
}
复制代码
  • Index
// tsi index
type Index struct {
    // 通过分区减小锁范围
    partitions []*Partition
}

type Partition struct {
    // 一个logFile
    logFile *LogFile
    // 多个tsi文件
    indexFiles []IndexFile
}

// 负责tsi的WAL和内存操作
type LogFile struct {
    // WAL文件
    file *os.File 
    // 分别记录tsi的添加、删除series操作
    seriesIDSet, tombstoneSeriesIDSet *tsdb.SeriesIDSet
    // 保存measurement、tag key、tag value的倒排索引。WAL大小达到阈值时compact到IndexFile
    mms map[string]*logMeasurement
}

// tsi持久化文件
type IndexFile struct {
    // 通过mmap读取tsi index文件并解析得到的索引
    // key为measurement,value为指向mmap映射数据的指针,所以不占内存
    tblks map[string]*TagBlock

    // 对应logFile的tombstoneSeriesIDSet
    // 为什么需要持久化tombstoneSeriesID?
    // 因为logFile中存储了tombstoneSeriesIDSet,而logFile文件compact到indexFile时并不会apply这些tombstone(减小写的放大效应),必须先存储tombstone到indexFile中,并等待更高级别的compaction
    tombstoneSeriesIDSetData []byte
}
复制代码

写入数据

  • 获取shardInfo(points_writer.go)
// 根据point的时间戳找到对应的shardInfo
func (l sgList) ShardGroupAt(t time.Time) *meta.ShardGroupInfo {}
复制代码
  • 写入Store(store.go)
func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
    // 根据shardInfo中的shardID获取shard
    sh := s.shards[shardID]
    // 写入shard
    sh.WritePoints(points)
}
复制代码
  • 写入Shard(shard.go)
func (s *Shard) WritePoints(points []models.Point) error {
    // 如果point的序列是新序列,先创建序列
    points, fieldsToCreate, err := s.validateSeriesAndFields(points)
    // 写入engine
    engine.WritePoints(points)
}
复制代码
  • 写入Engine(engine.go)
func (e *Engine) WritePoints(points []models.Point) error {
    // 写Cache
    e.Cache.WriteMulti(values)
    // 写WAL
    e.WAL.WriteMulti(values)
}
复制代码

分析

  • 为什么根据时间戳获取shard

由于时序数据的读写特点,influxdb把时间作为第一层索引。时间分区(shard)后,每个shard负责某一时间段的读写,同时每个shard包含一套单独的tsm、tsi引擎来处理读写。所以可以把shard理解为influxdb的最小执行单元,一个执行任务的worker

  • 为什么只写入CacheWAL

influxdb引擎整体基于lsm算法,满足读少写多。在lsm中,所有写、更新操作都只发生在内存(Cache)和日志(WAL

  • 宕机后如何恢复Cache

通过WAL恢复。因为WAL顺序记录了所有写、更新操作,所以可以恢复到宕机前的状态,类似重放mysql binlog。见cache.go

func (cl *CacheLoader) Load(cache *Cache) error {
    switch t := entry.(type) {
    case *WriteWALEntry:
        // 重放写操作
	cache.WriteMulti(t.Values); 
    case *DeleteRangeWALEntry:
        // 重放删除操作
	cache.DeleteRange(t.Keys, t.Min, t.Max)
    case *DeleteWALEntry:
	cache.Delete(t.Keys)
    }
}
复制代码
  • 如何防止Cache过大?

达到设置的阈值后,influxdb会compact到一个新的tsm文件。见engine.go

func (e *Engine) writeSnapshotAndCommit(log *zap.Logger, closedFiles []string, snapshot *Cache) (err error) {}
复制代码

更新数据

influxdb通过覆盖已有数据达到更新的目的,比如在同一series下插入多个时间戳相同、值不同的point,查询结果只能看到最后插入的point

和lsm一样,influxdb处理读请求时,使用Cache中的数据覆盖tsm文件中的数据,即新的数据覆盖老数据,以达到更新的目的。见iterator.gen.go

func (c *floatAscendingCursor) nextFloat() (int64, float64) {
    // 对于某一个series,从cache、tsm获取point
    ckey, cvalue := c.peekCache()
    tkey, tvalue := c.peekTSM()
    
    // 如果时间戳相同,代表这个点有更新操作,返回cache中的点
    if ckey == tkey {
        c.nextCache()
        c.nextTSM()
        return ckey, cvalue
    }
    ...
}

复制代码

删除数据

func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
    // 遍历每一个tsm文件
    e.FileStore.Apply(func(r TSMReader) error {
        r.BatchDelete().DeleteRange([][]byte{indexKey}, min, max)
        ......
        // 将tombstone文件移动到合法路径,用于recover tsm index和compaction
        // 将删除操作应用到tsm index中,便于处理查询请求时过滤数据
        r.BatchDelete().commit()
    }
    
    // 删除内存里的points,真正删除,而不是加tombstone
    e.Cache.DeleteRange(deleteKeys, min, max)
    // 记录删除操作
    e.WAL.DeleteRange(deleteKeys, min, max)
    
    // 再次遍历tsm文件,找到被彻底删除的series,以便删除tsi的series
    e.FileStore.Apply(func(r TSMFile) error {
        // 比较第一次遍历tsm文件前后的差异,即可得出被彻底删除的series
    }
    
    // 删除tsi的series
    for _, k := range seriesKeys {
        // 如果Cache还有数据,则不能删除tsi索引
        if e.Cache.Values(k).Len() > 0 {
            continue
        }
        
        // 删除tsi的series
        // 更新index中的partition中的logFile的tombstoneSeriesIDSet,并记录到WAL
        e.index.DropSeries(k)
    }
}

func (b *batchDelete) DeleteRange(keys [][]byte, minTime, maxTime int64) error {
    // tsm index保存了tsm文件的seriesKey最大、最小值,不在此区间,直接跳过
    if !b.r.index.OverlapsKeyRange(minKey, maxKey) {
            return nil
    }

    // 同理判断时间区间
    if !b.r.index.OverlapsTimeRange(minTime, maxTime) {
            return nil
    }
    
    // 写入临时文件中,等待提交。提交后,文件被移动到合法路径,下次启动时tsm index会加载tombstone到内存
    b.r.tombstoner.AddRange(keys, minTime, maxTime)
}
复制代码

更详细的解析见github.com/hzw-go/infl…

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享