此篇介绍一下Go语言的并发编程的一些相关知识。
常用概念
函数 | 作用 |
---|---|
Goexit | 退出当前执行的 goroutine,但是 defer 函数还会继续调用 |
Gosched | 让出当前 goroutine 的执行权限,调度器安排其他等待的任务运行,并在下次某个时候从该位置恢复执行 |
NumCPU | 返回 CPU 核数量 |
NumGoroutine | 返回正在执行和排队的任务总数 |
GOMAXPROCS | 用来设置可以运行的 CPU 核数,GOMAXPROCS的最大值是256 |
runtime.GOMAXPROCS(0) | 核数的2倍 |
goroutine
- 在程序启动时,Go程序就会为main()函数创建一个默认的goroutine。在 Go 语言里只有一个主协程,其它都是它的子协程,子协程之间是平行关系。
- 主协程运行结束,其它协程就会立即消亡,不管它们是否已经开始运行。
- 在使用子协程时一定要注意保护好每个子协程,确保它们正常安全的运行。因为子协程的异常退出会将异常传播到主协程,直接会导致主协程也跟着挂掉,然后整个程序就崩溃了。
- 子协程崩溃时,主协程里剩余的语句没能运行就挂掉了,主协程在异常退出时会打印堆栈信息。从堆栈信息中可以了解到是哪行代码引发了程序崩溃。
- 为了保护子协程的安全,通常会在子协程的入口开头处增加 recover() 语句来恢复子协程内部发生的异常,阻断它传播到主协程导致程序崩溃。
go func() {
defer func() {
if err := recover(); err != nil {
fmt.Println("recover success.")
debug.PrintStack()
}
}()
// do something
}
- OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine的栈不是固定的,可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,虽然极少会用到这个大。
- 在Go语言中一次创建十万左右的goroutine也是可以的。Go语言单机管理百万级的协程也可以。
- 一个goroutine必定对应一个函数,可以创建多个goroutine去执行相同的函数。
- 一个进程内部可以运行多个线程,而每个线程又可以运行很多协程。线程要负责对协程进行调度,保证每个协程都有机会得到执行。当一个协程睡眠时,它要将线程的运行权让给其它的协程来运行,而不能持续霸占这个线程。同一个线程内部最多只会有一个协程正在运行。
- 线程的调度是由操作系统负责的,调度算法运行在内核态,而协程的调用是由 Go 语言的运行时负责的,调度算法运行在用户态。
- 协程可以简化为三个状态,运行态、就绪态和休眠态。同一个线程中最多只会存在一个处于运行态的协程,就绪态的协程是指那些具备了运行能力但是还没有得到运行机会的协程,它们随时会被调度到运行态,休眠态的协程还不具备运行能力,它们是在等待某些条件的发生,比如 IO 操作的完成、睡眠时间的结束等。
- 操作系统对线程的调度是抢占式的,也就是说单个线程的死循环不会影响其它线程的执行,每个线程的连续运行受到时间片的限制。
- Go 语言运行时对协程的调度并不是抢占式的。如果单个协程通过死循环霸占了线程的执行权,那这个线程就没有机会去运行其它协程了,你可以说这个线程假死了。不过一个进程内部往往有多个线程,假死了一个线程没事,全部假死了才会导致整个进程卡死。
- 每个线程都会包含多个就绪态的协程形成了一个就绪队列,如果这个线程因为某个别协程死循环导致假死,那这个队列上所有的就绪态协程是不是就没有机会得到运行了呢?Go 语言运行时调度器采用了 work-stealing 算法,当某个线程空闲时,也就是该线程上所有的协程都在休眠(或者一个协程都没有),它就会去其它线程的就绪队列上去偷一些协程来运行。也就是说这些线程会主动找活干,在正常情况下,运行时会尽量平均分配工作任务。
- 默认情况下,Go 运行时会将线程数会被设置为机器 CPU 逻辑核心数。同时它内置的 runtime 包提供了 GOMAXPROCS(n int) 函数允许用户动态调整线程数,注意这个函数名字是全大写,该函数会返回修改前的线程数,如果参数 n <=0 ,就不会产生修改效果,等价于读操作。
// 读取默认的 线程数
fmt.Println(runtime.GOMAXPROCS(0))
// 设置 线程数 为 10
runtime.GOMAXPROCS(10)
// 读取当前的 线程数
fmt.Println(runtime.GOMAXPROCS(0))
- 获取当前的 协程 数量可以使用 runtime 包提供的 NumGoroutine() 方法。
- goroutine和OS线程是多对多的关系,即m:n。
- 在 HTTP API 应用中,每一个 HTTP 请求,服务器都会单独开辟一个协程来处理。在这个请求处理过程中,要进行很多 IO 调用,比如访问数据库、访问缓存、调用外部系统等,协程会休眠,IO 处理完成后协程又会再次被调度运行。待请求的响应回复完毕后,链接断开,这个协程的寿命也就到此结束。
- 在消息推送系统中,客户端的链接寿命很长,大部分时间这个链接都是空闲状态,客户端会每隔几十秒周期性使用心跳来告知服务器你不要断开我。在服务器端,每一个来自客户端链接的维持都需要单独一个协程。因为消息推送系统维持的链接普遍很闲,单台服务器往往可以轻松撑起百万链接,这些维持链接的协程只有在推送消息或者心跳消息到来时才会变成就绪态被调度运行。
- 聊天系统也是长链接系统,它内部来往的消息要比消息推送系统频繁很多,限于 CPU 和 网卡的压力,它能撑住的连接数要比推送系统少很多。不过原理是类似的,都是一个链接由一个协程长期维持,连接断开协程也就消亡。
channel
- Go语言提供了channel在多个goroutine间进行通信。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
- goroutine和channel是 Go 语言秉承的 CSP(Communicating Sequential Process)并发模式的重要实现基础。
- 单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。
- 虽然可以使用共享内存进行数据交换,但是共享内存在不同的goroutine中容易发生竞态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。
- channel是引用类型,channel类型的空值是nil。
- 声明的channel在使用make函数进行初始化之后才能使用。
- channel有发送(send)、接收(receive)和关闭(close)三种操作。
- 只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。
- channel是可以被垃圾回收机制回收的,它和关闭文件不一样,在结束操作之后关闭文件是必须要做的,但关闭channel不是必须的。
- 对一个关闭的通道再发送值就会导致panic。试图关闭一个已经关闭的通道也会导致宕机,就像关闭一个空通道一样。对一个关闭的通道进行接收会一直获取值直到通道为空。对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
- 关闭通道还可以作为一个广播机制。
无缓冲通道
- 无缓冲的通道属于一种阻塞的通道。
- 使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。
有缓冲的通道
- 只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。
- 可以使用内置的len函数获取通道内元素的数量,使用cap函数获取通道的容量,虽然很少会这么做。
优雅的从通道循环取值
- 当通过通道发送有限的数据时,我们可以通过close函数关闭通道来告知从该通道接收值的goroutine停止等待。当通道被关闭时,往该通道发送值会引发panic,从该通道里接收的值一直都是类型零值。
- 有2种方式在接收值的时候判断通道是否被关闭:ok语法以及for range方式。最通常使用的是for range的方式。
单向通道
- chan<- int 是“只写通道”,只可以往里写入数据的通道。
- <-chan int 是“只读通道”,只可以从中读取数据的通道。
- 事实上 channel 只读或只写都没有意义,所谓的单向 channel 其实只是声明时用,用于在函数内防止滥用。
- 在函数传参及任何赋值操作中将双向通道转换为单向通道是可以的,但反过来是不可以的。
goroutine池(worker pool)
- 在实际工作中通常会使用可以指定启动的goroutine数量的——worker pool模式,来控制goroutine的数量,防止goroutine泄漏和暴涨。
package main
import (
"fmt"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Printf("worker: %d start job: %d\n", id, j)
time.Sleep(time.Second)
fmt.Printf("worker: %d end job: %d\n", id, j)
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
//开启3个goroutine
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// 5个任务
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// 输出结果
for a := 1; a <= 5; a++ {
<-results
}
}
输出:
worker: 3 start job: 1
worker: 1 start job: 2
worker: 2 start job: 3
worker: 3 end job: 1
worker: 3 start job: 4
worker: 1 end job: 2
worker: 2 end job: 3
worker: 2 start job: 5
worker: 2 end job: 5
worker: 3 end job: 4
select多路复用
- select可处理一个或多个channel的发送/接收操作。
- 如果多个case同时满足,select会随机选择一个。
- 对于没有case的select{}会一直等待,可用于阻塞main函数。
并发安全和锁
- 有时候在Go代码中可能会存在多个goroutine同时操作一个资源(临界区),这种情况就会导致竞态问题(数据竞态)。
互斥锁(sync.Mutex)
- Go语言中使用sync包的Mutex类型来实现互斥锁。
- 使用互斥锁能够保证同一时间有且只有一个goroutine进入临界区,其他的goroutine则在等待锁。
- 当互斥锁释放后,等待的goroutine才可以获取锁进入临界区,多个goroutine同时等待一个锁时,唤醒的策略是随机的。
读写锁(sync.RWMutex)
- 互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。
- 读写锁在Go语言中使用sync包中的RWMutex类型。
- 读写锁分为两种:读锁和写锁。当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。
- 读写锁非常适合读多写少的场景,如果读和写的操作差别不大,读写锁的优势就发挥不出来。
- Lock写锁定,Unlock写解锁,RLock读锁定,RUnlock读解锁。
sync.WaitGroup
- 在代码中生硬的使用time.Sleep肯定是不合适的,Go语言中可以使用sync.WaitGroup来实现并发任务的同步。
- sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。通过调用Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成。
- 需要注意sync.WaitGroup是一个结构体,传递的时候要传递指针。
sync.Once
- 在编程的很多场景下需要确保某些操作在高并发的场景下只执行一次,例如只加载一次配置文件、只关闭一次通道等。
- Go语言中的sync包中提供了一个针对只执行一次场景的解决方案:sync.Once。
- sync.Once只有一个Do方法。
var loadConfig sync.Once
func loadConfigFile() {
// do load config file
}
loadConfig.Do(loadConfigFile)
- sync.Once其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。
sync.Map
- Go语言中内置的map不是并发安全的。
- Go语言的sync包中提供了一个开箱即用的并发安全版map——sync.Map。开箱即用表示不用像内置的map一样使用make函数初始化就能直接使用,同时sync.Map内置了诸如Store、Load、LoadOrStore、Delete、Range等操作方法。
原子操作
- 代码中的加锁操作因为涉及内核态的上下文切换会比较耗时、代价比较高。针对基本数据类型其实可以使用原子操作来保证并发安全,因为原子操作是Go语言提供的方法,它在用户态就可以完成,因此性能比加锁操作更好。
- Go语言中原子操作由内置的标准库sync/atomic提供。
atomic包
- atomic包提供了底层的原子级内存操作,对于同步算法的实现很有用。这些函数必须谨慎地保证正确使用。除了某些特殊的底层应用,使用通道或者sync包的函数/类型实现同步更好。
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
type Counter interface {
Inc()
Load() int64
}
// 普通版
type CommonCounter struct {
counter int64
}
func (c CommonCounter) Inc() {
c.counter++
}
func (c CommonCounter) Load() int64 {
return c.counter
}
// 互斥锁版
type MutexCounter struct {
counter int64
lock sync.Mutex
}
func (m *MutexCounter) Inc() {
m.lock.Lock()
defer m.lock.Unlock()
m.counter++
}
func (m *MutexCounter) Load() int64 {
m.lock.Lock()
defer m.lock.Unlock()
return m.counter
}
// 原子操作版
type AtomicCounter struct {
counter int64
}
func (a *AtomicCounter) Inc() {
atomic.AddInt64(&a.counter, 1)
}
func (a *AtomicCounter) Load() int64 {
return atomic.LoadInt64(&a.counter)
}
func test(c Counter) {
var wg sync.WaitGroup
start := time.Now()
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
c.Inc()
wg.Done()
}()
}
wg.Wait()
end := time.Now()
fmt.Println(c.Load(), end.Sub(start))
}
func main() {
c1 := CommonCounter{} // 非并发安全
test(c1)
c2 := MutexCounter{} // 使用互斥锁实现并发安全
test(&c2)
c3 := AtomicCounter{} // 并发安全且比互斥锁效率更高
test(&c3)
}
调度器运行时机
- 调度器会在GC、"go"声明、阻塞channel操作、阻塞系统调用和lock操作后运行。它也会在非内联函数调用后执行。
- 要想知道在for循环中调用的函数是否是内联的,可以在"go build"或"go run"时传入"-m" gc标识(如go build -gcflags -m)。
- 可以显式的唤起调度器,比如使用"runtime"包中的Gosched()函数。