Go语言基础学习 (八) – 并发编程

前言

在极客上看了蔡超老师的Go语言课程 随手记下来的一些随笔,Go的基础应用及实例,
系列内容比较偏基础,推荐给想要入门Go语言开发者们阅读。

目录如下

Go语言基础学习 (一) – 变量 常量已经与其他语言的差异
Go语言基础学习 (二) -Go语言中的类型转与Go语言中的数组切片
Go语言基础学习 (三) – Go语言内的 Map声明使用与工厂模式
Go语言基础学习 (四) – Go语言函数简单介绍
Go语言基础学习 (五) – 面向对象编程
Go语言基础学习 (六) – 编写一个好的错误处理
Go语言基础学习 (七) – 包(package)
Go语言基础学习 (八) – 并发编程

1.携程机制

携程可以说是一个轻量级的线程

KSE:内核对象

image-20220225143319300.png

  • 多对多关系的图

    kernel entity(系统线程)是由CPU直接进行调度控制,这样的话,它的调度的效率是非常的高
    当时一对一关系时:如果你的线程之间发生切换时,它会牵扯到内核对象(KSE)的切换,这样就会比较高的消耗
    而多对多的关系时:我多个携程或者是多个线程都在一个线程空间里,或者都和一个内核对象(KSE)对应的话,那么他们之间的消耗就会小很多,Go的携程机制也就是这样子做的

    image-20220225143319310.png

    M : 系统线程
    P : 并不是真正意义上的处理器,而是Go语言实现的一个携程处理器
    G : 就是携程

    image-20220225143643989.png

    如图所示,P在M的下方,G就在P的下方,会生成多个G的队列,当有携程唤醒的时候,P就会依次来运行这些G队列

    问题1 : 如果G在运行中,其中有一个携程一直长时间被占用,导致后续携程都无法正常运行时,那要怎么办呢?

    回答:在Go起来的时候,它会有一个守护线程,它会去做一个计数,就回去记录每个P里G运行完成的数量, 当它发现某一个P完成的数量没有发生变化是,它就会往这个G的任务栈里面去插入一个特别的标记。当这个G运行的时候,遇到一个非内联函数的时候就会读到这个标记,就会把自己中断起来,再去插入到等待队列的队尾去,等待切换到别的G来进一步继续运行

    问题2 : 当某一个 携程/IO 被系统中断了,Go的携程机制会怎么处理并保持并发?

    答: 为了保证程序的并发,P会把自己移动到另一个可使用的系统线程当中。继续来执行它所挂的这些队列里的G,当被中断的G被唤醒完成之后呢,它会把自己加入到某一个P里面的携程等待队列里,或者是全局等待队列里。
    这里需要注意的一点:当一个G被中断的时候,它在寄存器里面的运行状态也会保存在这个携程对象里,当携程再次获得运行机会的时候,这些就会在重新写入寄存器继续运行

    使用携程代码案例

    func TestGroution(t *testing.T)  {
     for i :=0;i<10; i++ {
      go func(i int) {
         fmt.Println(i)
      }(i)
     }
     fmt.Println("test")
    }
    复制代码

    输出结果

    === RUN   TestGroution
     test
     2
     0
     1
     3
     7
     6
     8
     9
     --- PASS: TestGroution (0.00s)
     PASS
     5
    复制代码

    我们会发现输出的结果缺少参数,并且在程序运行结束后还在输出携程内的参数,这是因为如果方法执行的过快,就会导致在携程还未跑完的情况下关闭程序,导致结果输出达不到预期标准

    这时候就需要使用延迟函数来延迟一段时间等待携程全部运行完成后在关闭程序

    func TestGroution(t *testing.T)  {
     for i :=0;i<10; i++ {
      go func(i int) {
         fmt.Println(i)
      }(i)
     }
     fmt.Println("test")
     time.Sleep(time.Millisecond * 50)
    }
    复制代码

    输出结果

    === RUN   TestGroution
     test
     9
     5
     6
     7
     8
     1
     0
     2
     3
     4
     --- PASS: TestGroution (0.05s)
     PASS
    复制代码

2.共享内存并发机制

  • Lock共享锁

    代码案例, for自增数字,同时开启5000个携程来进行自增

      func TestCounter(t *testing.T)  {
      
       counter := 0
       
       for i := 0;i<5000 ;i++  {
          go func() {
                  counter++
          }()
      }
      time.Sleep(1 *time.Second)
      t.Logf("conuter = %d",counter)
     }
    复制代码

    输出结果

     === RUN   TestCounter
     share_mem_test.go:17: conuter = 4651
     --- PASS: TestCounter (1.00s)
    复制代码

    这时候我们可以看出,程序输出的结果与我们预期的结果不同。是因为,我们使用的counter在不同的携程里面去做这个自增,导致了这个并发的竞争条件,从传统的角度来说,他不是一个线程安全的程序,要保证他每个线程安全的话就需要对这个共享的内存这一块来进行一个锁的保护。

    给所有携程进行加锁操作

    func TestCounterThreadSafe(t *testing.T)  {
       var mut  sync.Mutex
       counter := 0
    
       for i := 0;i<5000 ;i++  {
          go func() {
             defer func() { // 使用锁的同时,我们通常会使用defer ,防止在我们程序运行时,出现异常没有及时将锁释放而导致程序整体被挂起
                mut.Unlock()
             }()
             mut.Lock()
             counter++
          }()
       }
       time.Sleep(1 *time.Second)
       t.Logf("conuter = %d",counter)
    }
    复制代码

    输出结果

    === RUN   TestCounterThreadSafe
     share_mem_test.go:36: conuter = 5000
    --- PASS: TestCounterThreadSafe (1.00s)
    复制代码

    使用lock锁保证线程安全的情况下,结果是符合我们预期的

  • WaitGroup

    image-20220228151852616.png

    在我们使用携程时,使用time.Sleep函数总是需要去等待携程运行完毕,这个时候可以使用WaitGroup函数来代替Sleep函数等待携程,而WaitGroup 作用就是同步各个线程的操作

    代码案例

    func TestCounterWaitGroup(t *testing.T)  {
       var mut  sync.Mutex
       var wg sync.WaitGroup
       counter := 0
    
       for i := 0;i<5000 ;i++  {
          wg.Add(1) // 新增一个我们需要等待的
          go func() {
             defer func() {
                mut.Unlock()
    
             }()
             mut.Lock()
             counter++
             // 完成之后,携程就会告诉我们有一个等待的时候已经完成了
             wg.Done()
          }()
       }
       wg.Wait()// 此方法为阻塞所有程序到这里,直到wg.Done 通知等待的任务已完成,才会走下一步处理
       //time.Sleep(1 *time.Second)
       t.Logf("conuter = %d",counter)
       }
    复制代码

    输出结果

     === RUN   TestCounterWaitGroup
     share_mem_test.go:42: conuter = 5000
    --- PASS: TestCounterWaitGroup (0.00s)
    PASS
    复制代码

    输出结果与预期完全符合,并且不需要sleep

3. CSP并发模型

  • CSP简单介绍

    CSP模型是上个世纪七十年代提出的,用于描述两个独立的并发实体通过共享的通讯 channel(管道)进行通信的并发模型。 CSP中channel是第一类对象,它不关注发送消息的实体,而关注与发送消息时使用的channel。

    Go 语言内 channel的基本机制

    image-20220228163545264.png

    首先我们看第一种交互方式: 通讯的两方必须在同时在channel上,才能够完成这次交互,如果任意一方不在的话,那么另一方就会阻塞在那里,直到另一方恢复后才能完成这次的交互

    第二种交互方式 :Buffered Channel 这种方式让发送者跟接受者有更松耦合的关系,在channel内我们给它设置一个容量,在容量未满的情况下发送消息的人可以一直往容量里面放,直到容量满了后,你需要等待接受消息的人来处理你发送的消息,直到把容量空出来可以放入你下一个消息时,它才会向下执行,否则就会一直阻塞在那里等待容量减少。对接受者也是一样的,容量内有消息就会一直拿,没有的话就会停在那里等待消息过来

  • 在Go里面实现CSP并发模型

    首先看一个同步阻塞的案例

     func service() string  {
         time.Sleep(time.Millisecond * 50)
         return "Done"
     }
    
     func otherTask()  {
         fmt.Println("working on something else")
         time.Sleep(time.Millisecond * 100)
         fmt.Println("Task is done.")
     }
    
     func TestService(t *testing.T)  {
         fmt.Println(service())
         otherTask()
     }
    复制代码
     Done
     working on something else
     Task is done.
     --- PASS: TestService (0.15s)
    复制代码

    输出顺序正常

    使用CSP方法来构造

        func service() string  {
            time.Sleep(time.Millisecond * 50)
            return "Done"
        }
    
        func otherTask()  {
            fmt.Println("working on something else")
            time.Sleep(time.Millisecond * 100)
            fmt.Println("Task is done.")
        }
    
    
        func AsyncService() chan string {
            retCh := make(chan string)  //构造一个channel
            // 第二步的时候,如果外面的调用程序需要我们返回的结果的话,他们就可以在channel上去等待
            // 返回channel的时候,我们还没去执行下面的程序,所以要等携程处理完将结果放进channel里
            go func() { //第一步 被调用的时候启动另外一条携程去执行,而不是阻塞当前程序的运行
                ret := service()
                fmt.Println("returned result.")
                retCh <- ret  // 运行完了之后把结果放进channel里
                fmt.Println("service exited.")
            }()
        // 由于我们暂时没用Buffered channel ,所以我们上面运行service()的携程都会被阻塞在消息传递的过程当中,直到这个消息传递完成,才会去向下完成
    
            return retCh // 返回channel
        }
    
        func TestAsynService(t *testing.T)  {
            retCh := AsyncService() // 返回的是一个channel
            otherTask() // 调另外一个task  这个时候 上面可以没有执行完成,因为他是在另一个携程里面去执行的
            fmt.Println(<-retCh) // 打印出channel传递的消息
        }
    复制代码
    === RUN   TestAsynService
    working on something else
    returned result.
    Task is done.
    Done
    service exited.
    --- PASS: TestAsynService (0.10s)
    复制代码

    我们可以看到 首先执行的是otherTask的第一个结果,然后在携程服务执行完成后,接着输出携程任务内的returned result. ,下一步又接着otherTask的输出,otherTask输出完成之后,最后输出了channel里面传递过来的结果,紧接着输出了携程内的 service exited. 。正常来说应该是输出完 returned result. 就应该输出 service exited. 的,出现这种情况的原因是我们使用的CSP是为上图第一种方式,所以在 retCh <- ret 传递消息这一步,channel内的消息未被接传递出去,程序阻塞到了这里,待我们走到最后一步拿channel内的消息后,才会释放阻塞,运行 retCh <- ret 下面的程序

  • 使用Buffered Channel 来实现松耦合低阻塞异步,解决上面的阻塞问题

    buffered channel 的使用方法就是在make channel时,第二个参数直接设置好channel的容量

    func AsyncService() chan string {
           retCh := make(chan string,1)  // 第二个参数,加入容量,因为我们的返回消息结果只有一个,所以给1就可以
    
    // 第二步的时候,如果外面的调用程序需要我们返回的结果的话,他们就可以在channel上去等待
           // 返回channel的时候,我们还没去执行下面的程序,所以要等携程处理完将结果放进channel里
           go func() { //第一步 被调用的时候启动另外一条携程去执行,而不是阻塞当前程序的运行
                   ret := service()
                   fmt.Println("returned result.")
                   retCh <- ret  // 运行完了之后把结果放进channel里
                   fmt.Println("service exited.")
           }()
           // 由于我们暂时没用Buffered channel ,所以我们上面运行service()的携程都会被阻塞在消息传递的过程当中,直到这个消息传递完成,才会去向下完成
    
           return retCh // 返回channel
    }
    复制代码
    === RUN   TestAsynService
    working on something else
    returned result.
    service exited.
    Task is done.
    Done
    复制代码

    这次的返回结果就可以看出无任何阻塞,在channel发送消息的时候并未阻塞,直接输出 service exited.退出,携程就完全释放了,这是一种更高效的方法

4. 多路选择和超时

  • select多路选择

    类似于switch的写法,检测接受channel内的值是否合规的一个校验写法
    image-20220301141107945.png

5. channel的关闭和广播

  • channel关闭的一些说明

    image-20220301150032536.png

    一个简单的channel发送接收代码例子

    
       func dataProducer(ch chan int,wg *sync.WaitGroup)  {
       go func() {
       for i := 0;i<10;i++  {
        ch <- i
       }
       wg.Done()
       }()
       }
       func dataService(ch chan int,wg  *sync.WaitGroup)  {
       go func() {
       for i :=0; i<10 ;i++  {
          data := <- ch
          fmt.Println(data)
       }
       wg.Done()
       }()
       }
    
       func TestChannelClose(t *testing.T)  {
       var wg sync.WaitGroup
       ch := make(chan int)
       wg.Add(1)
       dataProducer(ch,&wg)
       wg.Add(1)
       dataService(ch,&wg)
       wg.Wait()
    
       }
    复制代码
       0
       1
       2
       3
       4
       5
       6
       7
       8
       9
       --- PASS: TestChannelClose (0.00s)
    复制代码

    首先我们在第一个方法内,将channel塞进去十个数,然后到了第二个方法去读的时候,也直接按照10的循环来取,这时候就有个问题,如果我们不知道第一个方法在channel内塞了多少个数据的时候,那我们第二个方法要如何辨别?

    可以放置一个标识来通知到dataService 已经放置完成,但是这种值适合一个对一个dataService来实现,当多个的时候就会出现问题

    为了解决这种问题的话,在Go里面会有一个特殊的操作,叫做关闭channel,当我们数据发送完毕了之后我们可以把channel Close 掉

    func dataProducer(ch chan int,wg *sync.WaitGroup)  {
      go func() {
          for i := 1;i<11;i++  {
              ch <- i
          }
          close(ch)
          wg.Done()
          }()
      }
    
    func dataService(ch chan int,wg  *sync.WaitGroup)  {
    
      go func() {
         for  {
           if data,ok := <- ch; ok{
              fmt.Println(data)
          }else {
             break
           }
         }
         wg.Done()
         }()
      }
    
      func TestChannelClose(t *testing.T)  {
          var wg sync.WaitGroup
          ch := make(chan int)
          wg.Add(1)
          dataProducer(ch,&wg)
          wg.Add(1)
          dataService(ch,&wg)
          wg.Wait()
      }
    复制代码
    === RUN   TestChannelClose
     0
     1
     2
     3
     4
     5
     6
     7
     8
     9
     --- PASS: TestChannelClose (0.00s)
    复制代码

    结果也是被正常打印出来,使用通道关闭来通知dataService 程序已完成,可以退出,并且在这种模式下可以使用对多个Service来进行操作,就不会出现上面说的那种对多个就会出现携程不一致导致的问题

6. 任务的取消

上面所有课程集合起来的实战应用

     //任务是否已被取消
     //实现原理:
     //检查是否从 channel 收到一个消息,如果收到一个消息,我们就返回 true,代表任务已经被取消了
     //当没有收到消息,channel 会被阻塞,多路选择机制就会走到 default 分支上去。
     func isCanceled(cancelChan chan struct{}) bool {
     select {
     case <-cancelChan:
      return true
     default:
      return false
     }
     }

     //执行任务取消
     //因为 close() 是一个广播机制,所以所有的协程都会收到消息
     func execCancel(cancelChan chan struct{}) {
     // close(cancelChan)会使所有处于处于阻塞等待状态的消息接收者(<-cancelChan)收到消息
     close(cancelChan)
     }

     //利用 CSP, 多路选择机制和 channel 的关闭与广播实现任务取消功能
     func TestCancel(t *testing.T) {
     var wg sync.WaitGroup
     cancelChan := make(chan struct{}, 0)

     //启动 5 个协程
     for i := 0; i < 5; i++ {
      wg.Add(1)
      go func(i int, cancelChan chan struct{}, wg *sync.WaitGroup) {
         //做一个 while(true) 的循环,一直检查任务是否有被取消
         for {
            if isCanceled(cancelChan) {
               fmt.Println(i, "is Canceled")
               wg.Done()
               break;
            } else {
               //其它正常业务逻辑
               t.Log("正常业务")
               time.Sleep(time.Millisecond * 5)
            }
         }
      }(i, cancelChan, &wg)
     }
     //执行任务取消
     execCancel(cancelChan)
     wg.Wait()
     }
复制代码
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享