sample-controller/vendor/k8s.io/client-go/util/flowcontrol/throttle.go
Kubernetes Publisher 7df7717190 Merge pull request #38320 from liggitt/golang-ratelimit
Automatic merge from submit-queue (batch tested with PRs 59158, 38320, 59059, 55516, 59357). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Switch from juju/ratelimit to golang.org/x/time/rate

Replaces juju/ratelimit with golang.org/x/time/rate
xref https://github.com/kubernetes/steering/issues/21

Requires removing the Saturation() method on the rate limiter. In the process of attempting to contribute it to the `golang.org/x/time/rate` implementation, it became clear that what it was calculating was not very useful when combined with periodic polling. See discussion in https://go-review.googlesource.com/c/time/+/29958#message-4caffc11669cadd90e2da4c05122cfec50ea6a22

```release-note
NONE
```

Kubernetes-commit: 0656d030a7d131ca8088a9f0ecd12596eb90d2fd
2018-02-05 21:38:00 +00:00

144 lines
3.4 KiB
Go

/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"sync"
"time"
"golang.org/x/time/rate"
)
type RateLimiter interface {
// TryAccept returns true if a token is taken immediately. Otherwise,
// it returns false.
TryAccept() bool
// Accept returns once a token becomes available.
Accept()
// Stop stops the rate limiter, subsequent calls to CanAccept will return false
Stop()
// QPS returns QPS of this rate limiter
QPS() float32
}
type tokenBucketRateLimiter struct {
limiter *rate.Limiter
clock Clock
qps float32
}
// NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach.
// The rate limiter allows bursts of up to 'burst' to exceed the QPS, while still maintaining a
// smoothed qps rate of 'qps'.
// The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'.
// The maximum number of tokens in the bucket is capped at 'burst'.
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
limiter := rate.NewLimiter(rate.Limit(qps), burst)
return newTokenBucketRateLimiter(limiter, realClock{}, qps)
}
// An injectable, mockable clock interface.
type Clock interface {
Now() time.Time
Sleep(time.Duration)
}
type realClock struct{}
func (realClock) Now() time.Time {
return time.Now()
}
func (realClock) Sleep(d time.Duration) {
time.Sleep(d)
}
// NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter
// but allows an injectable clock, for testing.
func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter {
limiter := rate.NewLimiter(rate.Limit(qps), burst)
return newTokenBucketRateLimiter(limiter, c, qps)
}
func newTokenBucketRateLimiter(limiter *rate.Limiter, c Clock, qps float32) RateLimiter {
return &tokenBucketRateLimiter{
limiter: limiter,
clock: c,
qps: qps,
}
}
func (t *tokenBucketRateLimiter) TryAccept() bool {
return t.limiter.AllowN(t.clock.Now(), 1)
}
// Accept will block until a token becomes available
func (t *tokenBucketRateLimiter) Accept() {
now := t.clock.Now()
t.clock.Sleep(t.limiter.ReserveN(now, 1).DelayFrom(now))
}
func (t *tokenBucketRateLimiter) Stop() {
}
func (t *tokenBucketRateLimiter) QPS() float32 {
return t.qps
}
type fakeAlwaysRateLimiter struct{}
func NewFakeAlwaysRateLimiter() RateLimiter {
return &fakeAlwaysRateLimiter{}
}
func (t *fakeAlwaysRateLimiter) TryAccept() bool {
return true
}
func (t *fakeAlwaysRateLimiter) Stop() {}
func (t *fakeAlwaysRateLimiter) Accept() {}
func (t *fakeAlwaysRateLimiter) QPS() float32 {
return 1
}
type fakeNeverRateLimiter struct {
wg sync.WaitGroup
}
func NewFakeNeverRateLimiter() RateLimiter {
rl := fakeNeverRateLimiter{}
rl.wg.Add(1)
return &rl
}
func (t *fakeNeverRateLimiter) TryAccept() bool {
return false
}
func (t *fakeNeverRateLimiter) Stop() {
t.wg.Done()
}
func (t *fakeNeverRateLimiter) Accept() {
t.wg.Wait()
}
func (t *fakeNeverRateLimiter) QPS() float32 {
return 1
}