Golang sync.map分享
一. sync.map引入
1.1 map读写的问题
多个协程对同一个map进行读写会报错
// map并发不加锁读写
func main() {
wg := sync.WaitGroup{}
m := make(map[int]int)
wg.Add(2)
go mapWrite(m, &wg)
go mapRead(m, &wg)
wg.Wait()
}
func mapWrite(m map[int]int, wg *sync.WaitGroup) {
for i := 0; i < 1000; i++ {
m[i] = i
}
wg.Done()
}
func mapRead(m map[int]int, wg *sync.WaitGroup) {
for i := 0; i < 1000000; i++ {
_ = m[i%1000]
}
wg.Done()
}
// 报错
// concurrent map read and map write(同时读写的报错)
// fatal error: concurrent map writes(同时写入的报错)
复制代码
1.2 解决方案
1.2.1 sync.map
golang内置结构体,能够实现map结构的并发读写
1.2.2 加锁
读写时加上互斥锁或者读写锁
1.2.3 读写分离
设置一个写入的map,一个读取的map,写入完成后将数据同步到读取map中(设置定时更新,实时性效果差)
var (
writeMap = make(map[int]int)
readMap = make(map[int]int)
readLock = sync.Mutex{}
)
func main() {
wg := sync.WaitGroup{}
wg.Add(2)
go doWrite(&wg)
go doRead(&wg)
wg.Wait()
}
func doWrite(wg *sync.WaitGroup) {
for i := 0; i < 1000; i++ {
writeMap[i] = i
}
writeCopy2Read()
wg.Done()
}
// 同步readMap
func writeCopy2Read() {
tempMap := make(map[int]int)
for key, val := range writeMap {
tempMap[key] = val
}
readLock.Lock()
readMap = tempMap
readLock.Unlock()
}
func getReadMap() map[int]int {
readLock.Lock()
defer readLock.Unlock()
return readMap
}
func doRead(wg *sync.WaitGroup) {
m := getReadMap()
for i := 0; i < 1000000; i++ {
_ = m[i%1000]
}
wg.Done()
}
复制代码
二. sync.map接口函数介绍
func (m *Map) Store(key, value interface{}) {} // 增(改)
func (m *Map) Delete(key interface{}) {} // 删
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {} // 加载并删除
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {} // 查
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {} // 加载不存在则增加
// Range函数由使用者提供实现,Range将遍历调用时刻map中的所有k-v对,将它们传给f函数,如果f返回false,将停止遍历。实质上就是提供了遍历的方法
func (m *Map) Range(f func(key, value interface{}) bool) {}
复制代码
2.1 基本使用方法
func main() {
syncMap := sync.Map{}
// 增(改)
syncMap.Store(10, "增")
syncMap.Store(10, "改")
// 查
val, _ := syncMap.Load(10)
fmt.Println(val) // 改
// 删
syncMap.Delete(10)
// 加载,失败则进行存储
syncMap.LoadOrStore(20, "LoadOrStore")
val, _ = syncMap.Load(20)
fmt.Println(val) // LoadOrStore
// 遍历
syncMap.Store(10, "10")
syncMap.Store(20, "20")
syncMap.Store(30, "30")
syncMap.Range(rangeFunc)
}
func rangeFunc(key, value interface{}) bool {
fmt.Printf("key: %v\tval:%v\n", key, value)
return true
}
复制代码
三. sync.map源码分析
3.1 基本数据结构
type Map struct {
mu Mutex // 对Map.dirty进行保护
read atomic.Value // readOnly
dirty map[interface{}]*entry
misses int // 计数作用。每次从read中读取失败,计数+1
}
type readOnly struct {
m map[interface{}]*entry
amended bool // 当这里的m和Map.dirty中数据不一致时,该字段为true
}
type entry struct {
p unsafe.Pointer // *interface{}
}
var expunged = unsafe.Pointer(new(interface{}))
复制代码
read.m加载key时entry中P的三种状态分析–(标志着Map中的read和dirty中键值存储状态),通过p的状态来决定key对应的value应该进行怎样的处理,记住p的状态对应出现的场景很重要。一下对p的三种状态进行说明:
- p为正常的值,指向实际的value(即interface{})的地址。
- p==nil,则dirty==nil或者dirty[key]=nil(这个存在是因为删除时给read.m的key赋值为nil,而dirty和read.m共用一个entry)
- p==expunged(删除状态),则dirty!=nil且dirty中不包含该key
sync.map内存布局图如图1所示(黑色箭头为自身存储,红色箭头为指针):
图1. sync.Map内存布局图
sync.map内部使用读写分离的方法,可以理解为写入时向dirty中写入,读取的时候在read.m中进行读取(read.m∈dirty),简单的操作数据流示意图如图2所示更方便大家理解:
图2. sync.Map操作数据流示意图
注:图中的两个entry可能相同(dirty和read.m共用entry)
3.2 接口函数源码剖析
3.2.1 查询
// 查
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
// 读取read.m判断是否读取成功
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
// read.m没有读取成功,但read.amended标志位为true,表明dirty中有read.m中不包含的键值
if !ok && read.amended {
// 加锁并进行再次读取判断,防止在加锁前有故事发生
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
// 读取dirty
e, ok = m.dirty[key]
// 更新读取read.m失败计数,并判断是否进行dirty->read.m的迁移
m.missLocked()
}
m.mu.Unlock()
}
// dirty中也没有对应的键值
if !ok {
return nil, false
}
// 成功直接返回value
return e.load()
}
func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*interface{})(p), true
}
// 更新读取read.m失败计数,并判断是否进行dirty->read.m的迁移
// 阈值为失败计数的次数与len(m.dirty)之间的大小关系判断
func (m *Map) missLocked() {
m.misses++
// 判断是否迁移的阈值就是读取read.m失败的次数与len(m.dirty)之间的比较
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil // 虽然这里赋值为nil,但仍然是dirty包含read.m的关系
m.misses = 0
}
复制代码
sync.Map查询流程图如图3所示:
图3. sync.Map查询流程图
3.2.2 删除
func (m *Map) Delete(key interface{}) {
m.LoadAndDelete(key)
}
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
// 双重判断
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
// read.m中没有key且read.amended标志位true,表明dirty中有read.m中不包含数据
e, ok = m.dirty[key]
delete(m.dirty, key) // 内建函数删除dirty中的key
m.missLocked() // 见上文所述
}
m.mu.Unlock()
}
// read.m读取成功,更新对应key的实质指针p的状态
if ok {
return e.delete()
}
return nil, false
}
func (e *entry) delete() (value interface{}, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
// CAS
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return *(*interface{})(p), true
}
}
}
复制代码
sync.Map删除流程图如图4所示:从图中可以看出,sync.Map的删除流程和查询流程基本相似,区别就是中间的读取dirty对应key值对应数据变为了删除dirty中对应key。
图4. sync.Map删除流程图
3.2.3 增/改
func (m *Map) Store(key, value interface{}) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
//dirty中key没有被delete掉
return
}
m.mu.Lock()
// 双重判断
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
// read.m中存在key
if e.unexpungeLocked() {
// read.m存在key但是p==expunge(标记为删除状态),需要将key-value添加到dirty中
m.dirty[key] = e
}
// 1.原本p==expunge,在上述判断中更新了dirty,需要对read.m进行更新,p指向value
// 2.原本p==nil,则根据p的状态可知dirty==nil,直接更新read.m
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
// read.m中不存在key但是dirty中存在key,直接更新
e.storeLocked(&value)
} else {
// read.m和dirty都不存在该key
if !read.amended {
// 完成read.m->dirty的迁移
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
func (e *entry) tryStore(i *interface{}) bool {
for {
p := atomic.LoadPointer(&e.p)
// 为了保证read.m∈ditry,所以当p==expunged时不能进行赋值
if p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
}
}
func (e *entry) unexpungeLocked() (wasExpunged bool) {
return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}
func (e *entry) storeLocked(i *interface{}) {
atomic.StorePointer(&e.p, unsafe.Pointer(i))
}
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() { //只迁正常值的元素
// 1.只将read.m中key对应实质指针p状态为正常状态的值进行迁移
// 2.read.m中key对应实质指针p==nil则不会进行迁移并将p变更为expunge状态
// 3.read.m中key对应实质指针p==expunge则不会进行迁移且状态不变
m.dirty[k] = e
}
}
}
func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}
func newEntry(i interface{}) *entry {
return &entry{p: unsafe.Pointer(&i)}
}
复制代码
sync.Map增/改的流程图如图5所示:
图5. sync.Map增(改)流程图
3.2.4 尝试读取如果没有则存储
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
actual, loaded, ok := e.tryLoadOrStore(value)
if ok {
return actual, loaded
}
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
// 见上文,判断p是否是删除状态,如果是则变更为nil状态
if e.unexpungeLocked() {
// 如果p==expunge,证明dirty!=nil且不包含key,需要向dirty中添加key以及对应的value
m.dirty[key] = e
}
actual, loaded, _ = e.tryLoadOrStore(value)
} else if e, ok := m.dirty[key]; ok {
// read.m不包含key但dirty中包含key,更新value值
actual, loaded, _ = e.tryLoadOrStore(value)
// 见上文,记录读取read.m失败次数判断是否需要进行迁移
m.missLocked()
} else {
// read.m和dirty中都不包含key,进行新增
if !read.amended {
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
actual, loaded = value, false
}
m.mu.Unlock()
return actual, loaded
}
func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*interface{})(p), true, true
}
ic := i
for {
if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
return i, false, true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*interface{})(p), true, true
}
}
}
复制代码
3.2.5 遍历
func (m *Map) Range(f func(key, value interface{}) bool) {
read, _ := m.read.Load().(readOnly)
// 如果read.amended为true表明dirty中有read.m中不包含数据,首先将dirty同步到read.m中
if read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
复制代码
四. 多种并发方法优缺点及性能对比
var (
m = make(map[int]int)
mProWrite = make(map[int]int)
mProRead = make(map[int]int)
syncM = sync.Map{}
lock = sync.Mutex{}
wg = sync.WaitGroup{}
)
const (
writeTimes = 1000
readTimes = 1000
)
// =================================Map=================================
func BenchmarkMap(b *testing.B) {
for i := 0; i < b.N; i++ {
mapTest()
}
}
func mapTest() {
//wg.Add(1)
//writeMap()
wg.Add(1)
readMap()
wg.Wait()
}
func readMap() {
defer wg.Done()
for i := 0; i < readTimes; i++ {
lock.Lock()
_ = m[i]
lock.Unlock()
}
}
func writeMap() {
defer wg.Done()
for i := 0; i < writeTimes; i++ {
lock.Lock()
m[i] = i
lock.Unlock()
}
}
// =================================Map=================================
// =================================sync.Map=================================
func BenchmarkSyncMap(b *testing.B) {
for i := 0; i < b.N; i++ {
syncMapTest()
}
}
func syncMapTest() {
//wg.Add(1)
//writeSyncMap()
wg.Add(1)
readSyncMap()
wg.Wait()
}
func readSyncMap() {
defer wg.Done()
for i := 0; i < readTimes; i++ {
syncM.Load(i)
}
}
func writeSyncMap() {
defer wg.Done()
for i := 0; i < writeTimes; i++ {
syncM.Store(i, i)
}
}
// =================================sync.Map=================================
// =================================Map读写分离=================================
func BenchmarkMapPro(b *testing.B) {
for i := 0; i < b.N; i++ {
mapProTest()
}
}
func mapProTest() {
//wg.Add(1)
//writeMapPro()
wg.Add(1)
readMapPro()
wg.Wait()
}
func writeMapPro() {
for i := 0; i < writeTimes; i++ {
mProWrite[i] = i
}
writeCopy2Read()
wg.Done()
}
func writeCopy2Read() {
tempMap := make(map[int]int)
for key, val := range mProWrite {
tempMap[key] = val
}
lock.Lock()
mProRead = tempMap
lock.Unlock()
}
func getReadMap() map[int]int {
lock.Lock()
defer lock.Unlock()
return mProRead
}
func readMapPro() {
m := getReadMap()
for i := 0; i < readTimes; i++ {
_ = m[i]
}
wg.Done()
}
// =================================Map读写分离=================================
复制代码
使用Benchmark进行性能测试(极限法,只写和只读)
- 只写
循环次数 | 每次循环消耗时间(ns) | |
---|---|---|
map加锁 | 51723 | 21077 |
sync_map | 9823 | 111275 |
map读写分离 | 10000 | 104086 |
- 只读
循环次数 | 每次循环消耗时间(ns) | |
---|---|---|
map加锁 | 85177 | 11988 |
sync_map | 111643 | 10295 |
map读写分离 | 625856 | 1786 |
五. 参考
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END