• Q1:用什么手段可以对 goroutine 的启用数量加以限制 ?
  • Q2:怎样才能让主 goroutine 等待其他 goroutine ?
  • Q3:怎样让我们启用的多个 goroutine 按照既定的顺序执行 ?

Q1:用什么手段可以对 goroutine 的启用数量加以限制 ?

自己尝试用 buffer channel 实现了一个比较陋的Demo:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
/*
 * 说明:用什么手段可以对 goroutine 的启用数量加以限制?
 * 作者:zhe
 * 时间:2019-01-17 20:12 PM
 * 更新:比较陋的实现。。。
 */

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// max worker
const (
	taskNum          = 1000000
	defaultMaxWorker = 100
)

// wg waiting for all goroutine finished.
var wg = sync.WaitGroup{}

// task and worker
var w = newWorker(defaultMaxWorker)
var t = newTask(w)

func main() {
	EnableGoPool()
}

func EnableGoPool() {
	t.add(taskNum)

	go t.watching()
	go t.produce(taskNum)

	t.wait()
}

// worker
type worker struct {
	max  uint64
	pool chan struct{} // buffered chan, len(cap) <= max
}

// newWorker
// max = 0, 表明只有一个 worker
func newWorker(max uint64) *worker {
	if max < 0 {
		max = defaultMaxWorker
	}
	return &worker{
		max:  max,
		pool: make(chan struct{}, max),
	}
}

// done
func (w worker) done() {
	<-w.pool
}

// task
type task struct {
	worker *worker
}

// newTask
func newTask(w *worker) *task {
	return &task{worker: w}
}

// add
func (t *task) add(num int) {
	wg.Add(num)
}

// produce 产出任务, 任务数量不得小于 1
func (t *task) produce(num int) {
	if num == 0 {
		num = 1 // 至少安排一个任务
	}

	for i := 1; i <= num; i++ {
		// 检查 pool 是否已经被沾满,如果已满则阻塞等待
		// 空闲的 worker;当有 worker 被释放时,继续执
		// 行后面的代码块,即分配新任务给空闲的 worker
		t.worker.pool <- struct{}{}

		go t.do(i) // do sth long-running
	}
}

// wait
func (t *task) wait() {
	wg.Wait()
}

// do
func (t task) do(i int) {
	defer t.done()

	// 模拟耗时操作
	time.Sleep(time.Millisecond)
	fmt.Printf("[task][%4v][%v] done.\n", i, utils.Now())
}

// done
func (t *task) done() {
	t.worker.done()
	wg.Done()
}

func (t *task) watching() {
	for {
		<-time.After(100 * time.Millisecond)
		fmt.Printf("go: %v\n", runtime.NumGoroutine())
	}
}

Q2:怎样才能让主 goroutine 等待其他 goroutine ?

先来看 Demo:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
package main

import "fmt"

func main() {
	for i:=0; i<10; i++ {
		go func() {
			fmt.Println(i)
		}()
	}
}

这段代码运行后是不会有任何输出的,至于原因在 上一节已经分析过了, 你可以回过头在看看。现在我们来看看有那些方式可以实现让主 goroutine 等待其他 goroutine

A1:小睡一会儿

最简单粗暴的方式就是让主 goroutine “小睡” 一会儿,来改下代码看看

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
package main

import (
	"fmt"
	"time"
)

func main() {
	for i := 0; i < 10; i++ {
		go func() {
			fmt.Println(i)
		}()
	}
	time.Sleep(time.Millisecond * 500) // + 小睡会儿:让主 goroutine 暂停运行,等待恢复后会继续执行后边的代码
}

这个办法是可行的,只要 “睡眠” 的时间不要太短就好。不过,问题恰恰就在这里,我们让主 goroutine “睡眠” 多长时间才合适呢? 如果睡眠太短,则很可能不足以让其他的 goroutine 运行完毕,而若 “睡眠” 太长则纯属浪费时间,这个时间就太难把握了。

既然是这样,那会不会有更好的实现方法呢? 当然是有的啊,可以让其他 goroutine 在运行完毕的时候告诉我们一下就 🆗 了啊,来在改改代码看看

A2:让其他 goroutine 在运行完毕后通知主 goroutine

这里,你是否想到了通道呢?通道的长度应该与我们手动启用的 goroutine 数量一致。在每个手动启用的 goroutine 即将运行结束的时候,我们都要向该通道发送一个值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
    "fmt"
    "time"
)

func main() {
    n := 10
    done := make(chan struct{}, n)
    // 这里有个细节需要注意:
    // 我们在声明通道 done 的时候是以 chan struct{} 作为类型的。其中
    // 的类型字面量 struct{} 有些类似于空接口类型 interface{}, 它代
    // 表了既不包含任何字段也不拥有任何方法的空结构体类型
    //
    // 注意:
    // struct 类型值得表示法只有一个,即:struct{}{}。并且,它占用的
    // 内存空间是 0 字节。 确切的说,这个值在整个 Go 程序中永远都只会
    // 存在一份。虽然我们可以无数次地使用这个值字面量,但是用到的却都是
    // 同一个值。


    for i:=0; i<n; i++ {
        go func() {
            fmt.Println(i)
            done <- struct{}{}
        }()
    }

    for j:0; j<n; j++ {
        <- done
    }
}

Note:当我们仅仅把通道当作传递某种简单信号的介质的时候,用 struct{} 作为其元素类型是再好不过的了。

再看这个问题,想想有没有更好的答案? 可定也是有的,如果了解 sync 代码包的话,那么可能会想到 sync.WaitGroup 类型。 来看看代码实现

A3:使用 sync.WaitGroup

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
    "fmt"
    "sync"
)

func main() {
    n := 10

    wg := sync.WaitGroup{}
    wg.Add(n)

    for i:=0; i<n; i++ {
        go func() {
            fmt.Println(i)
            wg.Done()
        }()
    }

    wg.Wait()
}

Q3:怎样让我们启用的多个 goroutine 按照既定的顺序执行 ?

既然是按既定的顺序执行,那么肯定是要让异步发起的 go 函数得到同步的执行

Demo 实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/*
 * 说明:for 循环启用 10 个 goroutine 打印迭代变量的序号,怎么保证按自然数的顺序(0,1,2...) 输出
 * 作者:zhe
 * 时间:2019-01-15 10:10 PM
 * 更新:
 */

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

var cnt uint32

func main() {
	TestByTriggerFn()
	ImplementationByChan()
}

// ********************************************* 方式一:自旋(spinning)函数
func TestByTriggerFn() {
	InputNumPassByTrigger()
	trigger(10, func() {}) // 等待主 goroutine 结束
}

var trigger = func(i uint32, fn func()) {
	for {
		if n := atomic.LoadUint32(&cnt); n == i { // 自旋,直到 i 和 n 相等时
			// 才执行 if 里代码,n 从 0 起计数
			fn()
			atomic.AddUint32(&cnt, 1) // cnt 会在多 goroutine 间会产生竞态,所以这里采用原子操作
			break
		}
		time.Sleep(time.Nanosecond)
	}
}

func InputNumPassByTrigger() {
	for i := uint32(0); i < 10; i++ {
		go func(i uint32) {
			fn := func() { fmt.Println(i) }
			trigger(i, fn)
		}(i)
	}
}

// ********************************************* 方式二:用通道实现
func ImplementationByChan() {
	ch := make(chan int)
	go InputNum(ch)

	OutByOrder(ch)
	close(ch)
}

func InputNum(ch chan int) {
	for i := 0; i < 10; i++ {
		go func(i int) {
			ch <- i
		}(i)
	}
}

func OutByOrder(ch chan int) {
	cnt := 0
	for {
		select {
		case i := <-ch:
			if i == cnt { // cnt 从 0 开始递增 => 从 goroutine 接收的值也从 0 递增
				fmt.Println(i)
				cnt += 1
				continue
			}
			go func() { ch <- i }() // 不符合自然数顺序的重新放回通道中去
		default:
			if cnt == 10 { // cnt 计数到 10 说明接收完成,退出函数
				return
			}
		}
	}
}

See Also

Thanks to the authors 🙂