Golang 的并发编程

Go语言中的并发编程,首先我们来了解一个概念

并发
同一时间段内执行多个任务

并行
同一时刻执行多个任务

Go语言的并发通过goroutine实现。goroutine类似于线程,属于用户态的线程,我们可以根据需要创建成千上万个goroutine并发工作。goroutine是由Go语言的运行时(runtime)调度完成,而线程是由操作系统调度完成。

Go语言还提供channel在多个goroutine间进行通信。goroutine和channel是 Go 语言秉承的 CSP(Communicating Sequential Process)并发模式的重要实现基础。

接下来我们来看看如何使用goroutine

func Hello(){
   fmt.Println(“start”)
}

func main(){
    go Hello()
    fmt.Println("end")
}

复制代码

ok,这就是简单的goroutine的使用,那上面的代码到底是什么意思呢,我们一步步来看
首先goroutine的使用 就是给函数 前面加上go关键字,就启动了一个gonroutine。

首先 main 函数 可以理解是一个主的goroutine 然后在调用Hello方法 加上go 关键字,开启了一个goroutine 执行hello函数。

那么以上代码的执行结果是什么呢,有可能是,只打印了end,注意 现在这个代码不是串行 而是开启了goroutine。那么需要注意的是,当main 运行结束退出时,所有派发的goroutine也就销毁了 无法继续执行,所有我们需要 让main 函数等待所有的goroutine结束之后 在退出main 函数

这里我们用一个工具包 sync.waitGroup

//WaitGroup是一个结构体
var wg sync.WaitGroup

func Hello(i int){
	fmt.Println(i)
	wg.Done() //这里就是用到了 wg.Done()通知wg我运行完了 计数器-1
}

//开启了一个主goroutine执行main 函数
func main() {
	wg.Add(10000) //计数器 新增一个goroutine 就+1 我们这里开启了10000个goroutine 所以就+10000 ,
	//开启了一个goroutine 执行hello函数
	for i:=0;i<10000;i++{
	  go Hello(i)
	}
	fmt.Println("end")
	wg.Wait() //Wait 阻塞状态 等待所有的goroutine 跑完之后,wg.Add 里面为0 的时候

}
复制代码

多次执行上面的代码,会发现每次打印的数字的顺序都不一致。这是因为10个goroutine是并发执行的,而goroutine的调度是随机的。

goroutine执行匿名函数

var wg sync.WaitGroup

func main() {
	wg.Add(10000)
	for i:=0;i<10000;i++{
		go func(i int){
 			fmt.Println(i)
 			wg.Done()
		}(i)
	}
	wg.Wait()
}
复制代码

需要注意的是 此时我们goroutine遇上了一个闭包,所以我们不能直接引用函数外部变量,需要传参数,把外部的i 传进去

goroutine与线程的关系

操作系统的线程都有固定的栈内存,一般大小在2M左右,而goroutine的栈在生命周期初始大小只有2kb,goroutine的大小是可以随之增加的,理论上大小可达1G,但是一般情况不会到达这么大的。

goroutine调度

GPM是Go语言运行时(runtime)层面的实现,是go语言自己实现的一套调度系统。区别于操作系统调度OS线程 具体的概念大家可以去看看

GOMAXPROCS

Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码。默认值是机器上的CPU核心数。例如在一个8核心的机器上,调度器会把Go代码同时调度到8个OS线程上(GOMAXPROCS是m:n调度中的n)。

Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。

Go1.5版本之前,默认使用的是单核心执行。Go1.5版本之后,默认使用全部的CPU逻辑核心数。

一个操作系统线程对应用户态多个goroutine。go程序可以同时使用多个操作系统线程。goroutine和OS线程是多对多的关系。

chan

chan 用于goroutine之间的通信,go 提倡通过通信共享内存而不是通过共享内存而实现通信。
chan是一种通道,遵循先进先出的原则。

var ch1 = make(chan int) //无缓冲区
var ch2 = make(chan string,1) //待缓冲区
复制代码

chan是引用数据类型,必须初始化分配内存使用,chan又分为 无缓冲区和有缓冲区,有缓冲区的可指定缓存区域大小,能放多少数据。
无缓冲区 又称为同步通道,必须有一个goroutine 区接受值,否则会引发panic。 待缓冲区 又被称为异步通道。把数据放到管道里,不管你有没有人接受,我就去干别的事情了

chan的发送

var ch2 = make(chan string,1)
ch2 <- 10
复制代码

chan的接受

x := <-ch2
复制代码

关闭通道

close(ch2)
复制代码

chan是引用数据类型,不是每次都需要手动关闭,他会被垃圾机制回收的。

channel不需要通过close释放资源,只要没有goroutine持有channel,相关资源会自动释放。
close可以用来通知channel接收者不会再收到数据。所以即使channel中有数据也可以close而不会导致接收者收不到残留的数据。
有些场景需要手动关闭通道,例如range遍历通道,如不关闭range遍历会出现死锁。


func main() {
	var ch1 = make(chan int,1) 	//有缓冲区通道 有称为同步通道
	//var ch2 = make(chan int) //无缓冲区域,必须有一个goroutine从里面取值,异步通道
	ch1<-10
	x:= <-ch1
	fmt.Println(x)
	close(ch1)
}
复制代码

那么goroutine和channel 如何协同工作呢,请看下面的例子

/**
 两个goroutine
 两个chan
 1、生成0-100的数字发送到 ch1
 2、从ch1接受数据 计算平方 放到ch2
 */

//生成0-100的数字发送到 ch1
func a(c1 chan int){
	for i:=0;i<100;i++{
		c1 <- i
	}
	close(c1)
}

//从ch1接受数据 计算平方 放到ch2
func b(c1 chan int,c2 chan int){
	//循环取值
	for tem := range c1{
		c2 <- tem*tem
	}
	close(c2)

}

func main() {
	ch1 := make(chan int ,100)
	ch2 :=make(chan int,100)
	go a(ch1)
	go b(ch1,ch2)

	//从ch2中循环取值 所有没有完成之前主程序不会退出
	for num := range ch2{
		fmt.Println(num)
	}

}
复制代码

需要注意的是 以上chan使用了遍历 传递数据,所以需要手动关闭通道,不然会造成goroutine死锁

单项通道

单项通道值得是 一个通道只能够 接受值,或者发送值,如果没有指定 那么它既可以发送也可以接收值。
如何定义单项通道呢,请看一下代码

//生成0-100的数字发送到 ch1 只能接收
func a(c1 chan<- int){
	for i:=0;i<100;i++{
		c1 <- i
	}
	close(c1)
}

//从ch1接受数据 计算平方 放到ch2
//c1只能发送,c2 只能接收
func b(c1 <-chan int,c2 chan<- int){
	//循环取值
	for tem := range c1{
		c2 <- tem*tem
	}
	close(c2)

}

func main() {
	ch1 := make(chan int ,100)
	ch2 := make(chan int,100)
	go a(ch1)
	go b(ch1,ch2)

	//从ch2中循环取值 所有没有完成之前主程序不会退出
	for num := range ch2{
		fmt.Println(num)
	}

}

复制代码

或者也可以在申明时候定义

	//只能发送值
	var ch3 = make(<-chan int,1)
	//只能接收值
	var ch4 = make(chan<- int,1)
       
复制代码

channel常见的异常总结

image.png

(图片取自七米老师的博客)

并发安全和锁

在一个项目中可能会存在多个goroutine 同时去修改一个值,那么这个值就会发生竞争,结果与预期不符合。

var (
	a int
	wg sync.WaitGroup
)
func add(){
	for i:=0;i<5000;i++{
		a++
	}
	wg.Done()
}

func main() {
	wg.Add(2)
	go add()
	go add()
	wg.Wait()
	fmt.Println(a)
}

复制代码

以上代码 我执行了两个goroutine 并发区修改 a的值,如果顺利的话 那么值肯定是10000,但是结果并不是,我并发的去修改a的值 就造成了数据的竞争。那么如何解决这个问题呢,这里就用到了 go语言的锁了。

互斥锁


var (
	a int
	wg sync.WaitGroup
	lock sync.Mutex //互斥锁 一次只能有一个goroutine 来访问修改
)
func add(){
	for i:=0;i<5000;i++{
		lock.Lock() //加锁
		a++
		lock.Unlock()//解锁
	}
	wg.Done()
}

func main() {
	wg.Add(2)
	go add()
	go add()
	wg.Wait()
	fmt.Println(a)
}
复制代码

使用互斥锁能够保证同一时间有且只有一个goroutine进入临界区,其他的goroutine则在等待锁;当互斥锁释放后,等待的goroutine才可以获取锁进入临界区,多个goroutine同时等待一个锁时,唤醒的策略是随机的。

读写互斥锁


var (
	a int
	wg sync.WaitGroup
	lock sync.RWMutex
)

func read(){
	lock.RLock() //读锁
	time.Sleep(time.Millisecond) //假如读需要一毫秒
	lock.RUnlock() //解锁
	wg.Done()
}

func write(){
	lock.Lock() //加锁
	a = a+1
	lock.Unlock() //解锁
	wg.Done()
}

func main() {
	wg.Add(1000)
	for i:=0;i<1000;i++{
		go read()
	}
	wg.Add(100)
	for i:=0;i<100;i++ {
		go write()
	}
	wg.Wait()
	fmt.Println(a)
}

复制代码

读写互斥锁,是读写分离的,当我们读的次数远远大于写的次数,我们可以使用读写互斥锁,效率会比单纯的互斥锁高很多很多。

sync.WaitGroup

这个方法我们之前已经用到很多了,在代码中生硬的使用time.Sleep肯定是不合适的,Go语言中可以使用sync.WaitGroup来实现并发任务的同步。 sync.WaitGroup有以下几个方法:

(wg * WaitGroup)
(wg *WaitGroup) 计数器-1
(wg *WaitGroup) Wait() 阻塞直到计数器变为0

方法名 功能
Add(delta int) 计数器+delta
Done() 计数器-1
Wait() 阻塞直到计数器变为0

sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了N 个并发任务时,就将计数器值增加N。每个任务完成时通过调用Done()方法将计数器减1。通过调用Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成。

sync.Once

在编程的很多场景下我们需要确保某些操作在高并发的场景下只执行一次,例如只加载一次配置文件、只关闭一次通道等。

var loadIconsOnce sync.Once
//里面只提供了一个Do方法

func main(){
   for i:=0;i<30i++{
        go func(){
        //只执行一次
        loadIconsOnce.Do(func() {
              fmt.Println(111)
            })
        }()
   }
}

复制代码

sync.Map

map 在高并发的情况下区设置更改并不是安全的,开启少量几个goroutine的时候可能没什么问题,当并发多了之后执行上面的代码就会报fatal error: concurrent map writes错误。

需要为map加锁来保证并发的安全性了,Go语言的sync包中提供了一个开箱即用的并发安全版map–sync.Map。开箱即用表示不用像内置的map一样使用make函数初始化就能直接使用。同时sync.Map内置了诸如Store、Load、LoadOrStore、Delete、Range等操作方法。


var (
	m sync.Map
	wg sync.WaitGroup
)

func set(i int){
	m.Store(i,i) //给map 设置值
	wg.Done()
}

func main() {
	wg.Add(30)
	for i:= 0;i<30;i++ {
		go set(i)
	}
	wg.Wait()
	//遍历map
	m.Range(func(key, value interface{}) bool {
		fmt.Println(key,value)
		return true
	})
	//删除 key = 1 的
	m.Delete(1)

	//获取
	m.Load(1)

}

复制代码

sync.Map 没有提供获取 map 数量的方法,替代方法是在获取 sync.Map 时遍历自行计算数量,sync.Map 为了保证并发安全有一些性能损失,因此在非并发情况下,使用 map 相比使用 sync.Map 会有更好的性能。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享