- Q1:用什么手段可以对 goroutine 的启用数量加以限制 ?
- Q2:怎样才能让主 goroutine 等待其他 goroutine ?
- Q3:怎样让我们启用的多个 goroutine 按照既定的顺序执行 ?
Q1:用什么手段可以对 goroutine 的启用数量加以限制 ?
自己尝试用 buffer channel
实现了一个比较陋的Demo:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
/*
* 说明:用什么手段可以对 goroutine 的启用数量加以限制?
* 作者:zhe
* 时间:2019-01-17 20:12 PM
* 更新:比较陋的实现。。。
*/
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
// max worker
const (
taskNum = 1000000
defaultMaxWorker = 100
)
// wg waiting for all goroutine finished.
var wg = sync.WaitGroup{}
// task and worker
var w = newWorker(defaultMaxWorker)
var t = newTask(w)
func main() {
EnableGoPool()
}
func EnableGoPool() {
t.add(taskNum)
go t.watching()
go t.produce(taskNum)
t.wait()
}
// worker
type worker struct {
max uint64
pool chan struct{} // buffered chan, len(cap) <= max
}
// newWorker
// max = 0, 表明只有一个 worker
func newWorker(max uint64) *worker {
if max < 0 {
max = defaultMaxWorker
}
return &worker{
max: max,
pool: make(chan struct{}, max),
}
}
// done
func (w worker) done() {
<-w.pool
}
// task
type task struct {
worker *worker
}
// newTask
func newTask(w *worker) *task {
return &task{worker: w}
}
// add
func (t *task) add(num int) {
wg.Add(num)
}
// produce 产出任务, 任务数量不得小于 1
func (t *task) produce(num int) {
if num == 0 {
num = 1 // 至少安排一个任务
}
for i := 1; i <= num; i++ {
// 检查 pool 是否已经被沾满,如果已满则阻塞等待
// 空闲的 worker;当有 worker 被释放时,继续执
// 行后面的代码块,即分配新任务给空闲的 worker
t.worker.pool <- struct{}{}
go t.do(i) // do sth long-running
}
}
// wait
func (t *task) wait() {
wg.Wait()
}
// do
func (t task) do(i int) {
defer t.done()
// 模拟耗时操作
time.Sleep(time.Millisecond)
fmt.Printf("[task][%4v][%v] done.\n", i, utils.Now())
}
// done
func (t *task) done() {
t.worker.done()
wg.Done()
}
func (t *task) watching() {
for {
<-time.After(100 * time.Millisecond)
fmt.Printf("go: %v\n", runtime.NumGoroutine())
}
}
|
Q2:怎样才能让主 goroutine 等待其他 goroutine ?
先来看 Demo:
1
2
3
4
5
6
7
8
9
10
11
|
package main
import "fmt"
func main() {
for i:=0; i<10; i++ {
go func() {
fmt.Println(i)
}()
}
}
|
这段代码运行后是不会有任何输出的,至于原因在 上一节已经分析过了, 你可以回过头在看看。现在我们来看看有那些方式可以实现让主 goroutine 等待其他 goroutine
A1:小睡一会儿
最简单粗暴的方式就是让主 goroutine “小睡” 一会儿,来改下代码看看
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
package main
import (
"fmt"
"time"
)
func main() {
for i := 0; i < 10; i++ {
go func() {
fmt.Println(i)
}()
}
time.Sleep(time.Millisecond * 500) // + 小睡会儿:让主 goroutine 暂停运行,等待恢复后会继续执行后边的代码
}
|
这个办法是可行的,只要 “睡眠” 的时间不要太短就好。不过,问题恰恰就在这里,我们让主 goroutine “睡眠” 多长时间才合适呢? 如果睡眠太短,则很可能不足以让其他的 goroutine 运行完毕,而若 “睡眠” 太长则纯属浪费时间,这个时间就太难把握了。
既然是这样,那会不会有更好的实现方法呢? 当然是有的啊,可以让其他 goroutine 在运行完毕的时候告诉我们一下就 🆗 了啊,来在改改代码看看
A2:让其他 goroutine 在运行完毕后通知主 goroutine
这里,你是否想到了通道呢?通道的长度应该与我们手动启用的 goroutine 数量一致。在每个手动启用的 goroutine 即将运行结束的时候,我们都要向该通道发送一个值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package main
import (
"fmt"
"time"
)
func main() {
n := 10
done := make(chan struct{}, n)
// 这里有个细节需要注意:
// 我们在声明通道 done 的时候是以 chan struct{} 作为类型的。其中
// 的类型字面量 struct{} 有些类似于空接口类型 interface{}, 它代
// 表了既不包含任何字段也不拥有任何方法的空结构体类型
//
// 注意:
// struct 类型值得表示法只有一个,即:struct{}{}。并且,它占用的
// 内存空间是 0 字节。 确切的说,这个值在整个 Go 程序中永远都只会
// 存在一份。虽然我们可以无数次地使用这个值字面量,但是用到的却都是
// 同一个值。
for i:=0; i<n; i++ {
go func() {
fmt.Println(i)
done <- struct{}{}
}()
}
for j:0; j<n; j++ {
<- done
}
}
|
Note:当我们仅仅把通道当作传递某种简单信号的介质的时候,用 struct{}
作为其元素类型是再好不过的了。
再看这个问题,想想有没有更好的答案? 可定也是有的,如果了解 sync
代码包的话,那么可能会想到 sync.WaitGroup
类型。 来看看代码实现
A3:使用 sync.WaitGroup
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
package main
import (
"fmt"
"sync"
)
func main() {
n := 10
wg := sync.WaitGroup{}
wg.Add(n)
for i:=0; i<n; i++ {
go func() {
fmt.Println(i)
wg.Done()
}()
}
wg.Wait()
}
|
Q3:怎样让我们启用的多个 goroutine 按照既定的顺序执行 ?
既然是按既定的顺序执行,那么肯定是要让异步发起的 go 函数得到同步的执行
Demo 实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
/*
* 说明:for 循环启用 10 个 goroutine 打印迭代变量的序号,怎么保证按自然数的顺序(0,1,2...) 输出
* 作者:zhe
* 时间:2019-01-15 10:10 PM
* 更新:
*/
package main
import (
"fmt"
"sync/atomic"
"time"
)
var cnt uint32
func main() {
TestByTriggerFn()
ImplementationByChan()
}
// ********************************************* 方式一:自旋(spinning)函数
func TestByTriggerFn() {
InputNumPassByTrigger()
trigger(10, func() {}) // 等待主 goroutine 结束
}
var trigger = func(i uint32, fn func()) {
for {
if n := atomic.LoadUint32(&cnt); n == i { // 自旋,直到 i 和 n 相等时
// 才执行 if 里代码,n 从 0 起计数
fn()
atomic.AddUint32(&cnt, 1) // cnt 会在多 goroutine 间会产生竞态,所以这里采用原子操作
break
}
time.Sleep(time.Nanosecond)
}
}
func InputNumPassByTrigger() {
for i := uint32(0); i < 10; i++ {
go func(i uint32) {
fn := func() { fmt.Println(i) }
trigger(i, fn)
}(i)
}
}
// ********************************************* 方式二:用通道实现
func ImplementationByChan() {
ch := make(chan int)
go InputNum(ch)
OutByOrder(ch)
close(ch)
}
func InputNum(ch chan int) {
for i := 0; i < 10; i++ {
go func(i int) {
ch <- i
}(i)
}
}
func OutByOrder(ch chan int) {
cnt := 0
for {
select {
case i := <-ch:
if i == cnt { // cnt 从 0 开始递增 => 从 goroutine 接收的值也从 0 递增
fmt.Println(i)
cnt += 1
continue
}
go func() { ch <- i }() // 不符合自然数顺序的重新放回通道中去
default:
if cnt == 10 { // cnt 计数到 10 说明接收完成,退出函数
return
}
}
}
}
|
See Also
Thanks to the authors 🙂