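K8s Source Code 3: kube-proxy and the Token Bucket
Early versions
A quick look at how kube-proxy performs round-robin scheduling.
// LoadBalancerRR is used as a singleton: there is only one instance globally.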
type LoadBalancerRR struct {
lock sync.RWMutex
// endpointsMap maps each service to its list of endpoints
endpointsMap map[string][]string
// rrIndex maps each service to its current round-robin index
rrIndex map[string]int
}
func NewLoadBalancerRR() *LoadBalancerRR {
return &LoadBalancerRR{endpointsMap: make(map[string][]string), rrIndex: make(map[string]int)}
}
func (impl LoadBalancerRR) LoadBalance(service string, srcAddr net.Addr) (string, error) {
impl.lock.RLock()
// Look up the service in endpointsMap to get a []string of its endpoint addresses
endpoints, exists := impl.endpointsMap[service]
// rrIndex maps the service to its current round-robin index
index := impl.rrIndex[service]
impl.lock.RUnlock()
if exists == false {
return "", errors.New("no service entry for:" + service)
}
if len(endpoints) == 0 {
return "", errors.New("no endpoints for: " + service)
}
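// Pick the endpoint at the current index for this service
endpoint := endpoints[index]
// Advance the index, wrapping around at the end of the list.
// Note: this write happens after RUnlock, so it is not protected by the lock.
impl.rrIndex[service] = (index + 1) % len(endpoints)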
return endpoint, nil
}
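The index update in the excerpt above runs outside the lock. As a minimal sketch of the same round-robin idea with the read and the write under one mutex (the `roundRobin` type and `Pick` method are hypothetical names for illustration, not kube-proxy code):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// roundRobin is a hypothetical, simplified balancer used only for illustration.
type roundRobin struct {
	mu        sync.Mutex
	endpoints map[string][]string // service -> endpoint addresses
	next      map[string]int      // service -> next index to hand out
}

func newRoundRobin() *roundRobin {
	return &roundRobin{endpoints: map[string][]string{}, next: map[string]int{}}
}

// Pick returns the next endpoint for service and advances the index while
// holding the lock, so concurrent callers cannot race on the maps.
func (r *roundRobin) Pick(service string) (string, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	eps, ok := r.endpoints[service]
	if !ok {
		return "", errors.New("no service entry for: " + service)
	}
	if len(eps) == 0 {
		return "", errors.New("no endpoints for: " + service)
	}
	i := r.next[service] % len(eps) // stay in range if the list shrank
	r.next[service] = (i + 1) % len(eps)
	return eps[i], nil
}

func main() {
	rr := newRoundRobin()
	rr.endpoints["web"] = []string{"10.0.0.1:80", "10.0.0.2:80"}
	for i := 0; i < 3; i++ {
		ep, _ := rr.Pick("web")
		fmt.Println(ep) // cycles through the two addresses
	}
}
```

A pointer receiver is used so the mutex is shared rather than copied on each call.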
Next, the proxy.
type Proxier struct {
loadBalancer LoadBalancer
serviceMap map[string]int
}
func (proxier Proxier) AcceptHandler(service string, listener net.Listener) {
for {
inConn, err := listener.Accept()
if err != nil {
log.Printf("Accept failed: %v", err)
continue
}
log.Printf("Accepted connection from: %v to %v", inConn.RemoteAddr(), inConn.LocalAddr())
// Figure out where this request should go.
endpoint, err := proxier.loadBalancer.LoadBalance(service, inConn.RemoteAddr())
if err != nil {
log.Printf("Couldn't find an endpoint for %s %v", service, err)
inConn.Close()
continue
}
log.Printf("Mapped service %s to endpoint %s", service, endpoint)
outConn, err := net.DialTimeout("tcp", endpoint, time.Duration(5)*time.Second)
// We basically need to take everything from inConn and send to outConn
// and anything coming from outConn needs to be sent to inConn.
if err != nil {
log.Printf("Dial failed: %v", err)
inConn.Close()
continue
}
go ProxyConnection(inConn.(*net.TCPConn), outConn.(*net.TCPConn))
}
}
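The logic here is also fairly simple: each service has its own AcceptHandler listening on a port. When a new connection arrives, it calls loadBalancer.LoadBalance() to pick an endpoint, then starts a ProxyConnection goroutine to forward the TCP traffic.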
K8s 1.26
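Because there is a lot of code, only the call stack and selected excerpts are shown here.
The overall flow of kube-proxy is that Run() starts a ProxyServer. When initialized, the ProxyServer holds a Proxier, which comes in several modes such as iptables and ipvs; only iptables is covered here. After a series of initialization steps, the proxier eventually calls syncRunner.Loop() to start its processing loop. syncRunner is a BoundedFrequencyRunner, and its implementation is the main focus below.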
// NewBoundedFrequencyRunner("name", fn, 3*time.Second, 10*time.Second, 3)
// - fn will have at least 3 seconds between runs, with up to 3 burst runs
// - fn will have no more than 10 seconds between runs
// The fn passed in here is proxier.syncProxyRules, which refreshes the iptables rules. Each proxier mode has its own syncProxyRules; it is not covered here.
func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner {
// TODO placeholder: come back after studying the timer source to see why the first tick is consumed here
timer := &realTimer{timer: time.NewTimer(0)} // will tick immediately
<-timer.C() // consume the first tick
return construct(name, fn, minInterval, maxInterval, burstRuns, timer)
}
// Make an instance with dependencies injected.
func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner {
....
bfr := &BoundedFrequencyRunner{
name: name,
fn: fn,
minInterval: minInterval,
maxInterval: maxInterval,
run: make(chan struct{}, 1),
retry: make(chan struct{}, 1),
timer: timer,
}
if minInterval == 0 {
bfr.limiter = nullLimiter{}
} else {
// Flow control is implemented with the token bucket algorithm
qps := float32(time.Second) / float32(minInterval)
bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer)
}
return bfr
}
// Main loop
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
klog.V(3).Infof("%s Loop running", bfr.name)
bfr.timer.Reset(bfr.maxInterval)
for {
select {
case <-stop:
bfr.stop()
klog.V(3).Infof("%s Loop stopping", bfr.name)
return
// Periodic run: fires each time the timer expires
case <-bfr.timer.C():
bfr.tryRun()
// On-demand (burst) run
case <-bfr.run:
bfr.tryRun()
case <-bfr.retry:
bfr.doRetry()
}
}
}
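All of the Proxier's update and add methods ultimately end up on one of two channels, bfr.run or bfr.retry, to trigger the corresponding syncProxyRules on demand.
proxier.Sync() calls proxier.syncRunner.Run(), which sends an empty struct{}{} into bfr.run.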
// Run the function as soon as possible. If this is called while Loop is not
// running, the call may be deferred indefinitely.
// If there is already a queued request to call the underlying function, it
// may be dropped - it is just guaranteed that we will try calling the
// underlying function as soon as possible starting from now.
func (bfr *BoundedFrequencyRunner) Run() {
// If it takes a lot of time to run the underlying function, noone is really
// processing elements from <run> channel. So to avoid blocking here on the
// putting element to it, we simply skip it if there is already an element
// in it.
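// In short: if the channel is full, drop the request to avoid blocking; Run is best effort.
// The reason is that BoundedFrequencyRunner rate-limits run requests, so that unbounded iptables refreshes cannot bring the server down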
select {
case bfr.run <- struct{}{}:
default:
}
}
Next, how tryRun() is handled when an empty struct arrives in BoundedFrequencyRunner's Loop.
// assumes the lock is not held
func (bfr *BoundedFrequencyRunner) tryRun() {
bfr.mu.Lock()
defer bfr.mu.Unlock()
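// If we can run right away (a token was obtained).
// The token must not be taken with the blocking Accept method here; that would block the Loop and interfere with normal processing.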
if bfr.limiter.TryAccept() {
// We're allowed to run the function right now.
bfr.fn()
bfr.lastRun = bfr.timer.Now()
bfr.timer.Stop()
// Reset to maxInterval to start timing the next periodic run
bfr.timer.Reset(bfr.maxInterval)
klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
return
}
// If we cannot run right away, work out when the next run should happen
elapsed := bfr.timer.Since(bfr.lastRun) // how long since the last run
nextPossible := bfr.minInterval - elapsed // the earliest a run is allowed
nextScheduled := bfr.timer.Remaining() // the latest a run will happen anyway
klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)
// It's hard to avoid race conditions in the unit tests unless we always reset
// the timer here, even when it's unchanged
if nextPossible < nextScheduled { // pick whichever comes sooner
nextScheduled = nextPossible
}
bfr.timer.Stop()
bfr.timer.Reset(nextScheduled)
}
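So how does BoundedFrequencyRunner decide whether it can run right away? That takes us back to these two lines from earlier.
// When bfr is constructed, the limiter is built with the token bucket algorithm for flow control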
qps := float32(time.Second) / float32(minInterval)
bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer)
Next, the implementation of the token bucket algorithm.
Token bucket
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
limiter := rate.NewLimiter(rate.Limit(qps), burst)
return newTokenBucketRateLimiterWithClock(limiter, clock.RealClock{}, qps)
}
....
func newTokenBucketRateLimiterWithClock(limiter *rate.Limiter, c Clock, qps float32) *tokenBucketRateLimiter {
return &tokenBucketRateLimiter{
tokenBucketPassiveRateLimiter: *newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps),
clock: c,
}
}
....
// What is ultimately returned is a tokenBucketRateLimiter
type tokenBucketRateLimiter struct {
tokenBucketPassiveRateLimiter
clock Clock
}
....
// tokenBucketRateLimiter embeds tokenBucketPassiveRateLimiter (Go has no inheritance; the embedded type's methods are promoted)
type tokenBucketPassiveRateLimiter struct {
limiter *rate.Limiter
qps float32
clock clock.PassiveClock
}
....
// tryRun in BoundedFrequencyRunner requests a token by calling TryAccept
func (tbprl *tokenBucketPassiveRateLimiter) TryAccept() bool {
return tbprl.limiter.AllowN(tbprl.clock.Now(), 1)
}
....
// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
// In other words: events over the limit are simply dropped
func (lim *Limiter) AllowN(now time.Time, n int) bool {
return lim.reserveN(now, n, 0).ok
}
So the token bucket logic ultimately lives in reserveN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()
defer lim.mu.Unlock()
// If there is no limit on QPS, return immediately.
if lim.limit == Inf {
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: now,
}
} else if lim.limit == 0 {
var ok bool
if lim.burst >= n {
ok = true
lim.burst -= n
}
return Reservation{
ok: ok,
lim: lim,
tokens: lim.burst,
timeToAct: now,
}
}
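// Compute how many tokens the bucket holds at now: tokens generated since the last update are added, and anything beyond a full bucket is discarded
now, last, tokens := lim.advance(now)
// Calculate the remaining number of tokens resulting from the request.
// Take the tokens out
tokens -= float64(n)
// Calculate the wait duration
// If there are not enough tokens, compute how long it takes to generate the shortfall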
var waitDuration time.Duration
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}
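// The request succeeds if n does not exceed the bucket capacity and the wait for
// missing tokens does not exceed the maximum wait (0 here); ok is the success flag
// Decide result
ok := n <= lim.burst && waitDuration <= maxFutureReserve
// Prepare the reservation result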
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
// If the reservation succeeded, attach its details
if ok {
r.tokens = n
r.timeToAct = now.Add(waitDuration)
}
// Update last, tokens and related state in the Limiter
if ok {
lim.last = now
lim.tokens = tokens
lim.lastEvent = r.timeToAct
} else {
lim.last = last
}
return r
}
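As a supplement, here is how advance(), tokensFromDuration() and durationFromTokens() are implemented.
advance computes how many tokens the bucket should hold now, given the time since the last update.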
// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
// advance requires that lim.mu is held.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
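// Time of the last token update
last := lim.last
if now.Before(last) {
last = now
}
// How much time has passed
elapsed := now.Sub(last)
// How many tokens should have been generated in that time
delta := lim.limit.tokensFromDuration(elapsed)
// Add the tokens to the bucket
tokens := lim.tokens + delta
// Discard the overflow if the bucket is full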
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
return now, last, tokens
}
tokensFromDuration() converts a duration into a number of tokens, and durationFromTokens() converts a number of tokens into a duration.
// tokensFromDuration is a unit conversion function from a time duration to the number of tokens
// which could be accumulated during that duration at a rate of limit tokens per second.
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
if limit <= 0 {
return 0
}
// elapsed time * QPS (tokens generated per second)
return d.Seconds() * float64(limit)
}
// durationFromTokens is a unit conversion function from the number of tokens to the duration
// of time it takes to accumulate them at a rate of limit tokens per second.
func (limit Limit) durationFromTokens(tokens float64) time.Duration {
if limit <= 0 {
return InfDuration
}
// number of tokens / QPS (tokens generated per second) = time to wait
seconds := tokens / float64(limit)
return time.Duration(float64(time.Second) * seconds)
}
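To summarize: every time a token is requested, the limiter first computes how many tokens have been generated since the last update, capped at the bucket size, and then tries to take the token.
If there are not enough tokens, it computes how long it would take to generate the shortfall and compares that with maxFutureReserve; if the wait is longer, the request fails. Here maxFutureReserve is 0, so any shortfall means failure; otherwise the request succeeds. The result is then returned accordingly.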
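The Limiter used by the k8s token bucket comes from golang.org/x/time/rate. There are two ways to take a token, Accept() and TryAccept(). The only difference is that the former blocks: if no token is available, it waits until one is generated. That is why two types are defined here.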
type PassiveRateLimiter interface {
// TryAccept returns true if a token is taken immediately. Otherwise,
// it returns false.
TryAccept() bool
// Stop stops the rate limiter, subsequent calls to CanAccept will return false
Stop()
// QPS returns QPS of this rate limiter
QPS() float32
}
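// tokenBucketRateLimiter embeds tokenBucketPassiveRateLimiter and adds the blocking Accept(), similar in spirit to the proxy pattern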
type tokenBucketRateLimiter struct {
tokenBucketPassiveRateLimiter
clock Clock
}
....
type tokenBucketPassiveRateLimiter struct {
limiter *rate.Limiter
qps float32
clock clock.PassiveClock
}
The blocking Accept() method comes from tokenBucketRateLimiter and is implemented as follows:
func (tbrl *tokenBucketRateLimiter) Accept() {
now := tbrl.clock.Now()
// Sleep for as long as it takes to generate the token
tbrl.clock.Sleep(tbrl.limiter.ReserveN(now, 1).DelayFrom(now))
}
func (r *Reservation) DelayFrom(now time.Time) time.Duration {
if !r.ok {
return InfDuration
}
// timeToAct = now.Add(waitDuration): when there are not enough tokens but the wait is within maxFutureReserve, the reservation succeeds and the caller must wait for the tokens to be generated.
delay := r.timeToAct.Sub(now)
if delay < 0 {
return 0
}
return delay
}
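Back to Limiter: there are three ways to take tokens, AllowN (covered above), ReserveN and WaitN. Much like the proxy pattern, each is a thin wrapper around reserveN(now time.Time, n int, maxFutureReserve time.Duration).
ReserveN passes an infinite maxFutureReserve, so the reservation succeeds whenever the requested amount is <= the bucket size.
WaitN works like ReserveN, except that its wait limit is infinite only when ctx has no deadline; otherwise the limit is the time remaining until the deadline. Once the reservation succeeds, it blocks until the tokens are available, and the ctx passed in can time out or cancel the wait: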
t := time.NewTimer(delay)
defer t.Stop()
select {
case <-t.C:
// We can proceed.
return nil
case <-ctx.Done():
// Context was canceled before we could proceed. Cancel the
// reservation, which may permit other events to proceed sooner.
r.Cancel()
return ctx.Err()
}
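WaitN() and Accept() both block until a token is available, so why add a separate Accept() method? Isn't that redundant over-engineering? In my view, one possible reason is that the k8s token bucket does not need the extra timing logic.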