# kubernetes中定时任务的实现 ## k8s中定时任务的实现 k8s 中有许多优秀的包都可以在平时的开发中借鉴与使用,比如,任务的定时轮询、高可用的实现、日志处理、缓存使用等都是独立的包,可以直接引用。本篇文章会介绍 k8s 中定时任务的实现,k8s 中定时任务都是通过 wait 包实现的,wait 包在 k8s 的多个组件中都有用到,以下是 wait 包在 kubelet 中的几处使用: ``` func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) { ... // kubelet 每5分钟一次从 apiserver 获取证书 closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute) if err != nil { return err } closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute) if err != nil { return err } ... } ... func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) { // 持续监听 pod 的变化 go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop) ... } ``` golang 中可以通过 time.Ticker 实现定时任务的执行,但在 k8s 中用了更原生的方式,使用 time.Timer 实现的。time.Ticker 和 time.Timer 的使用区别如下: - ticker 只要定义完成,从此刻开始计时,不需要任何其他的操作,每隔固定时间都会自动触发。 - timer 定时器是到了固定时间后会执行一次,仅执行一次 - 如果 timer 定时器要每隔间隔的时间执行,实现 ticker 的效果,使用 `func (t *Timer) Reset(d Duration) bool` > 示例1 试验原生time.Ticker 以及time.Timer ``` func main(){ var wg sync.WaitGroup ch := make(chan struct{}) timer1 := time.NewTimer(2 * time.Second) ticker1 := time.NewTicker(2 * time.Second) go func() { fmt.Println("sleep 10s") fmt.Println("sleep 时间",time.Now().Format("2006-01-02 15:04:05")) time.Sleep(10 * time.Second) close(ch) }() wg.Add(1) go func(t *time.Ticker) { defer wg.Done() for { select { case _,isClose := <- ch: if !isClose{ return } default: } <- t.C fmt.Println("exec ticker",time.Now().Format("2006-01-02 15:04:05")) } }(ticker1) wg.Add(1) go func(t *time.Timer) { defer wg.Done() for { select { case _,isClose := <- ch: if !isClose{ return } default: } <- t.C fmt.Println("exec timer",time.Now().Format("2006-01-02 15:04:05")) t.Reset(2 * time.Second) //重置计时器,只有重置了,timer类型才能重新在t.C取到值,否则这个循环只会执行一次,这里不会阻塞,所以循环会多输出一次 } }(timer1) wg.Wait() } 输出结果 sleep 10s sleep 时间 2021-08-19 11:08:42 exec ticker 2021-08-19 11:08:44 exec timer 2021-08-19 11:08:44 exec timer 2021-08-19 11:08:46 exec ticker 2021-08-19 11:08:46 exec ticker 2021-08-19 11:08:48 exec timer 2021-08-19 11:08:48 exec ticker 2021-08-19 11:08:50 exec timer 2021-08-19 11:08:50 exec ticker 2021-08-19 11:08:52 exec timer 2021-08-19 11:08:52 ``` ### Wait包介绍 [wait](https://pkg.go.dev/k8s.io/apimachinery/pkg/util/wait) 包提供了通过轮询或者监听一个条件的修改(关闭channel, ctx.Done,...)来执行指定函数的工具函数. 这些函数可以分为四大类 - [Until 类](http://icebergu.com/archives/client-go-wait#until-类监听-channel-或者-context): 根据 channel 的关闭或者 context Done 的信号来结束对指定函数的轮询操作 - [Poll 类](http://icebergu.com/archives/client-go-wait#poll-类型会根据-conditionfunc-函数来决定是否退出循环):不只是会根据 channel 或者 context 来决定结束轮询,还会判断轮询函数的返回值来决定是否结束 - [Wait 类](http://icebergu.com/archives/client-go-wait#wait-类会根据-waitfunc-来决定什么是否执行-conditionfunc): 会根据 WaitFor 函数返回的 channel 来触发函数执行 - [Backoff 类](http://icebergu.com/archives/client-go-wait#backoff-类根据-backoff-来计算函数执行的时间间隔):会根据 Backoff 返回的时间间隔来循环触发函数的执行 在介绍具体函数前,介绍一下用于对轮询的时间间隔进行抖动干扰的函数 `Jitter` ``` func Jitter(duration time.Duration, maxFactor float64) time.Duration { if maxFactor <= 0.0 { maxFactor = 1.0 } return duration + time.Duration(rand.Float64()*maxFactor*float64(duration)) } ``` `Jitter` 会返回一个 `[duration, (duration+maxFactor*duration) )` 区间的时间间隔 ### Untill类 监听channel或者context **`until 类`** 的函数参数都具有相同的意义 - `f` 表示具体执行的函数 - `period` 表示轮询的时间间隔 - `jitterFactor` 表示对轮询时间间隔加一个抖动 `Jitter(period, jitterFactor)` - `sliding` 为 True 表示`f() 运行之后计算周期`,False 表示 period 包含 f() 的执行时间 - `stopCh` 表示当 `stopCh` 被关闭则结束轮询 - `ctx` 表示当 ctx.Done 是结束函数的轮询 | 函数 | 作用 | | :----------------------------------------------------------- | :----------------------------------------------------- | | func **`JitterUntil`**(`f` func(), `period` time.Duration, `jitterFactor` float64, `sliding` bool, `stopCh` <-chan struct{}) | | | func **`JitterUntilWithContext`**(`ctx` context.Context, `f` func(context.Context), `period` time.Duration, `jitterFactor` float64, `sliding` bool) | | | func **`Until`**(`f` func(), `period` time.Duration, `stopCh` <-chan struct{}) | 相当于 `sliding` 为 True ,无抖动的 `JitterUntil` | | func **`NoSlidingUntil`**(`f` func(), `period` time.Duration, `stopCh` <-chan struct{}) | `sliding` 为 False | | func **`UntilWithContext`**(`ctx` context.Context, `f` func(context.Context), `period` time.Duration) | | | func **`NoSlidingUntilWithContext`**(ctx context.Context, f func(context.Context), period time.Duration) | | | func **`Forever`**(`f` func(), `period` time.Duration) | 永久轮询执行 `f`,相当于 `Until(f, period, NeverStop)` | 根据函数名称可以一些规律: `Jitter*` 带时间抖动参数 `jitterFactor` `NoSliding*` period 包含 f() 的执行时间 `Until*` 默认从`f() 运行之后计算周期` 执行开始计算下一次的执行时间间隔 #### 核心代码 核心代码(k8s.io/apimachinery/pkg/util/wait/wait.go): ``` func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) { var t *time.Timer var sawTimeout bool for { select { case <-stopCh: return default: } jitteredPeriod := period if jitterFactor > 0.0 { jitteredPeriod = Jitter(period, jitterFactor) } if !sliding { t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout) } func() { defer runtime.HandleCrash() f() }() if sliding { t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout) } select { case <-stopCh: return case <-t.C: sawTimeout = true } } } ... func resetOrReuseTimer(t *time.Timer, d time.Duration, sawTimeout bool) *time.Timer { if t == nil { return time.NewTimer(d) } if !t.Stop() && !sawTimeout { <-t.C } t.Reset(d) return t } ``` 几个关键点的说明: - 1、如果 sliding 为 true,则在 f() 运行之后计算周期。如果为 false,那么 period 包含 f() 的执行时间。 - 2、在 golang 中 select 没有优先级选择,为了避免额外执行 f(),在每次循环开始后会先判断 stopCh chan。 k8s 中 wait 包其实是对 time.Timer 做了一层封装实现。 #### 常用的方法 1、定期执行一个函数,永不停止,可以使用 Forever 方法: `func Forever(f func(), period time.Duration)` 2、在需要的时候停止循环,那么可以使用下面的方法,增加一个用于停止的 chan 即可,方法定义如下: ``` func Until(f func(), period time.Duration, stopCh <-chan struct{}) { JitterUntil(f, period, 0.0, true, stopCh) } ``` 上面的第三个参数 stopCh 就是用于退出无限循环的标志,停止的时候我们 close 掉这个 chan 就可以了。 3、 有时候我们需要周期的计时器在f函数启动之前开始计时,知道stopCh接收到信号或者被关闭,跟until唯一的区别就是sliding参数传入值的是false ``` func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) { JitterUntil(f, period, 0.0, false, stopCh) } ``` > 示例1 使用kubernetes源码中wait包的until函数 ``` 下载wait库 C:\Users\xx\Desktop\git\waittest>go get k8s.io/apimachinery/pkg/util/wait //示例2 使用kubernetes源码中wait包的until函数 func main() { ch := make(chan struct{}) go func() { log.Println("sleep 1s") time.Sleep(1 * time.Second) close(ch) }() wait.Until(func() { time.Sleep(100 * time.Millisecond) log.Println("test") },100 * time.Millisecond,ch) log.Println("main exit") } 输出结果: C:\Users\xx\Desktop\git\waittest>go run main.go 2021/08/19 10:03:34 sleep 1s 2021/08/19 10:03:34 test 2021/08/19 10:03:34 test 2021/08/19 10:03:34 test 2021/08/19 10:03:34 test 2021/08/19 10:03:35 test 2021/08/19 10:03:35 main exit ``` > 示例2 ``` type Pod struct { ID int Name string } func main(){ podKillCh := make(chan *Pod,50) go func() { i := 0 for { time.Sleep(2 * time.Second) podKillCh <- &Pod{ ID: i, Name: strconv.Itoa(i), } i++ } }() wait.Until(func() { for pod := range podKillCh{ log.Printf("%#v\n",pod) } },1 ,wait.NeverStop) //wait.NeverStop 表示永不停止 ,与下面代码效果一致 //wait.Forever(func() { // for pod := range podKillCh{ // log.Printf("%#v\n",pod) // } //},1 ) log.Println("main exit") } 输出: C:\Users\xx\Desktop\git\waittest>go run main.go 2021/08/19 10:24:04 &main.Pod{ID:0, Name:"0"} 2021/08/19 10:24:06 &main.Pod{ID:1, Name:"1"} 2021/08/19 10:24:08 &main.Pod{ID:2, Name:"2"} 2021/08/19 10:24:10 &main.Pod{ID:3, Name:"3"} 2021/08/19 10:24:12 &main.Pod{ID:4, Name:"4"} 2021/08/19 10:24:14 &main.Pod{ID:5, Name:"5"} 在func执行期间(遍历podKillCh,空channel会阻塞),不会重新计时再次执行func。 ``` ### Poll 类型 会根据 ConditionFunc 函数来决定是否退出循环 ``` type ConditionFunc func()(done bool, err error) ``` **当调用 `ConditionFunc` 返回 `true` 或者 `error` 时,结束对 `ConditionFunc` 的轮询,并且如果 `ConditionFunc` 返回了 error 那么`Poll*` 也会返回该错误** Poll 类型的轮询函数同样具有一些相同意义的参数: - `interval` 为执行 `ConditionFunc` 的时间间隔 - `timeout` 表示 Poll 类型函数的执行超时时间,如果在 `timeout` 时间内 `ConditionFunc` 都没有返回 `true` 或者 `error` 时,返回 `ErrWaitTimeout` - 内部是使用的time.Ticker对象 - 最终调用的都是核心函数poll,在执行`ConditionFunc` 之前执行还是在之后执行,看poll参数`immediate`是true还是false,如果是true,那么就先执行,然后再等待`interval`秒 | 函数 | 作用 | | :----------------------------------------------------------- | :-------------------------------------------------- | | func **`Poll`**(`interval`, `timeout` time.Duration, `condition` ConditionFunc) error | 在执行 `ConditionFunc` 之前会等待 `interval` 的时间 | | func **`PollUntil`**(`interval` time.Duration, `condition` ConditionFunc, `stopCh` <-chan struct{}) error | 监听 `stopCh` 的 `Poll` | | func **`PollImmediate`**(`interval`, `timeount` time.Duration, `condition` ConditionFunc) error | 执行 `ConditionFunc` 之后会等待 `interval` 的时间 | | func **`PollImmediateInfinite`**(`interval` time.Duration, `condition` ConditionFunc) error | 没有过期时间的 `PollImmediate` | | func **`PollImmediateUntil`**(`interval` time.Duration, `condition` ConditionFunc, `stopCh` <- chan struct{}) error | 监听 `stopCh` 是否关闭 的 `PollImmediate` | `Poll 和 ` `PollImmediate` 两种函数的区别是: `PollImmediate` 会先执行 `ConditionFunc` ,然后在等待 `interval` 的时间,而 `Poll 会先等待 `interval #### 核心代码 ``` func poll(ctx context.Context, immediate bool, wait WaitWithContextFunc, condition ConditionWithContextFunc) error { if immediate { done, err := runConditionWithCrashProtectionWithContext(ctx, condition) if err != nil { return err } if done { return nil } } select { case <-ctx.Done(): // returning ctx.Err() will break backward compatibility return ErrWaitTimeout default: return WaitForWithContext(ctx, wait, condition) } } ``` #### 常用方法 1、有时候,我们还会需要在运行前去检查先决条件,在条件满足的时候才去运行某一任务,这时候可以使用 Poll 方法: `func Poll(interval, timeout time.Duration, condition ConditionFunc)` 这个函数会以 interval 为间隔,不断去检查 condition 条件是否为真,如果为真则可以继续后续处理;如果指定了 timeout 参数,则该函数也可以只常识指定的时间。 2、PollUntil 方法和上面的类似,但是没有 timeout 参数,多了一个 stopCh 参数,如下所示: `PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error` 此外还有 PollImmediate 、 PollInfinite 和 PollImmediateInfinite 方法。 ### Wait类 会根据WaitFunc来决定是否执行ConditionFunc ``` // WaitFunc creates a channel that receives an item every time a test // should be executed and is closed when the last test should be invoked. type WaitFunc func(done <-chan struct{}) <-chan struct{} func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error { ctx, cancel := contextForChannel(done) defer cancel() return WaitForWithContext(ctx, wait.WithContext(), fn.WithContext()) } ``` `WaitFor` 从 `WaitFunc` 中获取一个 channel,当 channel 可读是执行 `fn`,然后检查 channel 是否被关闭,如果关闭则退出函数 **即使 channel 的关闭操作也会触发一次 fn 执行** 当 `done` 关闭时,直接返回 ErrorWaitTimeout,不过这个时候需要注意: **由于 select 是随机选择的,即使 `done` 被关闭了,`fn` 在 channel 可读的情况下依然可能会执行一次或者 n 次** **在 WaitFunc 中关闭返回的 channel 会效果更好** #### 核心代码 ``` func WaitForWithContext(ctx context.Context, wait WaitWithContextFunc, fn ConditionWithContextFunc) error { waitCtx, cancel := context.WithCancel(context.Background()) defer cancel() c := wait(waitCtx) for { select { case _, open := <-c: ok, err := runConditionWithCrashProtectionWithContext(ctx, fn) if err != nil { return err } if ok { return nil } if !open { return ErrWaitTimeout } case <-ctx.Done(): // returning ctx.Err() will break backward compatibility return ErrWaitTimeout } } } ``` #### poll函数与WaitForWithContext Poll 函数实际底层便是调用 WaitForWithContext,具体看poll类型核心代码部分 ### Backoff类 根据Backoff来计算函数执行的时间间隔 Backoff 类型的函数会根据 `Backoff` 或者 `BackoffManager` 返回的时间间隔来决定函数的执行间隔 ``` type Backoff struct { // 时间间隔,用于调用 Step 方法时返回的时间间隔. Duration time.Duration // 用于计算下次的时间间隔 // Factor 大于 0 时,Backoff 在计算下次的时间间隔时都会根据 Duration * Factor,Factor * Duration 不能大于 Cap // 不能为负数 Factor float64 // Jitter > 0 时,每次迭代的时间间隔都会额外加上 0 - Duration * Jitter 的随机时间,并且抖动出的时间不会设置为 Duration,而且不受 Caps 的限制 Jitter float64 // 进行指数回退(*Factor) 操作的次数,当 Factor * Duration > Cap 时 Steps 会被设置为 0, Duration 设置为 Cap // 也就是说后续的迭代时间间隔都会返回 Duration Steps int // 最大的时间间隔 Cap time.Duration } ``` `Backoff` 的 `Step` 方法返回通过 `Factor`, `Cap`, `Jitter` 计算的时间间隔 ``` func (b *Backoff) Step() time.Duration { //steps < 1 不再进行指数回退操作 if b.Steps < 1 { if b.Jitter > 0 { return Jitter(b.Duration, b.Jitter) } return b.Duration } b.Steps-- duration := b.Duration // 如果Factor 等于 0,那就不需要修改Duration if b.Factor != 0 { b.Duration = time.Duration(float64(b.Duration) * b.Factor) if b.Cap > 0 && b.Duration > b.Cap { b.Duration = b.Cap b.Steps = 0 } } if b.Jitter > 0 { duration = Jitter(duration, b.Jitter) } return duration } ``` `BackoffManger` 定义了 `Backoff` 方法,返回一个定时器,用于触发函数的执行 ``` type BackoffManager interface { Backoff() clock.Timer } func NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration time.Duration, backoffFactor, jitter float64, c clock.Clock) BackoffManager ``` NewExponentialBackoffManager会使用 `initBackoff`, `maxBackoff`, `backoffFactor`, `jitter` 来初始化一个 `Backoff` 对象 它的 Backoff 方法时间返回了一个使用 `Backoff.Step` 作为时间间隔的定时器 **不过两次 `Backoff.Step` 的时间间隔不能大于 `resetDuration` 否则会重置 `Backoff`** ``` func (b *expontialBackoffManagerImpl) getNextBackoff() time.Duration { if b.clock.Now().Sub(b.lastBackoffStart) > b.backResetDuration { b.backoff.Steps = math.MaxInt32 b.backoff.Duration = b.initialBackoff } b.lastBackoffStart = b.clock.Now() return b.backoff.Step() } ``` 也就是说如果 `backoff.Step()` 返回的值大于 `resetDuration`,那么下次返回的时间间隔便是 `initBackoff` **通常用于上游状态异常用来较少负载** 另外一种 `BackoffManager` 是 `JitterBackoffManager`,返回的是根据 jitter 抖动的时间间隔 ``` func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager func (j *jitteredBackoffManagerImpl) getNextBackoff() time.Duration { if j.jitter > 0.0{ // 计算出来的抖动的时间间隔并不会影响 j.duration return Jitter(j.duration, j.jitter) } return j.duration } ``` #### Backoff类轮询函数 | 函数 | 作用 | | :----------------------------------------------------------- | :------------------------------------------------------ | | func **`BackUntil`**(`f` func(), `backoff` BackoffManager, `sliding` book, `stopCh` <-chan struct{}) | 根据 Backoff 的定时器来循环触发 `f`,直到 `stopCh` 关闭 | | func **`ExponentialBackoff`**(`backoff` Backoff, `condition` ConditionFunc) error | 根据 `backoff` 的时间间隔来循环执行 `f` | | func **`ExponentialBackoffWithContext`**(`ctx` context.Context, `backoff` Backoff, `condition` ConditionFunc) error | | `ExponentialBackoff*` 函数中 `condition` 会执行 `backoff.Steps` 次,到达次数并且 `condition` 没有返回 True 或 error,那么就返回 ErrWaitTimeout 如果 `backoff.Steps` 为 0 会直接返回 `ErrWaitTimeout` ``` func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error { for backoff.Steps > 0 { if ok ,err := condition(); err != nil || ok { return err } if backoff.Steps == 1 { break } time.Sleep(backoff.Step()) } return ErrWaitTimeout } ```