现在的应用程序或多或少都有这样得需求,既 在给定的某段时间重复执行一段代码
。这段代码可能是从其他数据源获取数据进行解析,亦或是发送一些数据到其他地方,如 MQ、Handler 等。
这种问题或挑战一定会在你的项目中遇到,现在或将来,难道不是吗?
我们团队在前一个项目开发期间就遇到了这样的问题。随着应用程序不断迭代、某些功能上就需要在某段时间内能够持续的执行一个任务,当然,任务的执行频率是可以提前设置好的。因此,就有了下面提到的这个库,只有不到 50 行代码,用一种更优雅的方式来调度某种类型的任务。
我们来深入看看怎么实现的
其背后主要思想是:每个添加的任务都启动一个 goroutine
,在指定的时间间隔内重复执行该任务, 并创建对应的 context.CancelFunc
,以便能够在后台取消该 goroutine。
Scheduler 包对外表现的能力使用了 Scheduler 接口进行封装, 定义如下:
1
2
3
4
5
|
// Scheduler
type Scheduler interface {
Add(ctx context.Context, job Job, interval time.Duration)
Stop()
}
|
下面是 Schedule
的数据结构定义,它拥有的两个方法 Add()
和 Stop()
。Schedule 由 sync.WaitGroup()
和由 contex.CancelFunc
构成的切片组成。WaitGroup 用来在应用程序停止的时候平滑的关闭所有 goroutine。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
// Schedule implements the Scheduler interface
type Schedule struct {
wg sync.WaitGroup
cancelOps []context.CancelFunc
}
// NewSchedule create an instance of Schedule
func NewScheduler() Scheduler {
s := Schedule{
wg: sync.WaitGroup{},
cancelOps: make([]context.CancelFunc, 0),
}
return &s
}
|
Schedule 的两个方法实现如下,Stop 将会停止所有正在运行的 Job,并等待所有 goroutine 执行结束。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
// Add starts a goroutine which constantly calls provided job
// with interval delay
func (s *Schedule) Add(ctx context.Context, job Job, interval time.Duration) {
ctx, cancel := context.WithCancel(ctx)
s.cancelOps = append(s.cancelOps, cancel)
s.wg.Add(1)
go s.process(ctx, job, interval)
}
// Stop cancels all running jobs
func (s *Schedule) Stop() {
for _, cancel := range s.cancelOps {
cancel()
}
s.wg.Wait()
}
|
Job 定义为函数类型,表示要执行的任务
1
2
|
// Job is a defined type which is just a simple func with context
type Job func(ctx context.Context)
|
当调用 Add() 方法添加一个任务时,启动一个 goroutine 来执行,该 goroutine 内部会监听两个信号:1. 打点器触发,执行任务;2. context.CancelFunc 触发,停止任务并退出。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
// process executes the job by every ticker time, and stop job
// when context.Cancel called.
func (s *Schedule) process(ctx context.Context, job Job, interval time.Duration) {
ticker := time.NewTicker(interval)
for {
select {
case <-ticker.C:
job(ctx)
case <-ctx.Done():
s.wg.Done()
return
}
}
}
|
最后,切记在应用程序结束的时候要调用 Stop()
保证平滑退出。
See Also
Thanks to the authors 🙂