Merge pull request #72138 from dims/switch-location-for-goautoneg

Switch location for goautoneg vendored code

Kubernetes-commit: ef2a5b948b0d6c422873a823755ee7d12284dcc3
This commit is contained in:
Kubernetes Publisher 2019-01-18 14:42:46 -08:00
commit 59dd5ff32c
21 changed files with 606 additions and 301 deletions

434
Godeps/Godeps.json generated

File diff suppressed because it is too large Load Diff

View File

@ -351,29 +351,39 @@ type WaitFunc func(done <-chan struct{}) <-chan struct{}
// WaitFor continually checks 'fn' as driven by 'wait'.
//
// WaitFor gets a channel from 'wait()'', and then invokes 'fn' once for every value
// placed on the channel and once more when the channel is closed.
// placed on the channel and once more when the channel is closed. If the channel is closed
// and 'fn' returns false without error, WaitFor returns ErrWaitTimeout.
//
// If 'fn' returns an error the loop ends and that error is returned, and if
// If 'fn' returns an error the loop ends and that error is returned. If
// 'fn' returns true the loop ends and nil is returned.
//
// ErrWaitTimeout will be returned if the channel is closed without fn ever
// ErrWaitTimeout will be returned if the 'done' channel is closed without fn ever
// returning true.
//
// When the done channel is closed, because the golang `select` statement is
// "uniform pseudo-random", the `fn` might still run one or multiple time,
// though eventually `WaitFor` will return.
func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error {
c := wait(done)
stopCh := make(chan struct{})
defer close(stopCh)
c := wait(stopCh)
for {
_, open := <-c
ok, err := fn()
if err != nil {
return err
}
if ok {
return nil
}
if !open {
break
select {
case _, open := <-c:
ok, err := fn()
if err != nil {
return err
}
if ok {
return nil
}
if !open {
return ErrWaitTimeout
}
case <-done:
return ErrWaitTimeout
}
}
return ErrWaitTimeout
}
// poller returns a WaitFunc that will send to the channel every interval until

View File

@ -25,7 +25,7 @@ import (
"sync"
"time"
"github.com/googleapis/gnostic/OpenAPIv2"
openapi_v2 "github.com/googleapis/gnostic/OpenAPIv2"
"k8s.io/klog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -266,13 +266,10 @@ func NewCachedDiscoveryClientForConfig(config *restclient.Config, discoveryCache
if len(httpCacheDir) > 0 {
// update the given restconfig with a custom roundtripper that
// understands how to handle cache responses.
wt := config.WrapTransport
config.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
if wt != nil {
rt = wt(rt)
}
config = restclient.CopyConfig(config)
config.Wrap(func(rt http.RoundTripper) http.RoundTripper {
return newCacheRoundTripper(httpCacheDir, rt)
}
})
}
discoveryClient, err := NewDiscoveryClientForConfig(config)

View File

@ -60,6 +60,7 @@ func (c *FakePods) GetLogs(name string, opts *v1.PodLogOptions) *restclient.Requ
func (c *FakePods) Evict(eviction *policy.Eviction) error {
action := core.CreateActionImpl{}
action.Verb = "create"
action.Namespace = c.ns
action.Resource = podsResource
action.Subresource = "eviction"
action.Object = eviction

View File

@ -23,12 +23,13 @@ import (
)
func (c *FakeEvictions) Evict(eviction *policy.Eviction) error {
action := core.GetActionImpl{}
action.Verb = "post"
action := core.CreateActionImpl{}
action.Verb = "create"
action.Namespace = c.ns
action.Resource = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
action.Subresource = "eviction"
action.Name = eviction.Name
action.Object = eviction
_, err := c.Fake.Invokes(action, eviction)
return err
}

View File

@ -22,7 +22,7 @@ import (
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// ExecCredentials is used by exec-based plugins to communicate credentials to
// ExecCredential is used by exec-based plugins to communicate credentials to
// HTTP transports.
type ExecCredential struct {
metav1.TypeMeta `json:",inline"`

View File

@ -31,8 +31,9 @@ import (
"sync"
"time"
"github.com/davecgh/go-spew/spew"
"golang.org/x/crypto/ssh/terminal"
"k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
@ -73,8 +74,10 @@ func newCache() *cache {
return &cache{m: make(map[string]*Authenticator)}
}
var spewConfig = &spew.ConfigState{DisableMethods: true, Indent: " "}
func cacheKey(c *api.ExecConfig) string {
return fmt.Sprintf("%#v", c)
return spewConfig.Sprint(c)
}
type cache struct {
@ -172,13 +175,9 @@ type credentials struct {
// UpdateTransportConfig updates the transport.Config to use credentials
// returned by the plugin.
func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error {
wt := c.WrapTransport
c.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
if wt != nil {
rt = wt(rt)
}
c.Wrap(func(rt http.RoundTripper) http.RoundTripper {
return &roundTripper{a, rt}
}
})
if c.TLS.GetCert != nil {
return errors.New("can't add TLS certificate callback: transport.Config.TLS.GetCert already set")

View File

@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/pkg/version"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/client-go/transport"
certutil "k8s.io/client-go/util/cert"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog"
@ -95,13 +96,16 @@ type Config struct {
// Transport may be used for custom HTTP behavior. This attribute may not
// be specified with the TLS client certificate options. Use WrapTransport
// for most client level operations.
// to provide additional per-server middleware behavior.
Transport http.RoundTripper
// WrapTransport will be invoked for custom HTTP behavior after the underlying
// transport is initialized (either the transport created from TLSClientConfig,
// Transport, or http.DefaultTransport). The config may layer other RoundTrippers
// on top of the returned RoundTripper.
WrapTransport func(rt http.RoundTripper) http.RoundTripper
//
// A future release will change this field to an array. Use config.Wrap()
// instead of setting this value directly.
WrapTransport transport.WrapperFunc
// QPS indicates the maximum QPS to the master from this client.
// If it's zero, the created RESTClient will use DefaultQPS: 5
@ -125,6 +129,47 @@ type Config struct {
// Version string
}
var _ fmt.Stringer = new(Config)
var _ fmt.GoStringer = new(Config)
type sanitizedConfig *Config
type sanitizedAuthConfigPersister struct{ AuthProviderConfigPersister }
func (sanitizedAuthConfigPersister) GoString() string {
return "rest.AuthProviderConfigPersister(--- REDACTED ---)"
}
func (sanitizedAuthConfigPersister) String() string {
return "rest.AuthProviderConfigPersister(--- REDACTED ---)"
}
// GoString implements fmt.GoStringer and sanitizes sensitive fields of Config
// to prevent accidental leaking via logs.
func (c *Config) GoString() string {
return c.String()
}
// String implements fmt.Stringer and sanitizes sensitive fields of Config to
// prevent accidental leaking via logs.
func (c *Config) String() string {
if c == nil {
return "<nil>"
}
cc := sanitizedConfig(CopyConfig(c))
// Explicitly mark non-empty credential fields as redacted.
if cc.Password != "" {
cc.Password = "--- REDACTED ---"
}
if cc.BearerToken != "" {
cc.BearerToken = "--- REDACTED ---"
}
if cc.AuthConfigPersister != nil {
cc.AuthConfigPersister = sanitizedAuthConfigPersister{cc.AuthConfigPersister}
}
return fmt.Sprintf("%#v", cc)
}
// ImpersonationConfig has all the available impersonation options
type ImpersonationConfig struct {
// UserName is the username to impersonate on each request.
@ -164,6 +209,40 @@ type TLSClientConfig struct {
CAData []byte
}
var _ fmt.Stringer = TLSClientConfig{}
var _ fmt.GoStringer = TLSClientConfig{}
type sanitizedTLSClientConfig TLSClientConfig
// GoString implements fmt.GoStringer and sanitizes sensitive fields of
// TLSClientConfig to prevent accidental leaking via logs.
func (c TLSClientConfig) GoString() string {
return c.String()
}
// String implements fmt.Stringer and sanitizes sensitive fields of
// TLSClientConfig to prevent accidental leaking via logs.
func (c TLSClientConfig) String() string {
cc := sanitizedTLSClientConfig{
Insecure: c.Insecure,
ServerName: c.ServerName,
CertFile: c.CertFile,
KeyFile: c.KeyFile,
CAFile: c.CAFile,
CertData: c.CertData,
KeyData: c.KeyData,
CAData: c.CAData,
}
// Explicitly mark non-empty credential fields as redacted.
if len(cc.CertData) != 0 {
cc.CertData = []byte("--- TRUNCATED ---")
}
if len(cc.KeyData) != 0 {
cc.KeyData = []byte("--- REDACTED ---")
}
return fmt.Sprintf("%#v", cc)
}
type ContentConfig struct {
// AcceptContentTypes specifies the types the client will accept and is optional.
// If not set, ContentType will be used to define the Accept header

View File

@ -1100,7 +1100,8 @@ func (r Result) Into(obj runtime.Object) error {
return fmt.Errorf("serializer for %s doesn't exist", r.contentType)
}
if len(r.body) == 0 {
return fmt.Errorf("0-length response")
return fmt.Errorf("0-length response with status code: %d and content type: %s",
r.statusCode, r.contentType)
}
out, _, err := r.decoder.Decode(r.body, nil, obj)

View File

@ -103,14 +103,15 @@ func (c *Config) TransportConfig() (*transport.Config, error) {
if err != nil {
return nil, err
}
wt := conf.WrapTransport
if wt != nil {
conf.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
return provider.WrapTransport(wt(rt))
}
} else {
conf.WrapTransport = provider.WrapTransport
}
conf.Wrap(provider.WrapTransport)
}
return conf, nil
}
// Wrap adds a transport middleware function that will give the caller
// an opportunity to wrap the underlying http.RoundTripper prior to the
// first API call being made. The provided function is invoked after any
// existing transport wrappers are invoked.
func (c *Config) Wrap(fn transport.WrapperFunc) {
c.WrapTransport = transport.Wrappers(c.WrapTransport, fn)
}

View File

@ -131,8 +131,7 @@ func ObjectReaction(tracker ObjectTracker) ReactionFunc {
case PatchActionImpl:
obj, err := tracker.Get(gvr, ns, action.GetName())
if err != nil {
// object is not registered
return false, nil, err
return true, nil, err
}
old, err := json.Marshal(obj)

View File

@ -40,7 +40,7 @@ func (f *FakeCustomStore) Add(obj interface{}) error {
// Update calls the custom Update function if defined
func (f *FakeCustomStore) Update(obj interface{}) error {
if f.UpdateFunc != nil {
return f.Update(obj)
return f.UpdateFunc(obj)
}
return nil
}

View File

@ -31,7 +31,14 @@ import (
type AppendFunc func(interface{})
func ListAll(store Store, selector labels.Selector, appendFn AppendFunc) error {
selectAll := selector.Empty()
for _, m := range store.List() {
if selectAll {
// Avoid computing labels of the objects to speed up common flows
// of listing all objects.
appendFn(m)
continue
}
metadata, err := meta.Accessor(m)
if err != nil {
return err
@ -44,8 +51,15 @@ func ListAll(store Store, selector labels.Selector, appendFn AppendFunc) error {
}
func ListAllByNamespace(indexer Indexer, namespace string, selector labels.Selector, appendFn AppendFunc) error {
selectAll := selector.Empty()
if namespace == metav1.NamespaceAll {
for _, m := range indexer.List() {
if selectAll {
// Avoid computing labels of the objects to speed up common flows
// of listing all objects.
appendFn(m)
continue
}
metadata, err := meta.Accessor(m)
if err != nil {
return err
@ -74,6 +88,12 @@ func ListAllByNamespace(indexer Indexer, namespace string, selector labels.Selec
return nil
}
for _, m := range items {
if selectAll {
// Avoid computing labels of the objects to speed up common flows
// of listing all objects.
appendFn(m)
continue
}
metadata, err := meta.Accessor(m)
if err != nil {
return err

View File

@ -17,6 +17,8 @@ limitations under the License.
package api
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime"
)
@ -150,6 +152,25 @@ type AuthProviderConfig struct {
Config map[string]string `json:"config,omitempty"`
}
var _ fmt.Stringer = new(AuthProviderConfig)
var _ fmt.GoStringer = new(AuthProviderConfig)
// GoString implements fmt.GoStringer and sanitizes sensitive fields of
// AuthProviderConfig to prevent accidental leaking via logs.
func (c AuthProviderConfig) GoString() string {
return c.String()
}
// String implements fmt.Stringer and sanitizes sensitive fields of
// AuthProviderConfig to prevent accidental leaking via logs.
func (c AuthProviderConfig) String() string {
cfg := "<nil>"
if c.Config != nil {
cfg = "--- REDACTED ---"
}
return fmt.Sprintf("api.AuthProviderConfig{Name: %q, Config: map[string]string{%s}}", c.Name, cfg)
}
// ExecConfig specifies a command to provide client credentials. The command is exec'd
// and outputs structured stdout holding credentials.
//
@ -172,6 +193,29 @@ type ExecConfig struct {
APIVersion string `json:"apiVersion,omitempty"`
}
var _ fmt.Stringer = new(ExecConfig)
var _ fmt.GoStringer = new(ExecConfig)
// GoString implements fmt.GoStringer and sanitizes sensitive fields of
// ExecConfig to prevent accidental leaking via logs.
func (c ExecConfig) GoString() string {
return c.String()
}
// String implements fmt.Stringer and sanitizes sensitive fields of ExecConfig
// to prevent accidental leaking via logs.
func (c ExecConfig) String() string {
var args []string
if len(c.Args) > 0 {
args = []string{"--- REDACTED ---"}
}
env := "[]ExecEnvVar(nil)"
if len(c.Env) > 0 {
env = "[]ExecEnvVar{--- REDACTED ---}"
}
return fmt.Sprintf("api.AuthProviderConfig{Command: %q, Args: %#v, Env: %s, APIVersion: %q}", c.Command, args, env, c.APIVersion)
}
// ExecEnvVar is used for setting environment variables when executing an exec-based
// credential plugin.
type ExecEnvVar struct {

View File

@ -150,7 +150,12 @@ func (config *DeferredLoadingClientConfig) Namespace() (string, bool, error) {
// if we got a default namespace, determine whether it was explicit or implicit
if raw, err := mergedKubeConfig.RawConfig(); err == nil {
if context := raw.Contexts[raw.CurrentContext]; context != nil && len(context.Namespace) > 0 {
// determine the current context
currentContext := raw.CurrentContext
if config.overrides != nil && len(config.overrides.CurrentContext) > 0 {
currentContext = config.overrides.CurrentContext
}
if context := raw.Contexts[currentContext]; context != nil && len(context.Namespace) > 0 {
return ns, false, nil
}
}

View File

@ -57,7 +57,10 @@ type Config struct {
// from TLSClientConfig, Transport, or http.DefaultTransport). The
// config may layer other RoundTrippers on top of the returned
// RoundTripper.
WrapTransport func(rt http.RoundTripper) http.RoundTripper
//
// A future release will change this field to an array. Use config.Wrap()
// instead of setting this value directly.
WrapTransport WrapperFunc
// Dial specifies the dial function for creating unencrypted TCP connections.
Dial func(ctx context.Context, network, address string) (net.Conn, error)
@ -98,6 +101,14 @@ func (c *Config) HasCertCallback() bool {
return c.TLS.GetCert != nil
}
// Wrap adds a transport middleware function that will give the caller
// an opportunity to wrap the underlying http.RoundTripper prior to the
// first API call being made. The provided function is invoked after any
// existing transport wrappers are invoked.
func (c *Config) Wrap(fn WrapperFunc) {
c.WrapTransport = Wrappers(c.WrapTransport, fn)
}
// TLSConfig holds the information needed to set up a TLS transport.
type TLSConfig struct {
CAFile string // Path of the PEM-encoded server trusted root certificates.

View File

@ -47,14 +47,14 @@ func TokenSourceWrapTransport(ts oauth2.TokenSource) func(http.RoundTripper) htt
func NewCachedFileTokenSource(path string) oauth2.TokenSource {
return &cachingTokenSource{
now: time.Now,
leeway: 1 * time.Minute,
leeway: 10 * time.Second,
base: &fileTokenSource{
path: path,
// This period was picked because it is half of the minimum validity
// duration for a token provisioned by they TokenRequest API. This is
// unsophisticated and should induce rotation at a frequency that should
// work with the token volume source.
period: 5 * time.Minute,
// This period was picked because it is half of the duration between when the kubelet
// refreshes a projected service account token and when the original token expires.
// Default token lifetime is 10 minutes, and the kubelet starts refreshing at 80% of lifetime.
// This should induce re-reading at a frequency that works with the token volume source.
period: time.Minute,
},
}
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package transport
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
@ -167,3 +168,60 @@ func rootCertPool(caData []byte) *x509.CertPool {
certPool.AppendCertsFromPEM(caData)
return certPool
}
// WrapperFunc wraps an http.RoundTripper when a new transport
// is created for a client, allowing per connection behavior
// to be injected.
type WrapperFunc func(rt http.RoundTripper) http.RoundTripper
// Wrappers accepts any number of wrappers and returns a wrapper
// function that is the equivalent of calling each of them in order. Nil
// values are ignored, which makes this function convenient for incrementally
// wrapping a function.
func Wrappers(fns ...WrapperFunc) WrapperFunc {
if len(fns) == 0 {
return nil
}
// optimize the common case of wrapping a possibly nil transport wrapper
// with an additional wrapper
if len(fns) == 2 && fns[0] == nil {
return fns[1]
}
return func(rt http.RoundTripper) http.RoundTripper {
base := rt
for _, fn := range fns {
if fn != nil {
base = fn(base)
}
}
return base
}
}
// ContextCanceller prevents new requests after the provided context is finished.
// err is returned when the context is closed, allowing the caller to provide a context
// appropriate error.
func ContextCanceller(ctx context.Context, err error) WrapperFunc {
return func(rt http.RoundTripper) http.RoundTripper {
return &contextCanceller{
ctx: ctx,
rt: rt,
err: err,
}
}
}
type contextCanceller struct {
ctx context.Context
rt http.RoundTripper
err error
}
func (b *contextCanceller) RoundTrip(req *http.Request) (*http.Response, error) {
select {
case <-b.ctx.Done():
return nil, b.err
default:
return b.rt.RoundTrip(req)
}
}

View File

@ -62,7 +62,7 @@ func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
func (r *BucketRateLimiter) Forget(item interface{}) {
}
// ItemExponentialFailureRateLimiter does a simple baseDelay*10^<num-failures> limit
// ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
// dealing with max failures and expiration are up to the caller
type ItemExponentialFailureRateLimiter struct {
failuresLock sync.Mutex

View File

@ -43,12 +43,13 @@ func NewNamedDelayingQueue(name string) DelayingInterface {
func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
ret := &delayingType{
Interface: NewNamed(name),
clock: clock,
heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name),
Interface: NewNamed(name),
clock: clock,
heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name),
deprecatedMetrics: newDeprecatedRetryMetrics(name),
}
go ret.waitingLoop()
@ -73,7 +74,8 @@ type delayingType struct {
waitingForAddCh chan *waitFor
// metrics counts the number of retries
metrics retryMetrics
metrics retryMetrics
deprecatedMetrics retryMetrics
}
// waitFor holds the data to add and the time it should be added
@ -146,6 +148,7 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
}
q.metrics.retry()
q.deprecatedMetrics.retry()
// immediately add things with no delay
if duration <= 0 {

View File

@ -57,6 +57,11 @@ type SummaryMetric interface {
Observe(float64)
}
// HistogramMetric counts individual observations.
type HistogramMetric interface {
Observe(float64)
}
type noopMetric struct{}
func (noopMetric) Inc() {}
@ -73,15 +78,23 @@ type defaultQueueMetrics struct {
// total number of adds handled by a workqueue
adds CounterMetric
// how long an item stays in a workqueue
latency SummaryMetric
latency HistogramMetric
// how long processing an item from a workqueue takes
workDuration SummaryMetric
workDuration HistogramMetric
addTimes map[t]time.Time
processingStartTimes map[t]time.Time
// how long have current threads been working?
unfinishedWorkSeconds SettableGaugeMetric
longestRunningProcessor SettableGaugeMetric
// TODO(danielqsj): Remove the following metrics, they are deprecated
deprecatedDepth GaugeMetric
deprecatedAdds CounterMetric
deprecatedLatency SummaryMetric
deprecatedWorkDuration SummaryMetric
deprecatedUnfinishedWorkSeconds SettableGaugeMetric
deprecatedLongestRunningProcessor SettableGaugeMetric
}
func (m *defaultQueueMetrics) add(item t) {
@ -90,7 +103,9 @@ func (m *defaultQueueMetrics) add(item t) {
}
m.adds.Inc()
m.deprecatedAdds.Inc()
m.depth.Inc()
m.deprecatedDepth.Inc()
if _, exists := m.addTimes[item]; !exists {
m.addTimes[item] = m.clock.Now()
}
@ -102,9 +117,11 @@ func (m *defaultQueueMetrics) get(item t) {
}
m.depth.Dec()
m.deprecatedDepth.Dec()
m.processingStartTimes[item] = m.clock.Now()
if startTime, exists := m.addTimes[item]; exists {
m.latency.Observe(m.sinceInMicroseconds(startTime))
m.latency.Observe(m.sinceInSeconds(startTime))
m.deprecatedLatency.Observe(m.sinceInMicroseconds(startTime))
delete(m.addTimes, item)
}
}
@ -115,7 +132,8 @@ func (m *defaultQueueMetrics) done(item t) {
}
if startTime, exists := m.processingStartTimes[item]; exists {
m.workDuration.Observe(m.sinceInMicroseconds(startTime))
m.workDuration.Observe(m.sinceInSeconds(startTime))
m.deprecatedWorkDuration.Observe(m.sinceInMicroseconds(startTime))
delete(m.processingStartTimes, item)
}
}
@ -135,7 +153,9 @@ func (m *defaultQueueMetrics) updateUnfinishedWork() {
// Convert to seconds; microseconds is unhelpfully granular for this.
total /= 1000000
m.unfinishedWorkSeconds.Set(total)
m.longestRunningProcessor.Set(oldest) // in microseconds.
m.deprecatedUnfinishedWorkSeconds.Set(total)
m.longestRunningProcessor.Set(oldest / 1000000)
m.deprecatedLongestRunningProcessor.Set(oldest) // in microseconds.
}
type noMetrics struct{}
@ -150,6 +170,11 @@ func (m *defaultQueueMetrics) sinceInMicroseconds(start time.Time) float64 {
return float64(m.clock.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
}
// Gets the time since the specified start in seconds.
func (m *defaultQueueMetrics) sinceInSeconds(start time.Time) float64 {
return m.clock.Since(start).Seconds()
}
type retryMetrics interface {
retry()
}
@ -170,11 +195,18 @@ func (m *defaultRetryMetrics) retry() {
type MetricsProvider interface {
NewDepthMetric(name string) GaugeMetric
NewAddsMetric(name string) CounterMetric
NewLatencyMetric(name string) SummaryMetric
NewWorkDurationMetric(name string) SummaryMetric
NewLatencyMetric(name string) HistogramMetric
NewWorkDurationMetric(name string) HistogramMetric
NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric
NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric
NewRetriesMetric(name string) CounterMetric
NewDeprecatedDepthMetric(name string) GaugeMetric
NewDeprecatedAddsMetric(name string) CounterMetric
NewDeprecatedLatencyMetric(name string) SummaryMetric
NewDeprecatedWorkDurationMetric(name string) SummaryMetric
NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric
NewDeprecatedRetriesMetric(name string) CounterMetric
}
type noopMetricsProvider struct{}
@ -187,11 +219,11 @@ func (_ noopMetricsProvider) NewAddsMetric(name string) CounterMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewLatencyMetric(name string) SummaryMetric {
func (_ noopMetricsProvider) NewLatencyMetric(name string) HistogramMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric {
func (_ noopMetricsProvider) NewWorkDurationMetric(name string) HistogramMetric {
return noopMetric{}
}
@ -199,7 +231,7 @@ func (_ noopMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) Settabl
return noopMetric{}
}
func (_ noopMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
func (_ noopMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric {
return noopMetric{}
}
@ -207,6 +239,34 @@ func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedDepthMetric(name string) GaugeMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedAddsMetric(name string) CounterMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedLatencyMetric(name string) SummaryMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedWorkDurationMetric(name string) SummaryMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedRetriesMetric(name string) CounterMetric {
return noopMetric{}
}
var globalMetricsFactory = queueMetricsFactory{
metricsProvider: noopMetricsProvider{},
}
@ -229,15 +289,21 @@ func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) qu
return noMetrics{}
}
return &defaultQueueMetrics{
clock: clock,
depth: mp.NewDepthMetric(name),
adds: mp.NewAddsMetric(name),
latency: mp.NewLatencyMetric(name),
workDuration: mp.NewWorkDurationMetric(name),
unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name),
longestRunningProcessor: mp.NewLongestRunningProcessorMicrosecondsMetric(name),
addTimes: map[t]time.Time{},
processingStartTimes: map[t]time.Time{},
clock: clock,
depth: mp.NewDepthMetric(name),
adds: mp.NewAddsMetric(name),
latency: mp.NewLatencyMetric(name),
workDuration: mp.NewWorkDurationMetric(name),
unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name),
longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name),
deprecatedDepth: mp.NewDeprecatedDepthMetric(name),
deprecatedAdds: mp.NewDeprecatedAddsMetric(name),
deprecatedLatency: mp.NewDeprecatedLatencyMetric(name),
deprecatedWorkDuration: mp.NewDeprecatedWorkDurationMetric(name),
deprecatedUnfinishedWorkSeconds: mp.NewDeprecatedUnfinishedWorkSecondsMetric(name),
deprecatedLongestRunningProcessor: mp.NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name),
addTimes: map[t]time.Time{},
processingStartTimes: map[t]time.Time{},
}
}
@ -251,6 +317,16 @@ func newRetryMetrics(name string) retryMetrics {
}
}
func newDeprecatedRetryMetrics(name string) retryMetrics {
var ret *defaultRetryMetrics
if len(name) == 0 {
return ret
}
return &defaultRetryMetrics{
retries: globalMetricsFactory.metricsProvider.NewDeprecatedRetriesMetric(name),
}
}
// SetProvider sets the metrics provider for all subsequently created work
// queues. Only the first call has an effect.
func SetProvider(metricsProvider MetricsProvider) {