Merge pull request #72214 from caesarxuchao/move-discovery

Move cached discovery clients to their own packages

Kubernetes-commit: d8f014613865955b7e4ce6fefbf38a3c8fe97971
This commit is contained in:
Kubernetes Publisher
2019-02-13 23:07:00 -08:00
parent e8f23bda6e
commit 347c2688d7
23 changed files with 354 additions and 3198 deletions
+24
View File
@@ -365,6 +365,17 @@ func NewTooManyRequestsError(message string) *StatusError {
}}
}
// NewRequestEntityTooLargeError returns an error indicating that the request
// entity was too large.
func NewRequestEntityTooLargeError(message string) *StatusError {
return &StatusError{metav1.Status{
Status: metav1.StatusFailure,
Code: http.StatusRequestEntityTooLarge,
Reason: metav1.StatusReasonRequestEntityTooLarge,
Message: fmt.Sprintf("Request entity too large: %s", message),
}}
}
// NewGenericServerResponse returns a new error for server responses that are not in a recognizable form.
func NewGenericServerResponse(code int, verb string, qualifiedResource schema.GroupResource, name, serverMessage string, retryAfterSeconds int, isUnexpectedResponse bool) *StatusError {
reason := metav1.StatusReasonUnknown
@@ -551,6 +562,19 @@ func IsTooManyRequests(err error) bool {
return false
}
// IsRequestEntityTooLargeError determines if err is an error which indicates
// the request entity is too large.
func IsRequestEntityTooLargeError(err error) bool {
if ReasonForError(err) == metav1.StatusReasonRequestEntityTooLarge {
return true
}
switch t := err.(type) {
case APIStatus:
return t.Status().Code == http.StatusRequestEntityTooLarge
}
return false
}
// IsUnexpectedServerError returns true if the server response was not in the expected API format,
// and may be the result of another HTTP actor.
func IsUnexpectedServerError(err error) bool {
+4
View File
@@ -746,6 +746,10 @@ const (
// Status code 406
StatusReasonNotAcceptable StatusReason = "NotAcceptable"
// StatusReasonRequestEntityTooLarge means that the request entity is too large.
// Status code 413
StatusReasonRequestEntityTooLarge StatusReason = "RequestEntityTooLarge"
// StatusReasonUnsupportedMediaType means that the content type sent by the client is not acceptable
// to the server - for instance, attempting to send protobuf for a resource that supports only json and yaml.
// API calls that return UnsupportedMediaType can never succeed.
-292
View File
@@ -1,292 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package discovery
import (
"errors"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"sync"
"time"
openapi_v2 "github.com/googleapis/gnostic/OpenAPIv2"
"k8s.io/klog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
)
// CachedDiscoveryClient implements the functions that discovery server-supported API groups,
// versions and resources.
type CachedDiscoveryClient struct {
delegate DiscoveryInterface
// cacheDirectory is the directory where discovery docs are held. It must be unique per host:port combination to work well.
cacheDirectory string
// ttl is how long the cache should be considered valid
ttl time.Duration
// mutex protects the variables below
mutex sync.Mutex
// ourFiles are all filenames of cache files created by this process
ourFiles map[string]struct{}
// invalidated is true if all cache files should be ignored that are not ours (e.g. after Invalidate() was called)
invalidated bool
// fresh is true if all used cache files were ours
fresh bool
}
var _ CachedDiscoveryInterface = &CachedDiscoveryClient{}
// ServerResourcesForGroupVersion returns the supported resources for a group and version.
func (d *CachedDiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
filename := filepath.Join(d.cacheDirectory, groupVersion, "serverresources.json")
cachedBytes, err := d.getCachedFile(filename)
// don't fail on errors, we either don't have a file or won't be able to run the cached check. Either way we can fallback.
if err == nil {
cachedResources := &metav1.APIResourceList{}
if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), cachedBytes, cachedResources); err == nil {
klog.V(10).Infof("returning cached discovery info from %v", filename)
return cachedResources, nil
}
}
liveResources, err := d.delegate.ServerResourcesForGroupVersion(groupVersion)
if err != nil {
klog.V(3).Infof("skipped caching discovery info due to %v", err)
return liveResources, err
}
if liveResources == nil || len(liveResources.APIResources) == 0 {
klog.V(3).Infof("skipped caching discovery info, no resources found")
return liveResources, err
}
if err := d.writeCachedFile(filename, liveResources); err != nil {
klog.V(1).Infof("failed to write cache to %v due to %v", filename, err)
}
return liveResources, nil
}
// ServerResources returns the supported resources for all groups and versions.
func (d *CachedDiscoveryClient) ServerResources() ([]*metav1.APIResourceList, error) {
return ServerResources(d)
}
// ServerGroups returns the supported groups, with information like supported versions and the
// preferred version.
func (d *CachedDiscoveryClient) ServerGroups() (*metav1.APIGroupList, error) {
filename := filepath.Join(d.cacheDirectory, "servergroups.json")
cachedBytes, err := d.getCachedFile(filename)
// don't fail on errors, we either don't have a file or won't be able to run the cached check. Either way we can fallback.
if err == nil {
cachedGroups := &metav1.APIGroupList{}
if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), cachedBytes, cachedGroups); err == nil {
klog.V(10).Infof("returning cached discovery info from %v", filename)
return cachedGroups, nil
}
}
liveGroups, err := d.delegate.ServerGroups()
if err != nil {
klog.V(3).Infof("skipped caching discovery info due to %v", err)
return liveGroups, err
}
if liveGroups == nil || len(liveGroups.Groups) == 0 {
klog.V(3).Infof("skipped caching discovery info, no groups found")
return liveGroups, err
}
if err := d.writeCachedFile(filename, liveGroups); err != nil {
klog.V(1).Infof("failed to write cache to %v due to %v", filename, err)
}
return liveGroups, nil
}
func (d *CachedDiscoveryClient) getCachedFile(filename string) ([]byte, error) {
// after invalidation ignore cache files not created by this process
d.mutex.Lock()
_, ourFile := d.ourFiles[filename]
if d.invalidated && !ourFile {
d.mutex.Unlock()
return nil, errors.New("cache invalidated")
}
d.mutex.Unlock()
file, err := os.Open(filename)
if err != nil {
return nil, err
}
defer file.Close()
fileInfo, err := file.Stat()
if err != nil {
return nil, err
}
if time.Now().After(fileInfo.ModTime().Add(d.ttl)) {
return nil, errors.New("cache expired")
}
// the cache is present and its valid. Try to read and use it.
cachedBytes, err := ioutil.ReadAll(file)
if err != nil {
return nil, err
}
d.mutex.Lock()
defer d.mutex.Unlock()
d.fresh = d.fresh && ourFile
return cachedBytes, nil
}
func (d *CachedDiscoveryClient) writeCachedFile(filename string, obj runtime.Object) error {
if err := os.MkdirAll(filepath.Dir(filename), 0755); err != nil {
return err
}
bytes, err := runtime.Encode(scheme.Codecs.LegacyCodec(), obj)
if err != nil {
return err
}
f, err := ioutil.TempFile(filepath.Dir(filename), filepath.Base(filename)+".")
if err != nil {
return err
}
defer os.Remove(f.Name())
_, err = f.Write(bytes)
if err != nil {
return err
}
err = os.Chmod(f.Name(), 0755)
if err != nil {
return err
}
name := f.Name()
err = f.Close()
if err != nil {
return err
}
// atomic rename
d.mutex.Lock()
defer d.mutex.Unlock()
err = os.Rename(name, filename)
if err == nil {
d.ourFiles[filename] = struct{}{}
}
return err
}
// RESTClient returns a RESTClient that is used to communicate with API server
// by this client implementation.
func (d *CachedDiscoveryClient) RESTClient() restclient.Interface {
return d.delegate.RESTClient()
}
// ServerPreferredResources returns the supported resources with the version preferred by the
// server.
func (d *CachedDiscoveryClient) ServerPreferredResources() ([]*metav1.APIResourceList, error) {
return ServerPreferredResources(d)
}
// ServerPreferredNamespacedResources returns the supported namespaced resources with the
// version preferred by the server.
func (d *CachedDiscoveryClient) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
return ServerPreferredNamespacedResources(d)
}
// ServerVersion retrieves and parses the server's version (git version).
func (d *CachedDiscoveryClient) ServerVersion() (*version.Info, error) {
return d.delegate.ServerVersion()
}
// OpenAPISchema retrieves and parses the swagger API schema the server supports.
func (d *CachedDiscoveryClient) OpenAPISchema() (*openapi_v2.Document, error) {
return d.delegate.OpenAPISchema()
}
// Fresh is supposed to tell the caller whether or not to retry if the cache
// fails to find something (false = retry, true = no need to retry).
func (d *CachedDiscoveryClient) Fresh() bool {
d.mutex.Lock()
defer d.mutex.Unlock()
return d.fresh
}
// Invalidate enforces that no cached data is used in the future that is older than the current time.
func (d *CachedDiscoveryClient) Invalidate() {
d.mutex.Lock()
defer d.mutex.Unlock()
d.ourFiles = map[string]struct{}{}
d.fresh = true
d.invalidated = true
}
// NewCachedDiscoveryClientForConfig creates a new DiscoveryClient for the given config, and wraps
// the created client in a CachedDiscoveryClient. The provided configuration is updated with a
// custom transport that understands cache responses.
// We receive two distinct cache directories for now, in order to preserve old behavior
// which makes use of the --cache-dir flag value for storing cache data from the CacheRoundTripper,
// and makes use of the hardcoded destination (~/.kube/cache/discovery/...) for storing
// CachedDiscoveryClient cache data. If httpCacheDir is empty, the restconfig's transport will not
// be updated with a roundtripper that understands cache responses.
// If discoveryCacheDir is empty, cached server resource data will be looked up in the current directory.
// TODO(juanvallejo): the value of "--cache-dir" should be honored. Consolidate discoveryCacheDir with httpCacheDir
// so that server resources and http-cache data are stored in the same location, provided via config flags.
func NewCachedDiscoveryClientForConfig(config *restclient.Config, discoveryCacheDir, httpCacheDir string, ttl time.Duration) (*CachedDiscoveryClient, error) {
if len(httpCacheDir) > 0 {
// update the given restconfig with a custom roundtripper that
// understands how to handle cache responses.
config = restclient.CopyConfig(config)
config.Wrap(func(rt http.RoundTripper) http.RoundTripper {
return newCacheRoundTripper(httpCacheDir, rt)
})
}
discoveryClient, err := NewDiscoveryClientForConfig(config)
if err != nil {
return nil, err
}
return newCachedDiscoveryClient(discoveryClient, discoveryCacheDir, ttl), nil
}
// NewCachedDiscoveryClient creates a new DiscoveryClient. cacheDirectory is the directory where discovery docs are held. It must be unique per host:port combination to work well.
func newCachedDiscoveryClient(delegate DiscoveryInterface, cacheDirectory string, ttl time.Duration) *CachedDiscoveryClient {
return &CachedDiscoveryClient{
delegate: delegate,
cacheDirectory: cacheDirectory,
ttl: ttl,
ourFiles: map[string]struct{}{},
fresh: true,
}
}
+58 -26
View File
@@ -88,12 +88,28 @@ type ServerResourcesInterface interface {
// ServerResourcesForGroupVersion returns the supported resources for a group and version.
ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error)
// ServerResources returns the supported resources for all groups and versions.
//
// The returned resource list might be non-nil with partial results even in the case of
// non-nil error.
//
// Deprecated: use ServerGroupsAndResources instead.
ServerResources() ([]*metav1.APIResourceList, error)
// ServerResources returns the supported groups and resources for all groups and versions.
//
// The returned group and resource lists might be non-nil with partial results even in the
// case of non-nil error.
ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error)
// ServerPreferredResources returns the supported resources with the version preferred by the
// server.
//
// The returned group and resource lists might be non-nil with partial results even in the
// case of non-nil error.
ServerPreferredResources() ([]*metav1.APIResourceList, error)
// ServerPreferredNamespacedResources returns the supported namespaced resources with the
// version preferred by the server.
//
// The returned resource list might be non-nil with partial results even in the case of
// non-nil error.
ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error)
}
@@ -191,14 +207,18 @@ func (d *DiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (r
return resources, nil
}
// serverResources returns the supported resources for all groups and versions.
func (d *DiscoveryClient) serverResources() ([]*metav1.APIResourceList, error) {
return ServerResources(d)
// ServerResources returns the supported resources for all groups and versions.
// Deprecated: use ServerGroupsAndResources instead.
func (d *DiscoveryClient) ServerResources() ([]*metav1.APIResourceList, error) {
_, rs, err := d.ServerGroupsAndResources()
return rs, err
}
// ServerResources returns the supported resources for all groups and versions.
func (d *DiscoveryClient) ServerResources() ([]*metav1.APIResourceList, error) {
return withRetries(defaultRetries, d.serverResources)
// ServerGroupsAndResources returns the supported resources for all groups and versions.
func (d *DiscoveryClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
return withRetries(defaultRetries, func() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
return ServerGroupsAndResources(d)
})
}
// ErrGroupDiscoveryFailed is returned if one or more API groups fail to load.
@@ -224,23 +244,28 @@ func IsGroupDiscoveryFailedError(err error) bool {
return err != nil && ok
}
// serverPreferredResources returns the supported resources with the version preferred by the server.
func (d *DiscoveryClient) serverPreferredResources() ([]*metav1.APIResourceList, error) {
return ServerPreferredResources(d)
// ServerResources uses the provided discovery interface to look up supported resources for all groups and versions.
// Deprecated: use ServerGroupsAndResources instead.
func ServerResources(d DiscoveryInterface) ([]*metav1.APIResourceList, error) {
_, rs, err := ServerGroupsAndResources(d)
return rs, err
}
// ServerResources uses the provided discovery interface to look up supported resources for all groups and versions.
func ServerResources(d DiscoveryInterface) ([]*metav1.APIResourceList, error) {
apiGroups, err := d.ServerGroups()
if err != nil {
return nil, err
func ServerGroupsAndResources(d DiscoveryInterface) ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
sgs, err := d.ServerGroups()
if sgs == nil {
return nil, nil, err
}
resultGroups := []*metav1.APIGroup{}
for i := range sgs.Groups {
resultGroups = append(resultGroups, &sgs.Groups[i])
}
groupVersionResources, failedGroups := fetchGroupVersionResources(d, apiGroups)
groupVersionResources, failedGroups := fetchGroupVersionResources(d, sgs)
// order results by group/version discovery order
result := []*metav1.APIResourceList{}
for _, apiGroup := range apiGroups.Groups {
for _, apiGroup := range sgs.Groups {
for _, version := range apiGroup.Versions {
gv := schema.GroupVersion{Group: apiGroup.Name, Version: version.Version}
if resources, ok := groupVersionResources[gv]; ok {
@@ -250,10 +275,10 @@ func ServerResources(d DiscoveryInterface) ([]*metav1.APIResourceList, error) {
}
if len(failedGroups) == 0 {
return result, nil
return resultGroups, result, nil
}
return result, &ErrGroupDiscoveryFailed{Groups: failedGroups}
return resultGroups, result, &ErrGroupDiscoveryFailed{Groups: failedGroups}
}
// ServerPreferredResources uses the provided discovery interface to look up preferred resources
@@ -317,7 +342,7 @@ func ServerPreferredResources(d DiscoveryInterface) ([]*metav1.APIResourceList,
return result, &ErrGroupDiscoveryFailed{Groups: failedGroups}
}
// fetchServerResourcesForGroupVersions uses the discovery client to fetch the resources for the specified groups in parallel
// fetchServerResourcesForGroupVersions uses the discovery client to fetch the resources for the specified groups in parallel.
func fetchGroupVersionResources(d DiscoveryInterface, apiGroups *metav1.APIGroupList) (map[schema.GroupVersion]*metav1.APIResourceList, map[schema.GroupVersion]error) {
groupVersionResources := make(map[schema.GroupVersion]*metav1.APIResourceList)
failedGroups := make(map[schema.GroupVersion]error)
@@ -341,7 +366,9 @@ func fetchGroupVersionResources(d DiscoveryInterface, apiGroups *metav1.APIGroup
if err != nil {
// TODO: maybe restrict this to NotFound errors
failedGroups[groupVersion] = err
} else {
}
if apiResourceList != nil {
// even in case of error, some fallback might have been returned
groupVersionResources[groupVersion] = apiResourceList
}
}()
@@ -355,7 +382,11 @@ func fetchGroupVersionResources(d DiscoveryInterface, apiGroups *metav1.APIGroup
// ServerPreferredResources returns the supported resources with the version preferred by the
// server.
func (d *DiscoveryClient) ServerPreferredResources() ([]*metav1.APIResourceList, error) {
return withRetries(defaultRetries, d.serverPreferredResources)
_, rs, err := withRetries(defaultRetries, func() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
rs, err := ServerPreferredResources(d)
return nil, rs, err
})
return rs, err
}
// ServerPreferredNamespacedResources returns the supported namespaced resources with the
@@ -410,19 +441,20 @@ func (d *DiscoveryClient) OpenAPISchema() (*openapi_v2.Document, error) {
}
// withRetries retries the given recovery function in case the groups supported by the server change after ServerGroup() returns.
func withRetries(maxRetries int, f func() ([]*metav1.APIResourceList, error)) ([]*metav1.APIResourceList, error) {
func withRetries(maxRetries int, f func() ([]*metav1.APIGroup, []*metav1.APIResourceList, error)) ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
var result []*metav1.APIResourceList
var resultGroups []*metav1.APIGroup
var err error
for i := 0; i < maxRetries; i++ {
result, err = f()
resultGroups, result, err = f()
if err == nil {
return result, nil
return resultGroups, result, nil
}
if _, ok := err.(*ErrGroupDiscoveryFailed); !ok {
return nil, err
return nil, nil, err
}
}
return result, err
return resultGroups, result, err
}
func setDiscoveryDefaults(config *restclient.Config) error {
+17 -1
View File
@@ -53,13 +53,29 @@ func (c *FakeDiscovery) ServerResourcesForGroupVersion(groupVersion string) (*me
}
// ServerResources returns the supported resources for all groups and versions.
// Deprecated: use ServerGroupsAndResources instead.
func (c *FakeDiscovery) ServerResources() ([]*metav1.APIResourceList, error) {
_, rs, err := c.ServerGroupsAndResources()
return rs, err
}
// ServerGroupsAndResources returns the supported groups and resources for all groups and versions.
func (c *FakeDiscovery) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
sgs, err := c.ServerGroups()
if err != nil {
return nil, nil, err
}
resultGroups := []*metav1.APIGroup{}
for i := range sgs.Groups {
resultGroups = append(resultGroups, &sgs.Groups[i])
}
action := testing.ActionImpl{
Verb: "get",
Resource: schema.GroupVersionResource{Resource: "resource"},
}
c.Invokes(action, nil)
return c.Resources, nil
return resultGroups, c.Resources, nil
}
// ServerPreferredResources returns the supported resources with the version
-62
View File
@@ -1,62 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package discovery
import (
"net/http"
"path/filepath"
"github.com/gregjones/httpcache"
"github.com/gregjones/httpcache/diskcache"
"github.com/peterbourgon/diskv"
"k8s.io/klog"
)
type cacheRoundTripper struct {
rt *httpcache.Transport
}
// newCacheRoundTripper creates a roundtripper that reads the ETag on
// response headers and send the If-None-Match header on subsequent
// corresponding requests.
func newCacheRoundTripper(cacheDir string, rt http.RoundTripper) http.RoundTripper {
d := diskv.New(diskv.Options{
BasePath: cacheDir,
TempDir: filepath.Join(cacheDir, ".diskv-temp"),
})
t := httpcache.NewTransport(diskcache.NewWithDiskv(d))
t.Transport = rt
return &cacheRoundTripper{rt: t}
}
func (rt *cacheRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
return rt.rt.RoundTrip(req)
}
func (rt *cacheRoundTripper) CancelRequest(req *http.Request) {
type canceler interface {
CancelRequest(*http.Request)
}
if cr, ok := rt.rt.Transport.(canceler); ok {
cr.CancelRequest(req)
} else {
klog.Errorf("CancelRequest not implemented by %T", rt.rt.Transport)
}
}
func (rt *cacheRoundTripper) WrappedRoundTripper() http.RoundTripper { return rt.rt.Transport }
+7 -7
View File
@@ -144,13 +144,13 @@ func (c *ExpirationCache) ListKeys() []string {
// Add timestamps an item and inserts it into the cache, overwriting entries
// that might exist under the same key.
func (c *ExpirationCache) Add(obj interface{}) error {
c.expirationLock.Lock()
defer c.expirationLock.Unlock()
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
c.expirationLock.Lock()
defer c.expirationLock.Unlock()
c.cacheStorage.Add(key, &timestampedEntry{obj, c.clock.Now()})
return nil
}
@@ -163,12 +163,12 @@ func (c *ExpirationCache) Update(obj interface{}) error {
// Delete removes an item from the cache.
func (c *ExpirationCache) Delete(obj interface{}) error {
c.expirationLock.Lock()
defer c.expirationLock.Unlock()
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
c.expirationLock.Lock()
defer c.expirationLock.Unlock()
c.cacheStorage.Delete(key)
return nil
}
@@ -177,8 +177,6 @@ func (c *ExpirationCache) Delete(obj interface{}) error {
// before attempting the replace operation. The replace operation will
// delete the contents of the ExpirationCache `c`.
func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) error {
c.expirationLock.Lock()
defer c.expirationLock.Unlock()
items := make(map[string]interface{}, len(list))
ts := c.clock.Now()
for _, item := range list {
@@ -188,6 +186,8 @@ func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) er
}
items[key] = &timestampedEntry{item, ts}
}
c.expirationLock.Lock()
defer c.expirationLock.Unlock()
c.cacheStorage.Replace(items, resourceVersion)
return nil
}