并发和并行
- A. 多线程程序在一个核的cpu上运行,就是并发。
- B. 多线程程序在多个核的cpu上运行,就是并行。
并发:本质还是串行
并行:任务分布在不同CPU上,同一时间点同时执行
协程和线程
- 协程:独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户级线程的调度也是自己实现的。
- 线程:一个线程上可以跑多个协程,协程是轻量级的线程。
协程与线程区别 - 协程跟线程是有区别的,线程由CPU调度是抢占式的
- 协程由用户态调度是协作式的,一个协程让出CPU后,才执行下一个协程
调度器GMP模型
- G:goroutine(协程)
- M:thread(内核线程,不是用户态线程)
- P:processer(调度器)
- G(协程),通常在代码里用 go 关键字执行一个方法,那么就等于起了一个G。
- M(内核线程),操作系统内核其实看不见G和P,只知道自己在执行一个线程。
- G和P都是在用户层上的实现。
- 并发量小的时候还好,当并发量大了,这把大锁,就成为了性能瓶颈。
GMP模型
gouroutine
- Go语言中的goroutine的概念类似于线程,但 goroutine是由Go的运行时(runtime)调度和管理的。
- Go程序会智能地将 goroutine 中的任务合理地分配给每个CPU。
- Go语言之所以被称为现代化的编程语言,就是因为它在语言层面已经内置了调度和上下文切换的机制。
- 在Go语言编程中你不需要去自己写进程、线程、协程,你的技能包里只有一个技能–goroutine
- 当你需要让某个任务并发执行的时候,你只需要把这个任务包装成一个函数
- 开启一个goroutine去执行这个函数就可以了,就是这么简单粗暴。
协程基本使用
启动一个协程
- 主线程中每个100毫秒打印一次,总共打印2次
- 另外开启一个协程,打印10次
- 情况一:打印是交替,证明是并行的
- 情况二:开启的协程打印两次,就退出了(因为主线程退出了)
func test() {
for i := 0; i < 10; i++ {
fmt.Println("test() 你好golang")
time.Sleep(time.Millisecond * 100)
}
}
func main() {
go test() //表示开启一个协程
for i := 0; i < 2; i++ {
fmt.Println("main() 你好golang")
time.Sleep(time.Millisecond * 100)
}
}
WaitGroup等待协程执行完毕
- 主线程退出后所有的协程无论有没有执行完毕都会退出
- 所以我们在主进程中可以通过WaitGroup等待协程执行完毕
- sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。
- 例如当我们启动了N 个并发任务时,就将计数器值增加N。
- 每个任务完成时通过调用Done()方法将计数器减1。
- 通过调用Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成。
var wg sync.WaitGroup // 第一步:定义一个计数器
func test1() {
for i := 0; i < 10; i++ {
fmt.Println("test1() 你好golang-", i)
time.Sleep(time.Millisecond * 100)
}
wg.Done() //协程计数器-1 // 第三步:协程执行完毕,计数器-1
}
func test2() {
for i := 0; i < 2; i++ {
fmt.Println("test2() 你好golang-", i)
time.Sleep(time.Millisecond * 100)
}
wg.Done() //协程计数器-1
}
func main() {
wg.Add(1) //协程计数器+1 第二步:开启一个协程计数器+1
go test1() //表示开启一个协程
wg.Add(1) //协程计数器+1
go test2() //表示开启一个协程
wg.Wait() //等待协程执行完毕... 第四步:计数器为0时推出
fmt.Println("主线程退出...")
}
/*
test2() 你好golang- 0
test1() 你好golang- 0
.....
test1() 你好golang- 8
test1() 你好golang- 9
主线程退出...
*/
开启多个协程
- 在 Go 语言中实现并发就是这样简单,我们还可以启动多个 goroutine。
- 这里使用了 sync.WaitGroup 来实现等待 goroutine 执行完毕
- 多次执行上面的代码,会发现每次打印的数字的顺序都不一致。
- 这是因为 10 个 goroutine是并发执行的,而 goroutine 的调度是随机的。
var wg sync.WaitGroup
func hello(i int) {
defer wg.Done() // goroutine结束就登记-1
fmt.Println("Hello Goroutine!", i)
}
func main() {
for i := 0; i < 10; i++ {
wg.Add(1) // 启动一个goroutine就登记+1
go hello(i)
}
wg.Wait() // 等待所有登记的goroutine都结束
}
多协程统计素数
- 需求:要统计1-120000的数字中那些是素数?goroutine for循环实现
- 1 协程 统计 1-30000
- 2 协程 统计 30001-60000
- 3 协程 统计 60001-90000
- 4 协程 统计 90001-120000
- start:(n-1)30000+1 end:n30000
var wg sync.WaitGroup
func test(n int) {
for num := (n-1)*30000 + 1; num < n*30000; num++ {
if num > 1 {
var flag = true
for i := 2; i < num; i++ {
if num%i == 0 {
flag = false
break
}
}
if flag {
fmt.Println(num, "是素数")
}
}
}
wg.Done()
}
func main() {
start := time.Now().Unix()
for i := 1; i <= 4; i++ {
wg.Add(1)
go test(i)
}
wg.Wait()
fmt.Println("执行完毕")
end := time.Now().Unix()
fmt.Println(end - start) //1毫秒
}
Channel 管道
- 共享内存交互数据弊端
- 单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。
- 虽然可以使用共享内存进行数据交换,但是共享内存在不同的goroutine中容易发生竞态问题。
- 为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。
- channel好处
- Go 语言中的通道(channel)是一种特殊的类型。
- 通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。
- 每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。
- 如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。
- channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
channel 是一种类型,一种引用类型
声明管道类型的格式如下:
var 变量 chan 元素类型
var ch1 chan int // 声明一个传递整型的管道
var ch2 chan bool // 声明一个传递布尔型的管道
var ch3 chan []int // 声明一个传递 int 切片的管道
创建channel
- 声明的管道后需要使用 make 函数初始化之后才能使用。
- 创建 channel 的格式如下:make(chan 元素类型, 容量)
// 创建一个能存储 10 个 int 类型数据的管道
ch1 := make(chan int, 10)
// 创建一个能存储 4 个 bool 类型数据的管道
ch2 := make(chan bool, 4)
// 创建一个能存储 3 个[]int 切片类型数据的管道
ch3 := make(chan []int, 3)
channel操作
- 管道有发送(send)、接收(receive)和关闭(close)三种操作。
- 发送和接收都使用<-符号。
- 现在我们先使用以下语句定义一个管道:
ch := make(chan int, 3)
发送(将数据放在管道内)
将一个值发送到管道中。
ch <- 10 // 把 10 发送到 ch 中
接收(从管道内取值)
从一个管道中接收值。
x := <- ch // 从 ch 中接收值并赋值给变量 x
<-ch // 从 ch 中接收值,忽略结果
我们通过调用内置的 close 函数来关闭管道: close(ch)
无缓冲的管道
- 如果创建管道的时候没有指定容量,那么我们可以叫这个管道为无缓冲的管道
- 无缓冲的管道又称为阻塞的管道。
func main() {
ch := make(chan int)
ch <- 10
fmt.Println("发送成功")
}
-- 面这段代码能够通过编译,但是执行的时候会出现以下错误
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
有缓冲的管道
- 解决上面问题的方法还有一种就是使用有缓冲区的管道。
- 我们可以在使用 make 函数初始化管道的时候为其指定管道的容量
- 只要管道的容量大于零,那么该管道就是有缓冲的管道,管道的容量表示管道中能存放元素的数量。
- 就像你小区的快递柜只有那么个多格子,格子满了就装不下了,就阻塞了,等到别人取走一个快递员就能往里面放一个。
func main() {
ch := make(chan int, 5)
ch <- 10
ch <- 12
fmt.Println("发送成功")
}
优雅的从channel取值
- 当通过通道发送有限的数据时,我们可以通过close函数关闭通道来告知从该通道接收值的goroutine停止等待。
- 当通道被关闭时,往该通道发送值会引发panic,从该通道里接收的值一直都是类型零值。
- 那如何判断一个通道是否被关闭了呢?
- for range的方式判断通道关闭
func f1(ch1 chan int) {
for i := 0; i < 100; i++ {
ch1 <- i
}
close(ch1)
}
func f2(ch1 chan int, ch2 chan int) {
for {
i, ok := <-ch1 // 通道关闭后再取值ok=false
if !ok {
break
}
ch2 <- i * i
}
close(ch2)
}
// channel 练习
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
// 开启goroutine将0~100的数发送到ch1中
go f1(ch1)
// 开启goroutine从ch1中接收值,并将该值的平方发送到ch2中
go f2(ch1, ch2)
// 在主goroutine中从ch2中接收值打印
for i := range ch2 { // 通道关闭后会退出for range循环
fmt.Println(i)
}
}
Goroutine结合Channel管道
- 需求 1:定义两个方法,一个方法给管道里面写数据,一个给管道里面读取数据,要求同步进行。
- 1、开启一个 fn1 的的协程给向管道 inChan 中写入 100 条数据
- 2、开启一个 fn2 的协程读取 inChan 中写入的数据
- 3、注意:fn1 和 fn2 同时操作一个管道
- 4、主线程必须等待操作完成后才可以退出
- 注:for range的方式判断通道关闭,推出程序
var wg sync.WaitGroup
func main() {
intChan := make(chan int,10)
wg.Add(2)
go write(intChan)
go read(intChan)
wg.Wait()
fmt.Println("读取完毕...")
}
func write(intChan chan int) {
defer wg.Done()
for i:=0;i<10;i++{
intChan <- i
}
close(intChan)
}
func read(intChan chan int) {
defer wg.Done()
for v := range intChan {
fmt.Println(v)
time.Sleep(time.Second)
}
}
单向管道
- 有的时候我们会将管道作为参数在多个任务函数间传递
- 很多时候我们在不同的任务函数中使用管道都会对其进行限制
- 比如限制管道在函数中只能发送或只能接收
func main() {
//1. 在默认情况下下,管道是双向
//var chan1 chan int //可读可写
//2 声明为只写
var chan2 chan<- int
chan2 = make(chan int, 3)
chan2<- 20
//num := <-chan2 //error
fmt.Println("chan2=", chan2)
//3. 声明为只读
var chan3 <-chan int
num2 := <-chan3
//chan3<- 30 //err
fmt.Println("num2", num2)
}
Goroutine池
- 本质上是生产者消费者模型
- 在工作中我们通常会使用可以指定启动的goroutine数量–worker pool模式,控制goroutine的数量,防止goroutine泄漏和暴涨。
一个简易的work pool示例代码如下:
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Printf("worker:%d start job:%d\n", id, j)
time.Sleep(time.Second)
fmt.Printf("worker:%d end job:%d\n", id, j)
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// 开启3个goroutine
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// 5个任务
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// 输出结果
for a := 1; a <= 5; a++ {
<-results
}
}
select
- 传统的方法在遍历管道时,如果不关闭会阻塞而导致 deadlock,在实际开发中,可能我们不好确定什么关闭该管道。
- 这种方式虽然可以实现从多个管道接收值的需求,但是运行性能会差很多。
- 为了应对这种场景,Go 内置了 select 关键字,可以同时响应多个管道的操作。
- select 的使用类似于 switch 语句,它有一系列 case 分支和一个默认的分支。
- 每个 case 会对应一个管道的通信(接收或发送)过程。
- select 会一直等待,直到某个 case 的通信操作完成时,就会执行 case 分支对应的语句。
具体格式如下:
select {
case <-chan1:
// 如果chan1成功读到数据,则进行该case处理语句
case chan2 <- 1:
// 如果成功向chan2写入数据,则进行该case处理语句
default:
// 如果上面都没有成功,则进入default处理流程
}
- 使用 select 语句能提高代码的可读性。
- 可处理一个或多个 channel 的发送/接收操作。
- 如果多个 case 同时满足,select 会随机选择一个。
- 对于没有 case 的 select{}会一直等待,可用于阻塞 main 函数。
func main() {
// 在某些场景下我们需要同时从多个通道接收数据,这个时候就可以用到golang中给我们提供的select多路复用
//1.定义一个管道 10个数据int
intChan := make(chan int, 10)
for i := 0; i < 10; i++ {
intChan <- i
}
//2.定义一个管道 5个数据string
stringChan := make(chan string, 5)
for i := 0; i < 5; i++ {
stringChan <- "hello" + fmt.Sprintf("%d", i)
}
//使用select来获取channel里面的数据的时候不需要关闭channel
for {
select {
case v := <-intChan:
fmt.Printf("从 intChan 读取的数据%d\n", v)
time.Sleep(time.Millisecond * 50)
case v := <-stringChan:
fmt.Printf("从 stringChan 读取的数据%v\n", v)
time.Sleep(time.Millisecond * 50)
default:
fmt.Printf("数据获取完毕")
return //注意退出...
}
}
}
并发安全
有时候在Go代码中可能会存在多个goroutine同时操作一个资源(临界区),这种情况会发生竞态问题(数据竞态)。
类比现实生活中的例子有十字路口被各个方向的的汽车竞争;还有火车上的卫生间被车厢里的人竞争。
下面开启两个协程,对变量x加一操作,分别加5000次,理想结果是10000,实际三次结果都不相同
var x int64
var wg sync.WaitGroup
func add() {
for i := 0; i < 5000; i++ {
x = x + 1
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
互斥锁
互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。
Go语言中使用sync包的Mutex类型来实现互斥锁。
使用互斥锁来修复上面代码的问题:
var x int64
var wg sync.WaitGroup
var lock sync.Mutex
func add() {
for i := 0; i < 5000; i++ {
lock.Lock() // 加锁
x = x + 1
lock.Unlock() // 解锁
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x) // 10000
}
读写互斥锁
- 互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的
- 当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。
- 读写锁在Go语言中使用sync包中的RWMutex类型。
- 读写锁分为两种:读锁和写锁
- 当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待;
- 当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。
注意:是读写锁非常适合读多写少的场景,如果读和写的操作差别不大,读写锁的优势就发挥不出来。
var wg sync.WaitGroup
var mutex sync.RWMutex
//写的方法
func write() {
mutex.Lock()
fmt.Println("执行写操作")
time.Sleep(time.Second * 2)
mutex.Unlock()
wg.Done()
}
//读的方法
func read() {
mutex.RLock()
fmt.Println("---执行读操作")
time.Sleep(time.Second * 2)
mutex.RUnlock()
wg.Done()
}
func main() {
for i := 0; i < 10; i++ { //开启10个协程执行读操作
wg.Add(1)
go write()
}
for i := 0; i < 10; i++ { // 开启10个协程执行写操作
wg.Add(1)
go read()
}
wg.Wait()
}