目录

kubernetes中定时任务的实现

k8s中定时任务的实现

k8s 中有许多优秀的包都可以在平时的开发中借鉴与使用,比如,任务的定时轮询、高可用的实现、日志处理、缓存使用等都是独立的包,可以直接引用。本篇文章会介绍 k8s 中定时任务的实现,k8s 中定时任务都是通过 wait 包实现的,wait 包在 k8s 的多个组件中都有用到,以下是 wait 包在 kubelet 中的几处使用:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) {
        ...
        // kubelet 每5分钟一次从 apiserver 获取证书
        closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute)
        if err != nil {
            return err
        }

        closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute)
        if err != nil {
            return err
        }
        ...
}

...

func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration,   kubeDeps *kubelet.Dependencies, enableServer bool) {
    // 持续监听 pod 的变化
    go wait.Until(func() {
        k.Run(podCfg.Updates())
    }, 0, wait.NeverStop)
    ...
}

golang 中可以通过 time.Ticker 实现定时任务的执行,但在 k8s 中用了更原生的方式,使用 time.Timer 实现的。time.Ticker 和 time.Timer 的使用区别如下:

  • ticker 只要定义完成,从此刻开始计时,不需要任何其他的操作,每隔固定时间都会自动触发。
  • timer 定时器是到了固定时间后会执行一次,仅执行一次
  • 如果 timer 定时器要每隔间隔的时间执行,实现 ticker 的效果,使用 func (t *Timer) Reset(d Duration) bool

示例1 试验原生time.Ticker 以及time.Timer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
func main(){
	var wg sync.WaitGroup
	ch := make(chan struct{})
	timer1 := time.NewTimer(2 * time.Second)
	ticker1 := time.NewTicker(2 * time.Second)

	go func() {
		fmt.Println("sleep 10s")
		fmt.Println("sleep 时间",time.Now().Format("2006-01-02 15:04:05"))
		time.Sleep(10 * time.Second)
		close(ch)
	}()

	wg.Add(1)
	go func(t *time.Ticker) {
		defer wg.Done()
		for {
			select {
			case _,isClose := <- ch:
				if !isClose{
					return
				}
			default:
			}
			<- t.C
			fmt.Println("exec ticker",time.Now().Format("2006-01-02 15:04:05"))
		}
	}(ticker1)

	wg.Add(1)
	go func(t *time.Timer) {
		defer wg.Done()
		for  {
			select {
			case _,isClose := <- ch:
				if !isClose{
					return
				}
			default:
			}

			<- t.C
			fmt.Println("exec timer",time.Now().Format("2006-01-02 15:04:05"))
			t.Reset(2 * time.Second) //重置计时器,只有重置了,timer类型才能重新在t.C取到值,否则这个循环只会执行一次,这里不会阻塞,所以循环会多输出一次
		}
	}(timer1)

	wg.Wait()
}


输出结果
sleep 10s
sleep 时间 2021-08-19 11:08:42
exec ticker 2021-08-19 11:08:44
exec timer 2021-08-19 11:08:44
exec timer 2021-08-19 11:08:46
exec ticker 2021-08-19 11:08:46
exec ticker 2021-08-19 11:08:48
exec timer 2021-08-19 11:08:48
exec ticker 2021-08-19 11:08:50
exec timer 2021-08-19 11:08:50
exec ticker 2021-08-19 11:08:52
exec timer 2021-08-19 11:08:52


Wait包介绍

wait 包提供了通过轮询或者监听一个条件的修改(关闭channel, ctx.Done,…)来执行指定函数的工具函数. 这些函数可以分为四大类

  • Until 类: 根据 channel 的关闭或者 context Done 的信号来结束对指定函数的轮询操作
  • Poll 类:不只是会根据 channel 或者 context 来决定结束轮询,还会判断轮询函数的返回值来决定是否结束
  • Wait 类: 会根据 WaitFor 函数返回的 channel 来触发函数执行
  • Backoff 类:会根据 Backoff 返回的时间间隔来循环触发函数的执行

在介绍具体函数前,介绍一下用于对轮询的时间间隔进行抖动干扰的函数 Jitter

1
2
3
4
5
6
func Jitter(duration time.Duration, maxFactor float64) time.Duration {
	if maxFactor <= 0.0 {
		maxFactor = 1.0
	}
	return duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
}

Jitter 会返回一个 [duration, (duration+maxFactor*duration) ) 区间的时间间隔

Untill类

监听channel或者context

until 类 的函数参数都具有相同的意义

  • f 表示具体执行的函数
  • period 表示轮询的时间间隔
  • jitterFactor 表示对轮询时间间隔加一个抖动 Jitter(period, jitterFactor)
  • sliding 为 True 表示f() 运行之后计算周期,False 表示 period 包含 f() 的执行时间
  • stopCh 表示当 stopCh 被关闭则结束轮询
  • ctx 表示当 ctx.Done 是结束函数的轮询
函数 作用
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{})
func JitterUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration, jitterFactor float64, sliding bool)
func Until(f func(), period time.Duration, stopCh <-chan struct{}) 相当于 sliding 为 True ,无抖动的 JitterUntil
func NoSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) sliding 为 False
func UntilWithContext(ctx context.Context, f func(context.Context), period time.Duration)
func NoSlidingUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration)
func Forever(f func(), period time.Duration) 永久轮询执行 f,相当于 Until(f, period, NeverStop)

根据函数名称可以一些规律:

Jitter* 带时间抖动参数 jitterFactor NoSliding* period 包含 f() 的执行时间 Until* 默认从f() 运行之后计算周期 执行开始计算下一次的执行时间间隔

核心代码

核心代码(k8s.io/apimachinery/pkg/util/wait/wait.go):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
    var t *time.Timer
    var sawTimeout bool

    for {
        select {
        case <-stopCh:
            return
        default:
        }

        jitteredPeriod := period
        if jitterFactor > 0.0 {
            jitteredPeriod = Jitter(period, jitterFactor)
        }

        if !sliding {
            t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
        }

        func() {
            defer runtime.HandleCrash()
            f()
        }()

        if sliding {
            t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
        }

        select {
        case <-stopCh:
            return
        case <-t.C:
            sawTimeout = true
        }
    }
}

...

func resetOrReuseTimer(t *time.Timer, d time.Duration, sawTimeout bool) *time.Timer {
    if t == nil {
        return time.NewTimer(d)
    }
    if !t.Stop() && !sawTimeout {
        <-t.C
    }
    t.Reset(d)
    return t
}

几个关键点的说明:

  • 1、如果 sliding 为 true,则在 f() 运行之后计算周期。如果为 false,那么 period 包含 f() 的执行时间。
  • 2、在 golang 中 select 没有优先级选择,为了避免额外执行 f(),在每次循环开始后会先判断 stopCh chan。

k8s 中 wait 包其实是对 time.Timer 做了一层封装实现。

常用的方法

1、定期执行一个函数,永不停止,可以使用 Forever 方法:

func Forever(f func(), period time.Duration)

2、在需要的时候停止循环,那么可以使用下面的方法,增加一个用于停止的 chan 即可,方法定义如下:

1
2
3
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
	JitterUntil(f, period, 0.0, true, stopCh)
}

上面的第三个参数 stopCh 就是用于退出无限循环的标志,停止的时候我们 close 掉这个 chan 就可以了。

3、 有时候我们需要周期的计时器在f函数启动之前开始计时,知道stopCh接收到信号或者被关闭,跟until唯一的区别就是sliding参数传入值的是false

1
2
3
func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) {
	JitterUntil(f, period, 0.0, false, stopCh)
}

示例1 使用kubernetes源码中wait包的until函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
下载wait库
C:\Users\xx\Desktop\git\waittest>go get k8s.io/apimachinery/pkg/util/wait

//示例2 使用kubernetes源码中wait包的until函数
func main()  {
	ch := make(chan struct{})
	go func() {
		log.Println("sleep 1s")
		time.Sleep(1 * time.Second)
		close(ch)
	}()
	wait.Until(func() {
		time.Sleep(100 * time.Millisecond)
		log.Println("test")
	},100 * time.Millisecond,ch)

	log.Println("main exit")
}

输出结果:
C:\Users\xx\Desktop\git\waittest>go run main.go
2021/08/19 10:03:34 sleep 1s
2021/08/19 10:03:34 test
2021/08/19 10:03:34 test
2021/08/19 10:03:34 test
2021/08/19 10:03:34 test
2021/08/19 10:03:35 test
2021/08/19 10:03:35 main exit

示例2

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
type Pod struct {
	ID int
	Name string
}
func main(){

	podKillCh := make(chan *Pod,50)


	go func() {
		i := 0
		for  {
			time.Sleep(2 * time.Second)
			podKillCh <- &Pod{
				ID: i,
				Name: strconv.Itoa(i),
			}
			i++
		}
	}()

	wait.Until(func() {
		for pod := range podKillCh{
			log.Printf("%#v\n",pod)
		}
	},1 ,wait.NeverStop) //wait.NeverStop 表示永不停止 ,与下面代码效果一致
	
	//wait.Forever(func() {
	//	for pod := range podKillCh{
	//		log.Printf("%#v\n",pod)
	//	}
	//},1 )	
	

	log.Println("main exit")
}

输出:
C:\Users\xx\Desktop\git\waittest>go run main.go
2021/08/19 10:24:04 &main.Pod{ID:0, Name:"0"}
2021/08/19 10:24:06 &main.Pod{ID:1, Name:"1"}
2021/08/19 10:24:08 &main.Pod{ID:2, Name:"2"}
2021/08/19 10:24:10 &main.Pod{ID:3, Name:"3"}
2021/08/19 10:24:12 &main.Pod{ID:4, Name:"4"}
2021/08/19 10:24:14 &main.Pod{ID:5, Name:"5"}

在func执行期间(遍历podKillCh,空channel会阻塞),不会重新计时再次执行func。

Poll 类型

会根据 ConditionFunc 函数来决定是否退出循环

1
type ConditionFunc func()(done bool, err error)

当调用 ConditionFunc 返回 true 或者 error 时,结束对 ConditionFunc 的轮询,并且如果 ConditionFunc 返回了 error 那么Poll* 也会返回该错误

Poll 类型的轮询函数同样具有一些相同意义的参数:

  • interval 为执行 ConditionFunc 的时间间隔
  • timeout 表示 Poll 类型函数的执行超时时间,如果在 timeout 时间内 ConditionFunc 都没有返回 true 或者 error 时,返回 ErrWaitTimeout
  • 内部是使用的time.Ticker对象
  • 最终调用的都是核心函数poll,在执行ConditionFunc 之前执行还是在之后执行,看poll参数immediate是true还是false,如果是true,那么就先执行,然后再等待interval
函数 作用
func Poll(interval, timeout time.Duration, condition ConditionFunc) error 在执行 ConditionFunc 之前会等待 interval 的时间
func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error 监听 stopChPoll
func PollImmediate(interval, timeount time.Duration, condition ConditionFunc) error 执行 ConditionFunc 之后会等待 interval 的时间
func PollImmediateInfinite(interval time.Duration, condition ConditionFunc) error 没有过期时间的 PollImmediate
func PollImmediateUntil(interval time.Duration, condition ConditionFunc, stopCh <- chan struct{}) error 监听 stopCh 是否关闭 的 PollImmediate

Poll 和 PollImmediate 两种函数的区别是: PollImmediate 会先执行 ConditionFunc ,然后在等待 interval 的时间,而 Poll 会先等待 interval

核心代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func poll(ctx context.Context, immediate bool, wait WaitWithContextFunc, condition ConditionWithContextFunc) error {
	if immediate {
		done, err := runConditionWithCrashProtectionWithContext(ctx, condition)
		if err != nil {
			return err
		}
		if done {
			return nil
		}
	}

	select {
	case <-ctx.Done():
		// returning ctx.Err() will break backward compatibility
		return ErrWaitTimeout
	default:
		return WaitForWithContext(ctx, wait, condition)
	}
}

常用方法

1、有时候,我们还会需要在运行前去检查先决条件,在条件满足的时候才去运行某一任务,这时候可以使用 Poll 方法:

func Poll(interval, timeout time.Duration, condition ConditionFunc)

这个函数会以 interval 为间隔,不断去检查 condition 条件是否为真,如果为真则可以继续后续处理;如果指定了 timeout 参数,则该函数也可以只常识指定的时间。

2、PollUntil 方法和上面的类似,但是没有 timeout 参数,多了一个 stopCh 参数,如下所示:

PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error

此外还有 PollImmediate 、 PollInfinite 和 PollImmediateInfinite 方法。

Wait类

会根据WaitFunc来决定是否执行ConditionFunc

1
2
3
4
5
6
7
8
9
// WaitFunc creates a channel that receives an item every time a test
// should be executed and is closed when the last test should be invoked.
type WaitFunc func(done <-chan struct{}) <-chan struct{}

func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error {
	ctx, cancel := contextForChannel(done)
	defer cancel()
	return WaitForWithContext(ctx, wait.WithContext(), fn.WithContext())
}

WaitForWaitFunc 中获取一个 channel,当 channel 可读是执行 fn,然后检查 channel 是否被关闭,如果关闭则退出函数

即使 channel 的关闭操作也会触发一次 fn 执行

done 关闭时,直接返回 ErrorWaitTimeout,不过这个时候需要注意: 由于 select 是随机选择的,即使 done 被关闭了,fn 在 channel 可读的情况下依然可能会执行一次或者 n 次

在 WaitFunc 中关闭返回的 channel 会效果更好

核心代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func WaitForWithContext(ctx context.Context, wait WaitWithContextFunc, fn ConditionWithContextFunc) error {
	waitCtx, cancel := context.WithCancel(context.Background())
	defer cancel()
	c := wait(waitCtx)
	for {
		select {
		case _, open := <-c:
			ok, err := runConditionWithCrashProtectionWithContext(ctx, fn)
			if err != nil {
				return err
			}
			if ok {
				return nil
			}
			if !open {
				return ErrWaitTimeout
			}
		case <-ctx.Done():
			// returning ctx.Err() will break backward compatibility
			return ErrWaitTimeout
		}
	}
}

poll函数与WaitForWithContext

Poll 函数实际底层便是调用 WaitForWithContext,具体看poll类型核心代码部分

Backoff类

根据Backoff来计算函数执行的时间间隔

Backoff 类型的函数会根据 Backoff 或者 BackoffManager 返回的时间间隔来决定函数的执行间隔

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
type Backoff struct {
	// 时间间隔,用于调用 Step 方法时返回的时间间隔.
	
	Duration time.Duration
	// 用于计算下次的时间间隔
	// Factor 大于 0 时,Backoff 在计算下次的时间间隔时都会根据 Duration * Factor,Factor * Duration 不能大于 Cap
	// 不能为负数
	Factor float64
	
	// Jitter > 0 时,每次迭代的时间间隔都会额外加上 0 - Duration * Jitter 的随机时间,并且抖动出的时间不会设置为 Duration,而且不受 Caps 的限制
	Jitter float64
	
	// 进行指数回退(*Factor) 操作的次数,当 Factor * Duration > Cap 时 Steps 会被设置为 0, Duration 设置为 Cap
	// 也就是说后续的迭代时间间隔都会返回 Duration
	Steps int

	// 最大的时间间隔
	Cap time.Duration
}

BackoffStep 方法返回通过 Factor, Cap, Jitter 计算的时间间隔

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (b *Backoff) Step() time.Duration {
	//steps < 1 不再进行指数回退操作
	if b.Steps < 1 {
		if b.Jitter > 0 {
			return Jitter(b.Duration, b.Jitter)
		}
		return b.Duration
	}
	b.Steps--

	duration := b.Duration

	// 如果Factor 等于 0,那就不需要修改Duration
	if b.Factor != 0 {
		b.Duration = time.Duration(float64(b.Duration) * b.Factor)
		if b.Cap > 0 && b.Duration > b.Cap {
			b.Duration = b.Cap
			b.Steps = 0
		}
	}

	if b.Jitter > 0 {
		duration = Jitter(duration, b.Jitter)
	}
	return duration
}

BackoffManger 定义了 Backoff 方法,返回一个定时器,用于触发函数的执行

1
2
3
4
type BackoffManager interface {
	Backoff() clock.Timer
}
func NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration time.Duration, backoffFactor, jitter float64, c clock.Clock) BackoffManager

NewExponentialBackoffManager会使用 initBackoff, maxBackoff, backoffFactor, jitter 来初始化一个 Backoff 对象 它的 Backoff 方法时间返回了一个使用 Backoff.Step 作为时间间隔的定时器 不过两次 Backoff.Step 的时间间隔不能大于 resetDuration 否则会重置 Backoff

1
2
3
4
5
6
7
8
func (b *expontialBackoffManagerImpl) getNextBackoff() time.Duration {
	if b.clock.Now().Sub(b.lastBackoffStart) > b.backResetDuration {
		b.backoff.Steps = math.MaxInt32
		b.backoff.Duration = b.initialBackoff
	}
	b.lastBackoffStart = b.clock.Now()
	return b.backoff.Step()
}

也就是说如果 backoff.Step() 返回的值大于 resetDuration,那么下次返回的时间间隔便是 initBackoff 通常用于上游状态异常用来较少负载

另外一种 BackoffManagerJitterBackoffManager,返回的是根据 jitter 抖动的时间间隔

1
2
3
4
5
6
7
8
9
func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager

func (j *jitteredBackoffManagerImpl) getNextBackoff() time.Duration {
	if j.jitter > 0.0{
		//  计算出来的抖动的时间间隔并不会影响 j.duration 
		return Jitter(j.duration, j.jitter)
	}
	return j.duration
}

Backoff类轮询函数

函数 作用
func BackUntil(f func(), backoff BackoffManager, sliding book, stopCh <-chan struct{}) 根据 Backoff 的定时器来循环触发 f,直到 stopCh 关闭
func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error 根据 backoff 的时间间隔来循环执行 f
func ExponentialBackoffWithContext(ctx context.Context, backoff Backoff, condition ConditionFunc) error

ExponentialBackoff* 函数中 condition 会执行 backoff.Steps 次,到达次数并且 condition 没有返回 True 或 error,那么就返回 ErrWaitTimeout 如果 backoff.Steps 为 0 会直接返回 ErrWaitTimeout

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
	for backoff.Steps > 0 {
		if ok ,err := condition(); err != nil || ok {
			return err
		}
		if backoff.Steps == 1 {
			break
		}
		time.Sleep(backoff.Step())
	}
	return ErrWaitTimeout
}