


// LoadBalancerRR is a round-robin load balancer. It is meant to be used as a
// single shared instance obtained from NewLoadBalancerRR.
//
// The struct contains a mutex and therefore must not be copied; all methods
// use a pointer receiver.
type LoadBalancerRR struct {
	// lock guards endpointsMap and rrIndex.
	lock sync.RWMutex
	// endpointsMap maps a service name to the list of its endpoint addresses.
	endpointsMap map[string][]string
	// rrIndex maps a service name to the index of the endpoint that will be
	// handed out next for that service.
	rrIndex map[string]int
}

// NewLoadBalancerRR returns an empty, ready-to-use round-robin load balancer.
func NewLoadBalancerRR() *LoadBalancerRR {
	return &LoadBalancerRR{
		endpointsMap: make(map[string][]string),
		rrIndex:      make(map[string]int),
	}
}

// LoadBalance returns the next endpoint for service in round-robin order and
// advances that service's index. srcAddr is accepted for interface
// compatibility and is not used by the round-robin policy.
//
// An error is returned when the service is unknown or has no endpoints.
func (impl *LoadBalancerRR) LoadBalance(service string, srcAddr net.Addr) (string, error) {
	// The lookup and the index update must be one atomic step, otherwise two
	// concurrent callers could observe the same index (or race on the map).
	impl.lock.Lock()
	defer impl.lock.Unlock()

	// Look up the endpoint addresses registered for this service.
	endpoints, exists := impl.endpointsMap[service]
	if !exists {
		return "", errors.New("no service entry for:" + service)
	}
	if len(endpoints) == 0 {
		return "", errors.New("no endpoints for: " + service)
	}

	// A stored index can be stale if the endpoint list shrank since the last
	// call; restart from the beginning instead of indexing out of range.
	index := impl.rrIndex[service]
	if index < 0 || index >= len(endpoints) {
		index = 0
	}
	endpoint := endpoints[index]

	// Advance the round-robin position, wrapping at the end of the list.
	if impl.rrIndex == nil {
		impl.rrIndex = make(map[string]int)
	}
	impl.rrIndex[service] = (index + 1) % len(endpoints)
	return endpoint, nil
}


// Proxier pairs a load balancer with a per-service map.
// NOTE(review): the closing brace of this struct is not present in the excerpt.
type Proxier struct {
	// loadBalancer is asked by AcceptHandler for the endpoint of each accepted connection.
	loadBalancer LoadBalancer
	// serviceMap is keyed by string with int values; presumably service name
	// to port — TODO confirm against the callers.
	serviceMap   map[string]int

// AcceptHandler accepts connections on listener in an endless loop. For each
// connection it asks the load balancer for an endpoint of service, dials that
// endpoint over TCP with a 5-second timeout, and starts ProxyConnection in a
// new goroutine with both connections.
//
// NOTE(review): as shown, each error branch only logs and then falls through
// to code that uses the failed value, and no closing braces are present. The
// excerpt looks truncated (a skip-to-next-iteration / connection close is
// presumably missing) — confirm against the original source.
func (proxier Proxier) AcceptHandler(service string, listener net.Listener) {
	for {
		inConn, err := listener.Accept()
		if err != nil {
			log.Printf("Accept failed: %v", err)
		log.Printf("Accepted connection from: %v to %v", inConn.RemoteAddr(), inConn.LocalAddr())

		// Figure out where this request should go.
		endpoint, err := proxier.loadBalancer.LoadBalance(service, inConn.RemoteAddr())
		if err != nil {
			log.Printf("Couldn't find an endpoint for %s %v", service, err)

		log.Printf("Mapped service %s to endpoint %s", service, endpoint)
		outConn, err := net.DialTimeout("tcp", endpoint, time.Duration(5)*time.Second)
		// We basically need to take everything from inConn and send to outConn
		// and anything coming from outConn needs to be sent to inConn.
		if err != nil {
			log.Printf("Dial failed: %v", err)
		// Both connections are asserted to *net.TCPConn before being handed over.
		go ProxyConnection(inConn.(*net.TCPConn), outConn.(*net.TCPConn))


K8s 1.26




// NewBoundedFrequencyRunner builds a BoundedFrequencyRunner for fn by passing
// its arguments, together with a timer backed by time.NewTimer, to construct.
//
// Example:
// NewBoundedFrequencyRunner("name", fn, 3*time.Second, 10*time.Second, 3)
// - fn will have at least 3 seconds between runs, with up to 3 burst runs
// - fn will have no more than 10 seconds between runs
//
// Per the author's notes, kube-proxy passes proxier.syncProxyRules as fn (it
// refreshes the iptables rules); each proxier mode has its own
// syncProxyRules, which is not covered here.
func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner {
	// TODO: placeholder — revisit after reading the timer source to understand
	// why the first tick has to be consumed here.
	timer := &realTimer{timer: time.NewTimer(0)} // will tick immediately
	<-timer.C()                                  // consume the first tick
	return construct(name, fn, minInterval, maxInterval, burstRuns, timer)

// Make an instance with dependencies injected.
//
// The run and retry channels are created with a buffer of one element. When
// minInterval is zero a nullLimiter is installed; otherwise a token-bucket
// limiter is created with qps = 1s / minInterval, burstRuns as the burst
// argument, and the injected timer as its clock.
func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner {
	bfr := &BoundedFrequencyRunner{
		name:        name,
		fn:          fn,
		minInterval: minInterval,
		maxInterval: maxInterval,
		run:         make(chan struct{}, 1),
		retry:       make(chan struct{}, 1),
		timer:       timer,
	if minInterval == 0 {
		bfr.limiter = nullLimiter{}
	} else {
		// Rate limiting is implemented with the token-bucket algorithm.
		qps := float32(time.Second) / float32(minInterval)
		bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer)
	return bfr

// Loop is the runner's event loop: it repeatedly selects on the stop channel,
// the timer channel, the run channel and the retry channel.
//
// NOTE(review): apart from the log line under stop, the case bodies are not
// present in this excerpt — confirm what each case does against the original
// source.
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
	klog.V(3).Infof("%s Loop running", bfr.name)
	for {
		select {
		case <-stop:
			klog.V(3).Infof("%s Loop stopping", bfr.name)
		// Periodic execution: fires each time the timer interval elapses.
		case <-bfr.timer.C():
		// On-demand (burst) execution: fires when a run has been requested.
		case <-bfr.run:
		case <-bfr.retry:



proxier.sync() 调用了 proxier.syncRunner.Run(),即向 bfr.run 通道中发送一个空结构体 struct{}{}。

// Run the function as soon as possible.  If this is called while Loop is not
// running, the call may be deferred indefinitely.
// If there is already a queued request to call the underlying function, it
// may be dropped - it is just guaranteed that we will try calling the
// underlying function as soon as possible starting from now.
func (bfr *BoundedFrequencyRunner) Run() {
	// If it takes a lot of time to run the underlying function, noone is really
	// processing elements from <run> channel. So to avoid blocking here on the
	// putting element to it, we simply skip it if there is already an element
	// in it.
	// In short: drop the request when the channel is full so the caller never
	// blocks, while still running as soon as possible.
	// The reason is that BoundedFrequencyRunner rate-limits runs, to keep
	// unbounded iptables refreshes from bringing the server down.
	// NOTE(review): only the send case is visible in this excerpt; the
	// non-blocking fallback branch the comment above describes is not shown.
	select {
	case bfr.run <- struct{}{}:


// tryRun asks the limiter for a token without blocking. When a token is
// granted it records the run time; the remaining code computes how long until
// the next run, taking the sooner of "earliest allowed" and "already
// scheduled".
//
// assumes the lock is not held
//
// NOTE(review): the lock acquisition matching the deferred Unlock, the call
// of the wrapped function, the timer resets and the closing braces are not
// present in this excerpt — confirm against the original source.
func (bfr *BoundedFrequencyRunner) tryRun() {
	defer bfr.mu.Unlock()

	// If we can run right now (a token was obtained).
	// The token must not be taken with the blocking Accept method here,
	// otherwise the Loop would be blocked and normal processing disturbed.
	if bfr.limiter.TryAccept() {
		// We're allowed to run the function right now.
		bfr.lastRun = bfr.timer.Now()
		// Reset the max-interval timer so the next periodic countdown starts
		// (the reset call itself is not shown in this excerpt).
		klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
	// If we cannot run right now, work out when the next run should happen.
	elapsed := bfr.timer.Since(bfr.lastRun)   // time elapsed since the last run
	nextPossible := bfr.minInterval - elapsed // minimum remaining wait before a run is allowed
	nextScheduled := bfr.timer.Remaining()    // time left until the timer fires anyway
	klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)

	// It's hard to avoid race conditions in the unit tests unless we always reset
	// the timer here, even when it's unchanged
	if nextPossible < nextScheduled { // compare and pick whichever comes sooner
		nextScheduled = nextPossible


	// Excerpt from the construction of bfr: the limiter is built with the
	// token-bucket algorithm to provide rate limiting.
	qps := float32(time.Second) / float32(minInterval)
	bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer)



// NewTokenBucketRateLimiter creates a rate.Limiter from qps and burst and
// wraps it, together with a real clock, via newTokenBucketRateLimiterWithClock.
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
	limiter := rate.NewLimiter(rate.Limit(qps), burst)
	return newTokenBucketRateLimiterWithClock(limiter, clock.RealClock{}, qps)
// newTokenBucketRateLimiterWithClock returns a tokenBucketRateLimiter whose
// passive part is built by newTokenBucketRateLimiterWithPassiveClock from the
// same limiter, clock and qps, and which additionally keeps the clock c.
func newTokenBucketRateLimiterWithClock(limiter *rate.Limiter, c Clock, qps float32) *tokenBucketRateLimiter {
	return &tokenBucketRateLimiter{
		tokenBucketPassiveRateLimiter: *newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps),
		clock:                         c,
// tokenBucketRateLimiter is the type that is ultimately returned by
// newTokenBucketRateLimiterWithClock.
// NOTE(review): the composite literal in that constructor also sets a
// tokenBucketPassiveRateLimiter field, which is not visible in this excerpt of
// the struct — presumably an embedded field; confirm against the original.
type tokenBucketRateLimiter struct {
	// clock is the Clock the constructor stores alongside the passive limiter.
	clock Clock
// tokenBucketPassiveRateLimiter is the non-blocking part of the limiter. Go
// has no inheritance: it is tokenBucketRateLimiter that carries a
// tokenBucketPassiveRateLimiter field (see newTokenBucketRateLimiterWithClock),
// not the other way around.
type tokenBucketPassiveRateLimiter struct {
	// limiter is the underlying rate.Limiter that hands out tokens.
	limiter *rate.Limiter
	// qps is kept as a float32 next to the limiter.
	qps     float32
	// clock supplies the current time passed to the limiter.
	clock   clock.PassiveClock
// TryAccept reports whether one token can be taken right now, by asking the
// underlying limiter via AllowN with the clock's current time.
// BoundedFrequencyRunner.tryRun calls TryAccept to request its token.
func (tbprl *tokenBucketPassiveRateLimiter) TryAccept() bool {
	return tbprl.limiter.AllowN(tbprl.clock.Now(), 1)
// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
// In other words: requests that exceed the limit are simply dropped. This is
// done by calling reserveN with a maximum future reserve of 0 and returning
// the ok field of the result.
func (lim *Limiter) AllowN(now time.Time, n int) bool {
	return lim.reserveN(now, n, 0).ok


// reserveN is the helper behind AllowN: it decides whether n tokens can be
// taken at time now when at most maxFutureReserve of waiting is acceptable,
// and returns the outcome as a Reservation.
//
// NOTE(review): the lock acquisition matching the deferred Unlock and the
// closing braces are not present in this excerpt — confirm against the
// original source.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	defer lim.mu.Unlock()

	// If there is no limit on the rate, return a successful reservation right away.
	if lim.limit == Inf {
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
	} else if lim.limit == 0 {
		// With a zero rate, the request can only be served out of the
		// remaining burst, which is reduced by n when it succeeds.
		var ok bool
		if lim.burst >= n {
			ok = true
			lim.burst -= n
		return Reservation{
			ok:        ok,
			lim:       lim,
			tokens:    lim.burst,
			timeToAct: now,

	// Work out how many tokens were generated during the elapsed time;
	// anything beyond the bucket's capacity is discarded.
	now, last, tokens := lim.advance(now)

	// Calculate the remaining number of tokens resulting from the request.
	// (Take the requested tokens out of the bucket.)
	tokens -= float64(n)

	// Calculate the wait duration
	// If there are not enough tokens, this is how long it takes to generate
	// the shortfall.
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	// The request succeeds when n does not exceed the bucket capacity and the
	// required wait does not exceed maxFutureReserve (0 when called from
	// AllowN); ok records that outcome.
	// Decide result
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// Prepare the reservation result.
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	// On success, attach the reservation details.
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)

	// Update last, tokens and related state in the Limiter.
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	} else {
		lim.last = last

	return r



// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
// advance requires that lim.mu is held.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
	// Start from the time recorded in lim.last; if now is earlier than that,
	// use now instead.
	last := lim.last
	if now.Before(last) {
		last = now

	// Work out how much time has passed.
	elapsed := now.Sub(last)
	// Work out how many tokens should have been generated in that time.
	delta := lim.limit.tokensFromDuration(elapsed)
	// Add those tokens to the bucket.
	tokens := lim.tokens + delta
	// A full bucket discards the surplus: cap at the burst size.
	if burst := float64(lim.burst); tokens > burst {
		tokens = burst
	return now, last, tokens


// tokensFromDuration is a unit conversion function from a time duration to the number of tokens
// which could be accumulated during that duration at a rate of limit tokens per second.
// A non-positive limit yields 0.
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
	if limit <= 0 {
		return 0
	// Elapsed seconds multiplied by the rate (tokens generated per second).
	return d.Seconds() * float64(limit)

// durationFromTokens is a unit conversion function from the number of tokens to the duration
// of time it takes to accumulate them at a rate of limit tokens per second.
// A non-positive limit yields InfDuration.
func (limit Limit) durationFromTokens(tokens float64) time.Duration {
	if limit <= 0 {
		return InfDuration
	// Number of tokens divided by the rate (tokens generated per second)
	// gives the time to wait, in seconds.
	seconds := tokens / float64(limit)
	return time.Duration(float64(time.Second) * seconds)



k8s 这里使用的令牌桶 Limiter 来自 golang.org/x/time/rate。取令牌有两个方法:Accept() 和 TryAccept(),区别只在于前者是阻塞式的——取不到令牌时会等待令牌生成,因此这里也写了两个类型。

// PassiveRateLimiter is the non-blocking rate limiter interface.
// NOTE(review): the comment about Stop below has no method declaration after
// it in this excerpt, and the closing brace is missing — confirm against the
// original source.
type PassiveRateLimiter interface {
	// TryAccept returns true if a token is taken immediately. Otherwise,
	// it returns false.
	TryAccept() bool
	// Stop stops the rate limiter, subsequent calls to CanAccept will return false
	// QPS returns QPS of this rate limiter
	QPS() float32
// The author's note describes this as an application of the proxy design
// pattern; only the clock field of the struct is visible in this excerpt, so
// confirm the wrapped limiter field against the original source.
type tokenBucketRateLimiter struct {
	// clock is the Clock that Accept uses for Now and Sleep.
	clock Clock
// tokenBucketPassiveRateLimiter holds the state for the non-blocking limiter.
type tokenBucketPassiveRateLimiter struct {
	// limiter is the underlying rate.Limiter that hands out tokens.
	limiter *rate.Limiter
	// qps is kept as a float32 next to the limiter.
	qps     float32
	// clock is a passive clock kept next to the limiter.
	clock   clock.PassiveClock


// Accept reserves one token at the clock's current time and then sleeps on
// that clock for the delay reported by the reservation.
func (tbrl *tokenBucketRateLimiter) Accept() {
	now := tbrl.clock.Now()
	// Actively wait for as long as it takes the token to be generated.
	tbrl.clock.Sleep(tbrl.limiter.ReserveN(now, 1).DelayFrom(now))

// DelayFrom returns how long the holder of the reservation has to wait,
// measured from now, before acting. It returns InfDuration when the
// reservation is not ok and never returns a negative duration.
func (r *Reservation) DelayFrom(now time.Time) time.Duration {
	if !r.ok {
		return InfDuration
	// timeToAct was set to now.Add(waitDuration) by reserveN: when there were
	// not enough tokens but the wait did not exceed maxFutureReserve, the
	// reservation succeeds and the caller waits for the tokens to be generated.
	delay := r.timeToAct.Sub(now)
	if delay < 0 {
		return 0
	return delay

回到 Limiter,有三种取出 token 的方式:AllowN(刚才所讲的)、ReserveN 和 WaitN。它们类似于代理模式,都是对 reserveN(now time.Time, n int, maxFutureReserve time.Duration) 的一层包装。



	// Excerpt (the enclosing function header is not shown): wait until either
	// the delay has elapsed or the context is done, whichever happens first.
	// NOTE(review): the comment in the ctx.Done branch mentions cancelling the
	// reservation, but no such call is visible here — confirm against the
	// original source.
	t := time.NewTimer(delay)
	defer t.Stop()
	select {
	case <-t.C:
		// We can proceed.
		return nil
	case <-ctx.Done():
		// Context was canceled before we could proceed.  Cancel the
		// reservation, which may permit other events to proceed sooner.
		return ctx.Err()

