Merge pull request #38320 from liggitt/golang-ratelimit

Automatic merge from submit-queue (batch tested with PRs 59158, 38320, 59059, 55516, 59357). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Switch from juju/ratelimit to golang.org/x/time/rate

Replaces juju/ratelimit with golang.org/x/time/rate
xref https://github.com/kubernetes/steering/issues/21

Requires removing the Saturation() method on the rate limiter. In the process of attempting to contribute it to the `golang.org/x/time/rate` implementation, it became clear that what it was calculating was not very useful when combined with periodic polling. See discussion in https://go-review.googlesource.com/c/time/+/29958#message-4caffc11669cadd90e2da4c05122cfec50ea6a22

```release-note
NONE
```

Kubernetes-commit: 0656d030a7d131ca8088a9f0ecd12596eb90d2fd
This commit is contained in:
Kubernetes Publisher
2018-02-05 12:40:34 -08:00
15 changed files with 581 additions and 803 deletions
+25 -30
View File
@@ -18,8 +18,9 @@ package flowcontrol
import (
"sync"
"time"
"github.com/juju/ratelimit"
"golang.org/x/time/rate"
)
type RateLimiter interface {
@@ -30,17 +31,13 @@ type RateLimiter interface {
Accept()
// Stop stops the rate limiter, subsequent calls to CanAccept will return false
Stop()
// Saturation returns a percentage number which describes how saturated
// this rate limiter is.
// Usually we use token bucket rate limiter. In that case,
// 1.0 means no tokens are available; 0.0 means we have a full bucket of tokens to use.
Saturation() float64
// QPS returns QPS of this rate limiter
QPS() float32
}
type tokenBucketRateLimiter struct {
limiter *ratelimit.Bucket
limiter *rate.Limiter
clock Clock
qps float32
}
@@ -50,42 +47,48 @@ type tokenBucketRateLimiter struct {
// The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'.
// The maximum number of tokens in the bucket is capped at 'burst'.
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
limiter := ratelimit.NewBucketWithRate(float64(qps), int64(burst))
return newTokenBucketRateLimiter(limiter, qps)
limiter := rate.NewLimiter(rate.Limit(qps), burst)
return newTokenBucketRateLimiter(limiter, realClock{}, qps)
}
// An injectable, mockable clock interface.
type Clock interface {
ratelimit.Clock
Now() time.Time
Sleep(time.Duration)
}
type realClock struct{}
func (realClock) Now() time.Time {
return time.Now()
}
func (realClock) Sleep(d time.Duration) {
time.Sleep(d)
}
// NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter
// but allows an injectable clock, for testing.
func NewTokenBucketRateLimiterWithClock(qps float32, burst int, clock Clock) RateLimiter {
limiter := ratelimit.NewBucketWithRateAndClock(float64(qps), int64(burst), clock)
return newTokenBucketRateLimiter(limiter, qps)
func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter {
limiter := rate.NewLimiter(rate.Limit(qps), burst)
return newTokenBucketRateLimiter(limiter, c, qps)
}
func newTokenBucketRateLimiter(limiter *ratelimit.Bucket, qps float32) RateLimiter {
func newTokenBucketRateLimiter(limiter *rate.Limiter, c Clock, qps float32) RateLimiter {
return &tokenBucketRateLimiter{
limiter: limiter,
clock: c,
qps: qps,
}
}
func (t *tokenBucketRateLimiter) TryAccept() bool {
return t.limiter.TakeAvailable(1) == 1
}
func (t *tokenBucketRateLimiter) Saturation() float64 {
capacity := t.limiter.Capacity()
avail := t.limiter.Available()
return float64(capacity-avail) / float64(capacity)
return t.limiter.AllowN(t.clock.Now(), 1)
}
// Accept will block until a token becomes available
func (t *tokenBucketRateLimiter) Accept() {
t.limiter.Wait(1)
now := t.clock.Now()
t.clock.Sleep(t.limiter.ReserveN(now, 1).DelayFrom(now))
}
func (t *tokenBucketRateLimiter) Stop() {
@@ -105,10 +108,6 @@ func (t *fakeAlwaysRateLimiter) TryAccept() bool {
return true
}
func (t *fakeAlwaysRateLimiter) Saturation() float64 {
return 0
}
func (t *fakeAlwaysRateLimiter) Stop() {}
func (t *fakeAlwaysRateLimiter) Accept() {}
@@ -131,10 +130,6 @@ func (t *fakeNeverRateLimiter) TryAccept() bool {
return false
}
func (t *fakeNeverRateLimiter) Saturation() float64 {
return 1
}
func (t *fakeNeverRateLimiter) Stop() {
t.wg.Done()
}