1、概述
Kubernetes 的各个组件都有一定的定时任务,比如任务的定时轮询、高可用的实现、日志处理、缓存使用等,Kubernetes 中的定时任务都是通过 wait 包实现的。
注意,本文源码基于Kubernetes 1.21.5。
2、Golang 的定时任务
在讲 Kubernetes 的 wait 包之前,先看下 Golang 应该怎么实现一个定时任务。
Golang 中的 time 库包含了很多和时间相关的工具,其中包括了 Ticker 和 Timer 两个定时器。
- Ticker 只要完成定义,从计时开始,不需要其他操作,每间隔固定时间便会触发。
- 而对于 Timer,在超时之后需要重置才能继续触发。
Golang定时任务详细语法细节请参考:Golang定时器——Timer 和 Ticker 。
3、Kubernetes 的 wait 库
3.1 常用 API
wait 库中实现了多种常用的 API,以提供定时执行函数的能力。
定期执行一个函数,永不停止
var NeverStop <-chan struct{} = make(chan struct{})
// Forever calls f every period for ever.
//
// Forever is syntactic sugar on top of Until.
func Forever(f func(), period time.Duration) {
Until(f, period, NeverStop)
}
该函数支持一个函数变量参数和一个间隔时间,该函数会定期执行,不会停止。
定期执行一个函数,可以接受停止信号
// Until loops until stop channel is closed, running f every period.
//
// Until is syntactic sugar on top of JitterUntil with zero jitter factor and
// with sliding = true (which means the timer for period starts after the f
// completes).
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
JitterUntil(f, period, 0.0, true, stopCh)
}
该函数支持提供一个函数变量参数、间隔时间和发生 stop 信号的 channel,和 Forever 类似,不过可以通过向 stopCh 发布消息来停止。
定期执行一个函数,通过context并发控制协程
func UntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) {
JitterUntilWithContext(ctx, f, period, 0.0, true)
}
func JitterUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration, jitterFactor float64, sliding bool) {
JitterUntil(func() { f(ctx) }, period, jitterFactor, sliding, ctx.Done())
}
该函数支持提供一个context、函数变量参数和间隔时间,和 Util 类似,只不过通过context并发控制协程。
定期检查先决条件
// Poll tries a condition func until it returns true, an error, or the timeout
// is reached.
//
// Poll always waits the interval before the run of 'condition'.
// 'condition' will always be invoked at least once.
//
// Some intervals may be missed if the condition takes too long or the time
// window is too short.
//
// If you want to Poll something forever, see PollInfinite.
func Poll(interval, timeout time.Duration, condition ConditionFunc) error
该函数将以 interval 为间隔,定期检查 condition 是否检查成功。
3.2 核心代码
wait 包的定时任务 API 是基于 JitterUntil
实现的。
JitterUntil
的 5 个参数:
参数名 |
类型 |
作用 |
f |
|
需要定时执行的逻辑函数 |
period |
|
定时任务的时间间隔 |
jitterFactor |
|
如果大于 0.0,间隔时间变为 duration 到 duration + maxFactor * duration 的随机值(其中如果jitterFactor值大于0,那么maxFactor=jitterFactor)。 |
sliding |
|
逻辑的执行时间是否不算入间隔时间,如果 sliding 为 true,则在 f() 运行之后计算周期。如果为 false,那么 period 包含 f() 的执行时间。 |
stopCh |
|
接受停止信号的 channel |
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
BackoffUntil(f, NewJitteredBackoffManager(period, jitterFactor, &clock.RealClock{}), sliding, stopCh)
}
//周期性执行函数f
func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
var t clock.Timer
for {
//通道关闭后,退出BackoffUtil函数(关闭通道后读取对应类型零值)
select {
case <-stopCh:
return
default:
}
//实例化(NewTimer)或复用(Reset)原生定时器time.Timer进行周期性任务
if !sliding {
t = backoff.Backoff()
}
func() {
defer runtime.HandleCrash()
f()
}()
//实例化(NewTimer)或复用(Reset)原生定时器time.Timer进行周期性任务
if sliding {
t = backoff.Backoff()
}
//每隔getNextBackoff()时间间隔触发定时器
select {
case <-stopCh:
return
case <-t.C():
}
}
}
type jitteredBackoffManagerImpl struct {
clock clock.Clock
duration time.Duration
jitter float64
backoffTimer clock.Timer
}
// NewJitteredBackoffManager returns a BackoffManager that backoffs with given duration plus given jitter. If the jitter
// is negative, backoff will not be jittered.
func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager {
return &jitteredBackoffManagerImpl{
clock: c,
duration: duration, //最少要延迟多久
jitter: jitter, //给定抖动范围
backoffTimer: nil, //退避计时器,一开始不需要初始化backoffTimer,会由使用者调用Backoff方法时由计算后再赋值
}
}
func (j *jitteredBackoffManagerImpl) Backoff() clock.Timer {
backoff := j.getNextBackoff()
//实例化原生定时器time.Timer进行周期性任务
if j.backoffTimer == nil {
j.backoffTimer = j.clock.NewTimer(backoff)
} else {
//复用timer
j.backoffTimer.Reset(backoff)
}
return j.backoffTimer
}
//计算延迟时间
func (j *jitteredBackoffManagerImpl) getNextBackoff() time.Duration {
jitteredPeriod := j.duration
if j.jitter > 0.0 {
jitteredPeriod = Jitter(j.duration, j.jitter)
}
return jitteredPeriod
}
//计算抖动延迟时间
func Jitter(duration time.Duration, maxFactor float64) time.Duration {
if maxFactor <= 0.0 {
maxFactor = 1.0
}
//抖动延迟时间 = 基础的延时时间 + 随机时间*抖动因子*基础的延时时间
wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
return wait
}
//clock包进行对time.Timer做了一层封装实现,本文只列一下time.Timer实例化方法
type RealClock struct{}
// NewTimer returns a new Timer.
func (RealClock) NewTimer(d time.Duration) Timer {
return &realTimer{
timer: time.NewTimer(d),
}
}