相关结构体
- Shard
// 按时间将数据分片。一个shard处理一个时间段所有的读取、写入
type Shard struct {
// 一个Shard包含一个tsm engine、一个tsi index
_engine Engine
index Index
复制代码
- Engine
type Engine struct {
// tsi index
index tsdb.Index
// seriesFile
sfile *tsdb.SeriesFile
// 记录写、删除操作,用于重启后恢复Cache
WAL *WAL
// 快速执行写、删除操作;快速响应热数据的查询
Cache *Cache
// 一个engine有多个tsm文件,每个文件对应一个TSMReader
FileStore []TSMReader
}
复制代码
- Cache
type Cache struct {
// 存储实现
store storer
// snapshot,只读的Cache,用于compaction。类似于lsm的immutable memory
snapshot *Cache
}
存储实现
type ring struct {
// 由于读写需要加锁。通过分区可减小锁的范围
partitions []*partition
}
type partition struct {
// 只是一个简单的map,key是series key,value是(time, value)数组
store map[string]*entry
}
复制代码
- TSMReader
type TSMReader struct {
// 通过mmap从tsm文件中读取points
accessor blockAccessor
// tsm index,用来优化accessor读取;保存tsm的tombstone
index TSMIndex
// 持久化tsm的删除操作
tombstoner *Tombstoner
}
// tsm index的实现
type indirectIndex struct {
// series key的索引
offsets []byte
// 存储tsm的tombstone
tombstones map[string][]TimeRange
}
type Tombstoner struct {
// 存储tombstone的文件,实际也是wal文件。influxdb启动的时候tsm index会解析这个文件,得到tombstone并保存到内存中,用于响应查询请求时过滤tsm文件被删除的数据
pendingFile *os.File
}
复制代码
- Index
// tsi index
type Index struct {
// 通过分区减小锁范围
partitions []*Partition
}
type Partition struct {
// 一个logFile
logFile *LogFile
// 多个tsi文件
indexFiles []IndexFile
}
// 负责tsi的WAL和内存操作
type LogFile struct {
// WAL文件
file *os.File
// 分别记录tsi的添加、删除series操作
seriesIDSet, tombstoneSeriesIDSet *tsdb.SeriesIDSet
// 保存measurement、tag key、tag value的倒排索引。WAL大小达到阈值时compact到IndexFile
mms map[string]*logMeasurement
}
// tsi持久化文件
type IndexFile struct {
// 通过mmap读取tsi index文件并解析得到的索引
// key为measurement,value为指向mmap映射数据的指针,所以不占内存
tblks map[string]*TagBlock
// 对应logFile的tombstoneSeriesIDSet
// 为什么需要持久化tombstoneSeriesID?
// 因为logFile中存储了tombstoneSeriesIDSet,而logFile文件compact到indexFile时并不会apply这些tombstone(减小写的放大效应),必须先存储tombstone到indexFile中,并等待更高级别的compaction
tombstoneSeriesIDSetData []byte
}
复制代码
写入数据
- 获取
shardInfo
(points_writer.go)
// 根据point的时间戳找到对应的shardInfo
func (l sgList) ShardGroupAt(t time.Time) *meta.ShardGroupInfo {}
复制代码
- 写入
Store
(store.go)
func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
// 根据shardInfo中的shardID获取shard
sh := s.shards[shardID]
// 写入shard
sh.WritePoints(points)
}
复制代码
- 写入
Shard
(shard.go)
func (s *Shard) WritePoints(points []models.Point) error {
// 如果point的序列是新序列,先创建序列
points, fieldsToCreate, err := s.validateSeriesAndFields(points)
// 写入engine
engine.WritePoints(points)
}
复制代码
- 写入
Engine
(engine.go)
func (e *Engine) WritePoints(points []models.Point) error {
// 写Cache
e.Cache.WriteMulti(values)
// 写WAL
e.WAL.WriteMulti(values)
}
复制代码
分析
- 为什么根据时间戳获取
shard
?
由于时序数据的读写特点,influxdb把时间作为第一层索引。时间分区(shard)后,每个shard负责某一时间段的读写,同时每个shard包含一套单独的tsm、tsi引擎来处理读写。所以可以把shard理解为influxdb的最小执行单元,一个执行任务的worker
- 为什么只写入
Cache
和WAL
?
influxdb引擎整体基于lsm算法,满足读少写多。在lsm中,所有写、更新操作都只发生在内存(Cache
)和日志(WAL
)
- 宕机后如何恢复
Cache
通过WAL
恢复。因为WAL
顺序记录了所有写、更新操作,所以可以恢复到宕机前的状态,类似重放mysql binlog。见cache.go
func (cl *CacheLoader) Load(cache *Cache) error {
switch t := entry.(type) {
case *WriteWALEntry:
// 重放写操作
cache.WriteMulti(t.Values);
case *DeleteRangeWALEntry:
// 重放删除操作
cache.DeleteRange(t.Keys, t.Min, t.Max)
case *DeleteWALEntry:
cache.Delete(t.Keys)
}
}
复制代码
- 如何防止
Cache
过大?
达到设置的阈值后,influxdb会compact
到一个新的tsm文件。见engine.go
func (e *Engine) writeSnapshotAndCommit(log *zap.Logger, closedFiles []string, snapshot *Cache) (err error) {}
复制代码
更新数据
influxdb通过覆盖已有数据达到更新的目的,比如在同一series下插入多个时间戳相同、值不同的point,查询结果只能看到最后插入的point
和lsm一样,influxdb处理读请求时,使用Cache中的数据覆盖tsm文件中的数据,即新的数据覆盖老数据,以达到更新的目的。见iterator.gen.go
func (c *floatAscendingCursor) nextFloat() (int64, float64) {
// 对于某一个series,从cache、tsm获取point
ckey, cvalue := c.peekCache()
tkey, tvalue := c.peekTSM()
// 如果时间戳相同,代表这个点有更新操作,返回cache中的点
if ckey == tkey {
c.nextCache()
c.nextTSM()
return ckey, cvalue
}
...
}
复制代码
删除数据
func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
// 遍历每一个tsm文件
e.FileStore.Apply(func(r TSMReader) error {
r.BatchDelete().DeleteRange([][]byte{indexKey}, min, max)
......
// 将tombstone文件移动到合法路径,用于recover tsm index和compaction
// 将删除操作应用到tsm index中,便于处理查询请求时过滤数据
r.BatchDelete().commit()
}
// 删除内存里的points,真正删除,而不是加tombstone
e.Cache.DeleteRange(deleteKeys, min, max)
// 记录删除操作
e.WAL.DeleteRange(deleteKeys, min, max)
// 再次遍历tsm文件,找到被彻底删除的series,以便删除tsi的series
e.FileStore.Apply(func(r TSMFile) error {
// 比较第一次遍历tsm文件前后的差异,即可得出被彻底删除的series
}
// 删除tsi的series
for _, k := range seriesKeys {
// 如果Cache还有数据,则不能删除tsi索引
if e.Cache.Values(k).Len() > 0 {
continue
}
// 删除tsi的series
// 更新index中的partition中的logFile的tombstoneSeriesIDSet,并记录到WAL
e.index.DropSeries(k)
}
}
func (b *batchDelete) DeleteRange(keys [][]byte, minTime, maxTime int64) error {
// tsm index保存了tsm文件的seriesKey最大、最小值,不在此区间,直接跳过
if !b.r.index.OverlapsKeyRange(minKey, maxKey) {
return nil
}
// 同理判断时间区间
if !b.r.index.OverlapsTimeRange(minTime, maxTime) {
return nil
}
// 写入临时文件中,等待提交。提交后,文件被移动到合法路径,下次启动时tsm index会加载tombstone到内存
b.r.tombstoner.AddRange(keys, minTime, maxTime)
}
复制代码
更详细的解析见github.com/hzw-go/infl…
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END