现在的应用程序或多或少都有这样得需求,既 在给定的某段时间重复执行一段代码。这段代码可能是从其他数据源获取数据进行解析,亦或是发送一些数据到其他地方,如 MQ、Handler 等。

这种问题或挑战一定会在你的项目中遇到,现在或将来,难道不是吗?

我们团队在前一个项目开发期间就遇到了这样的问题。随着应用程序不断迭代、某些功能上就需要在某段时间内能够持续的执行一个任务,当然,任务的执行频率是可以提前设置好的。因此,就有了下面提到的这个库,只有不到 50 行代码,用一种更优雅的方式来调度某种类型的任务。

我们来深入看看怎么实现的

其背后主要思想是:每个添加的任务都启动一个 goroutine,在指定的时间间隔内重复执行该任务, 并创建对应的 context.CancelFunc,以便能够在后台取消该 goroutine。

Scheduler 包对外表现的能力使用了 Scheduler 接口进行封装, 定义如下:

1
2
3
4
5
// Scheduler
type Scheduler interface {
	Add(ctx context.Context, job Job, interval time.Duration)
	Stop()
}

下面是 Schedule 的数据结构定义,它拥有的两个方法 Add()Stop()。Schedule 由 sync.WaitGroup() 和由 contex.CancelFunc 构成的切片组成。WaitGroup 用来在应用程序停止的时候平滑的关闭所有 goroutine。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// Schedule implements the Scheduler interface
type Schedule struct {
	wg        sync.WaitGroup
	cancelOps []context.CancelFunc
}

// NewSchedule create an instance of Schedule
func NewScheduler() Scheduler {
	s := Schedule{
		wg:        sync.WaitGroup{},
		cancelOps: make([]context.CancelFunc, 0),
	}
	return &s
}

Schedule 的两个方法实现如下,Stop 将会停止所有正在运行的 Job,并等待所有 goroutine 执行结束。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// Add starts a goroutine which constantly calls provided job
// with interval delay
func (s *Schedule) Add(ctx context.Context, job Job, interval time.Duration) {
	ctx, cancel := context.WithCancel(ctx)
	s.cancelOps = append(s.cancelOps, cancel)

	s.wg.Add(1)
	go s.process(ctx, job, interval)
}

// Stop cancels all running jobs
func (s *Schedule) Stop() {
	for _, cancel := range s.cancelOps {
		cancel()
	}
	s.wg.Wait()
}

Job 定义为函数类型,表示要执行的任务

1
2
// Job is a defined type which is just a simple func with context
type Job func(ctx context.Context)

当调用 Add() 方法添加一个任务时,启动一个 goroutine 来执行,该 goroutine 内部会监听两个信号:1. 打点器触发,执行任务;2. context.CancelFunc 触发,停止任务并退出。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// process executes the job by every ticker time, and stop job
// when context.Cancel called.
func (s *Schedule) process(ctx context.Context, job Job, interval time.Duration) {
	ticker := time.NewTicker(interval)

	for {
		select {
		case <-ticker.C:
			job(ctx)
		case <-ctx.Done():
			s.wg.Done()
			return
		}
	}
}

最后,切记在应用程序结束的时候要调用 Stop() 保证平滑退出。

See Also

Thanks to the authors 🙂