并发 -《Go In Action》-Ch6

并发 -《Go In Action》-Ch6

每个goroutine是一个独立的工作单元,这个单元会被调度到可用的逻辑处理器上执行。Go运行时通过调度器管理goroutine,为其分配执行时间。
调度器在操作系统之上,将操作系统的线程和语言运行时的逻辑处理器绑定,并在逻辑处理器上运行goroutine。
Go语言通过在goroutine之间传递数据来通信,而不是对数据加锁来实现同步访问。

6.1 并行和并发

Go调度器如何管理goroutine

并发是让不同的代码片段同时在不同的物理处理器上执行,并发是指同时管理很多事情。
每当创建一个goroutine并准备运行,goroutine被分配到调度器的全局队列中,调度器会给goroutine分配一个逻辑处理器,将goroutine放到逻辑处理器对应的本地队列中。

并发和并行的区别

6.2 goroutine

下面这个程序展示了逻辑处理器是如何调度goroutine的,runtime.GOMAXPROCS(1)只允许程序使用一个逻辑处理器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package main

import (
"fmt"
"runtime"
"sync"
)

// wg is used to wait for the program to finish.
var wg sync.WaitGroup

// main is the entry point for all Go programs.
func main() {
// Allocate 1 logical processors for the scheduler to use.
runtime.GOMAXPROCS(1)

// Add a count of two, one for each goroutine.
wg.Add(2)

// Create two goroutines.
fmt.Println("Create Goroutines")
go printPrime("A")
go printPrime("B")

// Wait for the goroutines to finish.
fmt.Println("Waiting To Finish")
wg.Wait()

fmt.Println("Terminating Program")
}

// printPrime displays prime numbers for the first 5000 numbers.
func printPrime(prefix string) {
// Schedule the call to Done to tell main we are done.
defer wg.Done()

next:
for outer := 2; outer < 5000; outer++ {
for inner := 2; inner < outer; inner++ {
if outer%inner == 0 {
continue next
}
}
fmt.Printf("%s:%d\n", prefix, outer)
}
fmt.Println("Completed", prefix)
}

// output
// Create Goroutines
// Waiting To Finish
// B:2
// B:3
// ...
// B:4583
// B:4591
// A:3 ** 切换 goroutine
// A:5
// ...
// A:4561
// A:4567
// B:4603 ** 切换 goroutine
// B:4621
// ...
// Completed B
// A:4457 ** 切换 goroutine
// A:4463
// ...
// A:4993
// A:4999
// Completed A
// Terminating Program

可以看到goroutine A和B是交替运行的,因为只有一个逻辑处理器。调度过程可以用下图表示:

逻辑处理器调度goroutine

6.3 竞争状态

多个goroutine同时对一个共享资源进行读和写,容易进入相互竞争的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package main

import (
"fmt"
"runtime"
"sync"
)

var (
// counter is a variable incremented by all goroutines.
counter int

// wg is used to wait for the program to finish.
wg sync.WaitGroup
)

// main is the entry point for all Go programs.
func main() {
// Add a count of two, one for each goroutine.
wg.Add(2)

// Create two goroutines.
go incCounter(1)
go incCounter(2)

// Wait for the goroutines to finish.
wg.Wait()
fmt.Println("Final Counter:", counter)
}

// incCounter increments the package level counter variable.
func incCounter(id int) {
// Schedule the call to Done to tell main we are done.
defer wg.Done()

for count := 0; count < 2; count++ {
// Capture the value of Counter.
value := counter

// Yield the thread and be placed back in queue.
runtime.Gosched()

// Increment our local value of Counter.
value++

// Store the value back into Counter.
counter = value
}
}

最后counter的值有可能是2,可以用下面这个图描述下过程

竞争状态下程序行为的图像表达

可以用go build -race检测代码里的竞争状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
go build -race // 用竞争检测器标志来编译程序
./example // 运行程序
==================
WARNING: DATA RACE
Write by goroutine 5:
main.incCounter()
/example/main.go:49 +0x96
Previous read by goroutine 6:
main.incCounter()
/example/main.go:40 +0x66
Goroutine 5 (running) created at:
main.main()
/example/main.go:25 +0x5c
Goroutine 6 (running) created at:
main.main()
/example/main.go:26 +0x73
==================
Final Counter: 2
Found 1 data race(s)

6.4 锁住共享资源

可以使用原子函数和互斥锁解决共享资源的问题

6.4.1 原子函数

原子函数能够以很底层的加锁机制来同步访问整形变量和指针,atomic包提供了一些原子操作,如AddInt64,这个函数会同步整型类型的的加法
LoadInt64和StoreInt64,这两个函数提供了一种安全的读写一个整型值的方式。

1
2
3
4
var count int64
atomic.AddInt64(&counter, 1)
atomic.LoadInt64(&cunter)
atomic.StoreInt64(&count, 1)

6.4.2 互斥锁

互斥锁用于在代码上创建一个临界区,保证同一时间只有一个goroutine 可以执行这个临界区代码

1
2
3
4
mutex sync.Mutex
mutex.Lock()
...
mutex.Unlock()

6.5 通道

可以使用make来创建通道

1
2
3
4
5
6
7
8
9
// 无缓冲的整型通道
unbuffered := make(chan int)
// 有缓冲的字符串通道
buffered := make(chan string, 10)

// 向通道发送值
buffered <- "Gopher"
// 从通道里接收值
value := <-buffered

无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。这种类型的通
道要求发送goroutine 和接收goroutine 同时准备好,才能完成发送和接收操作。如果两个goroutine
没有同时准备好,通道会导致先执行发送或接收操作的goroutine 阻塞等待。这种对通道进行发送
和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// This sample program demonstrates how to use an unbuffered
// channel to simulate a relay race between four goroutines.
package main

import (
"fmt"
"sync"
"time"
)

// wg is used to wait for the program to finish.
var wg sync.WaitGroup

// main is the entry point for all Go programs.
func main() {
// Create an unbuffered channel.
baton := make(chan int)

// Add a count of one for the last runner.
wg.Add(1)

// First runner to his mark.
go Runner(baton)

// Start the race.
baton <- 1

// Wait for the race to finish.
wg.Wait()
}

// Runner simulates a person running in the relay race.
func Runner(baton chan int) {
var newRunner int

// Wait to receive the baton.
runner := <-baton

// Start running around the track.
fmt.Printf("Runner %d Running With Baton\n", runner)

// New runner to the line.
if runner != 4 {
newRunner = runner + 1
fmt.Printf("Runner %d To The Line\n", newRunner)
go Runner(baton)
}

// Running around the track.
time.Sleep(100 * time.Millisecond)

// Is the race over.
if runner == 4 {
fmt.Printf("Runner %d Finished, Race Over\n", runner)
wg.Done()
return
}

// Exchange the baton for the next runner.
fmt.Printf("Runner %d Exchange With Runner %d\n",
runner,
newRunner)

baton <- newRunner
}

有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个值的通道。这种类
型的通道并不强制要求goroutine 之间必须同时完成发送和接收。通道会阻塞发送和接收动作的
条件也会不同。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲
区容纳被发送的值时,发送动作才会阻塞。这导致有缓冲的通道和无缓冲的通道之间的一个很大
的不同:无缓冲的通道保证进行发送和接收的goroutine 会在同一时间进行数据交换;有缓冲的
通道没有这种保证。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// This sample program demonstrates how to use a buffered
// channel to work on multiple tasks with a predefined number
// of goroutines.
package main

import (
"fmt"
"math/rand"
"sync"
"time"
)

const (
numberGoroutines = 4 // Number of goroutines to use.
taskLoad = 10 // Amount of work to process.
)

// wg is used to wait for the program to finish.
var wg sync.WaitGroup

// init is called to initialize the package by the
// Go runtime prior to any other code being executed.
func init() {
// Seed the random number generator.
rand.Seed(time.Now().Unix())
}

// main is the entry point for all Go programs.
func main() {
// Create a buffered channel to manage the task load.
tasks := make(chan string, taskLoad)

// Launch goroutines to handle the work.
wg.Add(numberGoroutines)
for gr := 1; gr <= numberGoroutines; gr++ {
go worker(tasks, gr)
}

// Add a bunch of work to get done.
for post := 1; post <= taskLoad; post++ {
tasks <- fmt.Sprintf("Task : %d", post)
}

// Close the channel so the goroutines will quit
// when all the work is done.
close(tasks)

// Wait for all the work to get done.
wg.Wait()
}

// worker is launched as a goroutine to process work from
// the buffered channel.
func worker(tasks chan string, worker int) {
// Report that we just returned.
defer wg.Done()

for {
// Wait for work to be assigned.
task, ok := <-tasks
if !ok {
// This means the channel is empty and closed.
fmt.Printf("Worker: %d : Shutting Down\n", worker)
return
}

// Display we are starting the work.
fmt.Printf("Worker: %d : Started %s\n", worker, task)

// Randomly wait to simulate work time.
sleep := rand.Int63n(100)
time.Sleep(time.Duration(sleep) * time.Millisecond)

// Display we finished the work.
fmt.Printf("Worker: %d : Completed %s\n", worker, task)
}
}

上面代码需要注意的是close(tasks),关闭通道后,goroutine依旧可以从通道接收数据,但是不能再向通道里发送数据。

6.6 小结

并发是指goroutine运行的时候是相互独立的
使用关键字go创建goroutine来运行函数
goroutine在逻辑处理器上执行,逻辑处理器具有独立的系统线程和运行队列
竞争状态是指两个或者多个goroutine试图访问同一个资源
原子函数和互斥锁提供了一种防止出现竞争状态的办法
通道提供了一种在两个goroutine之间共享数据的简单方法
无缓冲的通道保证同时交换数据,而有缓冲的通道不做这种保证

# Golang

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×