K8s源码3-Kubeproxy以及令牌桶

早期版本

简单看一下kubeproxy是如何进行轮训调度的

// 这个LoadBalancerRR是一个单例模式,全局只有一个实例。
type LoadBalancerRR struct {
	lock         sync.RWMutex
  // endpointsMap存储的是servcie和对应的endpoints[]
	endpointsMap map[string][]string
  // rrIndex是service和该service当前的轮训Index索引
	rrIndex      map[string]int
}

func NewLoadBalancerRR() *LoadBalancerRR {
	return &LoadBalancerRR{endpointsMap: make(map[string][]string), rrIndex: make(map[string]int)}
}
func (impl LoadBalancerRR) LoadBalance(service string, srcAddr net.Addr) (string, error) {
	impl.lock.RLock()
  // 先从endpointMap中拿到此servive,得到一个[]string,里面是对应的endpoint地址
	endpoints, exists := impl.endpointsMap[service]
  // rrIndex中是service和该service当前的轮训调度index的映射
	index := impl.rrIndex[service]
	impl.lock.RUnlock()
	if exists == false {
		return "", errors.New("no service entry for:" + service)
	}
	if len(endpoints) == 0 {
		return "", errors.New("no endpoints for: " + service)
	}
  // 拿到当前service中,index索引对应的endpoint
	endpoint := endpoints[index]
  
  // index索引++
	impl.rrIndex[service] = (index + 1) % len(endpoints)
	return endpoint, nil
}

接下来看proxy


type Proxier struct {
	loadBalancer LoadBalancer
	serviceMap   map[string]int
}

func (proxier Proxier) AcceptHandler(service string, listener net.Listener) {
	for {
		inConn, err := listener.Accept()
		if err != nil {
			log.Printf("Accept failed: %v", err)
			continue
		}
		log.Printf("Accepted connection from: %v to %v", inConn.RemoteAddr(), inConn.LocalAddr())

		// Figure out where this request should go.
		endpoint, err := proxier.loadBalancer.LoadBalance(service, inConn.RemoteAddr())
		if err != nil {
			log.Printf("Couldn't find an endpoint for %s %v", service, err)
			inConn.Close()
			continue
		}

		log.Printf("Mapped service %s to endpoint %s", service, endpoint)
		outConn, err := net.DialTimeout("tcp", endpoint, time.Duration(5)*time.Second)
		// We basically need to take everything from inConn and send to outConn
		// and anything coming from outConn needs to be sent to inConn.
		if err != nil {
			log.Printf("Dial failed: %v", err)
			inConn.Close()
			continue
		}
		go ProxyConnection(inConn.(*net.TCPConn), outConn.(*net.TCPConn))
	}
}

这里逻辑也是比较简单,每一个service都有一个AcceptHandler,监听某个端口。如果有新的连接进来,就调用loadBalancer.loadBalance()选择一个endpoint,之后启动一个ProxyConnection协程进行TCP转发。

K8s 1.26

由于代码量非常多,所以此处只贴出函数调用栈和部分代码。

image-20230118155037120

kube-proxy大体流程是通过Run()方法启动一个proxyServer。ProxyServer初始化时会拥有一个Proxier,Proxier有iptables,ipvs等模式,这里只介绍iptables。proxier进行一系列初始化操作后,最终会调用syncRunner.Loop()开始循环处理。syncRunner类型是BoundedFrequencyRunner。这里主要看一下BoundedFrequencyRunner的代码实现。

// NewBoundedFrequencyRunner("name", fn, 3*time.Second, 10*time.Second, 3)
// - fn will have at least 3 seconds between runs, with up to 3 burst runs
// - fn will hav e no more than 10 seconds between runs
// 这里fn传进来的是proxier.syncProxyRules,刷新iptables规则,不同模式的proxier有不同的syncProxyRules,此暂时不介绍。
func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner {
  // TODO 这里占个坑,后续研究了timer源码后再来看这里为什么要先消费掉第一个tick
	timer := &realTimer{timer: time.NewTimer(0)} // will tick immediately
	<-timer.C()                                  // consume the first tick
	return construct(name, fn, minInterval, maxInterval, burstRuns, timer)
}

// Make an instance with dependencies injected.
func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner {
	
  ....
	bfr := &BoundedFrequencyRunner{
		name:        name,
		fn:          fn,
		minInterval: minInterval,
		maxInterval: maxInterval,
		run:         make(chan struct{}, 1),
		retry:       make(chan struct{}, 1),
		timer:       timer,
	}
	if minInterval == 0 {
		bfr.limiter = nullLimiter{}
	} else {
		// 采用“令牌桶”算法实现流控机制
		qps := float32(time.Second) / float32(minInterval)
		bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer)
	}
	return bfr
}

// 循环
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
	klog.V(3).Infof("%s Loop running", bfr.name)
	bfr.timer.Reset(bfr.maxInterval)
	for {
		select {
		case <-stop:
			bfr.stop()
			klog.V(3).Infof("%s Loop stopping", bfr.name)
			return
    // 每间隔一定时间,执行一次
		case <-bfr.timer.C():
			bfr.tryRun()
    // 突发执行
		case <-bfr.run:
			bfr.tryRun()
		case <-bfr.retry:
			bfr.doRetry()
		}
	}
}

Proxier所有的更新,添加等方法最终都落到了bfr.run或retry这两个chan,来突发/抢占执行相应的syncProxyRules。

image-20230118161819307

proxier.sync()调用了proxier.syncRunner.Run(),即向bfr.run中发送一个空结构体struct{} {}

// Run the function as soon as possible.  If this is called while Loop is not
// running, the call may be deferred indefinitely.
// If there is already a queued request to call the underlying function, it
// may be dropped - it is just guaranteed that we will try calling the
// underlying function as soon as possible starting from now.
func (bfr *BoundedFrequencyRunner) Run() {
	// If it takes a lot of time to run the underlying function, noone is really
	// processing elements from <run> channel. So to avoid blocking here on the
	// putting element to it, we simply skip it if there is already an element
	// in it.
  // 翻译:满则丢弃,避免阻塞,尽最大可能Run
  // 具体这么做的原因是因为,BoundedFrequencyRunner中对run进行了限流,来防止无限制刷新iptables造成服务器宕机
	select {
	case bfr.run <- struct{}{}:
	default:
	}
}

接下来介绍在BoundedFrequencyRunner的Loop中,当有空结构体到来时,如何tryRun()

// assumes the lock is not held
func (bfr *BoundedFrequencyRunner) tryRun() {
	bfr.mu.Lock()
	defer bfr.mu.Unlock()

  // 如果可以立即执行(拿到令牌)
  // 此处拿令牌不可以使用Accept方法阻塞拿取,否则会阻塞Loop循环,影响正常业务逻辑。
	if bfr.limiter.TryAccept() {
		// We're allowed to run the function right now.
		bfr.fn()
		bfr.lastRun = bfr.timer.Now()
		bfr.timer.Stop()
    // 重设最大时间,开始下一轮最大时长计时
		bfr.timer.Reset(bfr.maxInterval)
		klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
		return
	}
  
	// 如果不能立即执行,则计算下一次执行时间
	elapsed := bfr.timer.Since(bfr.lastRun)   // 从上一次执行到现在过了多久
	nextPossible := bfr.minInterval - elapsed // 至少还需要多少时间才能被调度
	nextScheduled := bfr.timer.Remaining()    // 至多需要多少时间就会被调度
	klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)

	// It's hard to avoid race conditions in the unit tests unless we always reset
	// the timer here, even when it's unchanged
	if nextPossible < nextScheduled { // 判断大小,选择最近的时间。
		nextScheduled = nextPossible
	}
	bfr.timer.Stop()
	bfr.timer.Reset(nextScheduled)
}

那么BoundedFrequencyRunner是如何计算是否可以立即执行呢,那么就要回到上文中的这两条代码。

	// 构造bfr时,采用“令牌桶”算法实现流控机制得到limiter
	qps := float32(time.Second) / float32(minInterval)
	bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer)

接下来看令牌桶算法的实现。

令牌桶

func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
	limiter := rate.NewLimiter(rate.Limit(qps), burst)
	return newTokenBucketRateLimiterWithClock(limiter, clock.RealClock{}, qps)
}
....
func newTokenBucketRateLimiterWithClock(limiter *rate.Limiter, c Clock, qps float32) *tokenBucketRateLimiter {
	return &tokenBucketRateLimiter{
		tokenBucketPassiveRateLimiter: *newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps),
		clock:                         c,
	}
}
....
// 最终返回的是tokenBucketRateLimiter
type tokenBucketRateLimiter struct {
	tokenBucketPassiveRateLimiter
	clock Clock
}
....
// tokenBucketPassiveRateLimiter 继承了 tokenBucketRateLimiter
type tokenBucketPassiveRateLimiter struct {
	limiter *rate.Limiter
	qps     float32
	clock   clock.PassiveClock
}
....
// BoundedFrequencyRunner的tryRun中,申请令牌桶,调用了TryAccept
func (tbprl *tokenBucketPassiveRateLimiter) TryAccept() bool {
	return tbprl.limiter.AllowN(tbprl.clock.Now(), 1)
}
....
// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
// 意思是超出限制就丢掉
func (lim *Limiter) AllowN(now time.Time, n int) bool {
	return lim.reserveN(now, n, 0).ok
}

由此得出,最终令牌桶的实现方法在reserveN内

func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()
	defer lim.mu.Unlock()

  // 如果对于QPS没有限制,则直接返回。
	if lim.limit == Inf {
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	} else if lim.limit == 0 {
		var ok bool
		if lim.burst >= n {
			ok = true
			lim.burst -= n
		}
		return Reservation{
			ok:        ok,
			lim:       lim,
			tokens:    lim.burst,
			timeToAct: now,
		}
	}

  // 计算过去的这段时间中,生成了多少tokens,满了则丢掉
	now, last, tokens := lim.advance(now)

	// Calculate the remaining number of tokens resulting from the request.
  // 取出tokens
	tokens -= float64(n)

	// Calculate the wait duration
  // 如果tokens不够取,则等待生成
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}
  
  // 如果要取的token数量<=桶的最大数量,并且tokens不够时,<=最大等待时间(此处是0)
  // 则定义一个flag,表示取出成功
	// Decide result
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// 准备一个预定结果
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
  // 预定成功,则附加一些预定信息
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}

	// 更新Limiter中last,tokens等信息
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	} else {
		lim.last = last
	}

	return r
}

这里补充一下advance()和tokensFromDuration(),durationFromTokens()是如何实现的。

advance用来计算从上一次取令牌后到现在,桶里应该有多少令牌。

// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
// advance requires that lim.mu is held.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
  // 拿到上一次取令牌的时间
	last := lim.last
	if now.Before(last) {
		last = now
	}

	// 计算过去了多久
	elapsed := now.Sub(last)
  // 计算过去的时间中应该生成多少个令牌
	delta := lim.limit.tokensFromDuration(elapsed)
  // 将令牌添加到桶中
	tokens := lim.tokens + delta
  // 桶满则丢弃
	if burst := float64(lim.burst); tokens > burst {
		tokens = burst
	}
	return now, last, tokens
}

tokensFromDuration()根据时间计算令牌数量,durationFromTokens根据令牌数量计算时间


// tokensFromDuration is a unit conversion function from a time duration to the number of tokens
// which could be accumulated during that duration at a rate of limit tokens per second.
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
	if limit <= 0 {
		return 0
	}
  // 过去的时间*QPS(每秒生成token的数量)
	return d.Seconds() * float64(limit)
}

// durationFromTokens is a unit conversion function from the number of tokens to the duration
// of time it takes to accumulate them at a rate of limit tokens per second.
func (limit Limit) durationFromTokens(tokens float64) time.Duration {
	if limit <= 0 {
		return InfDuration
	}
  // token的数量 / QPS(每秒生成token的数量) = 需要等待的时间
	seconds := tokens / float64(limit)
	return time.Duration(float64(time.Second) * seconds)
}

简单总结一下,每次取token的时候,都会先计算出从上一次取token到现在新生成了多少token,最大不超过桶的数量,然后尝试取token。

如果token不够,则计算生成缺少的token数量的时间,和maxFutureReserve做比较,>则说明取出失败。此处maxFutureReserve为0,即缺少token就判定取出失败。否则取出成功。接下来就根据取出结果返回信息。

k8s这里使用的令牌桶中Limter来自 golang.org/x/time/rate。取令牌有两个方法,Accept()和TryAccept(),区别只在于前者是阻塞式的,取不到令牌,则等待令牌生成,因此这里也写了两个类。

type PassiveRateLimiter interface {
	// TryAccept returns true if a token is taken immediately. Otherwise,
	// it returns false.
	TryAccept() bool
	// Stop stops the rate limiter, subsequent calls to CanAccept will return false
	Stop()
	// QPS returns QPS of this rate limiter
	QPS() float32
}
// 这里实现了设计模式中的代理模式
type tokenBucketRateLimiter struct {
	tokenBucketPassiveRateLimiter
	clock Clock
}
....
type tokenBucketPassiveRateLimiter struct {
	limiter *rate.Limiter
	qps     float32
	clock   clock.PassiveClock
}

阻塞式Accept()方法来自tokenBucketRateLimiter,实现如下

func (tbrl *tokenBucketRateLimiter) Accept() {
	now := tbrl.clock.Now()
  // 主动等待令牌生成的时间
	tbrl.clock.Sleep(tbrl.limiter.ReserveN(now, 1).DelayFrom(now))
}

func (r *Reservation) DelayFrom(now time.Time) time.Duration {
	if !r.ok {
		return InfDuration
	}
  // timeToAct = now.Add(waitDuration),即当需要取出的tokens不够,且等待的时间<maxFutureReserve时,取出成功但需要等待生成tokens的时间。
	delay := r.timeToAct.Sub(now)
	if delay < 0 {
		return 0
	}
	return delay
}

回到Limiter,有三种取出的token的方式:AllowN(刚才所讲的),ReserveN和 WaitN,类似于代理模式,对reserveN(now time.Time, n int, maxFutureReserve time.Duration) 进行了一层包装。

ReserveN中最大等待时间maxFutureReserve为无限大,所以取出数量<=桶大小时,就会取出成功。

WaitN同ReserveN,等待时间无限大。当成功取出后,就开始阻塞等待。此处WaitN方法也需要传入一个ctx来进行超时计时。

	t := time.NewTimer(delay)
	defer t.Stop()
	select {
	case <-t.C:
		// We can proceed.
		return nil
	case <-ctx.Done():
		// Context was canceled before we could proceed.  Cancel the
		// reservation, which may permit other events to proceed sooner.
		r.Cancel()
		return ctx.Err()
	}

WaitN()和Accept()方法同样实现了阻塞等待,为什么还需要额外增加一个Accpet()方法呢,岂不是多此一举,过读设计。笔者认为其中一个可能的原因是k8s令牌桶不需要额外的计时逻辑。·

这个系列的帖子