Merge pull request #62563 from devdattakulkarni/client-go-details

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Updated README to include client-go<->controller diagram

The sample-controller makes extensive use of various mechanisms
available in client-go. For writing custom controllers/operators
it will be helpful if there is precise description of how the
client-go library works and how/where it interfaces with
custom controller code.

Recently we published a blog post with these details here:

https://medium.com/@cloudark/kubernetes-custom-controllers-b6c7d0668fdf

This patch includes the diagram from the post, as was recommended
by @sttts on https://github.com/kubernetes/sample-controller/issues/13

**What this PR does / why we need it**:

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #

**Special notes for your reviewer**:

**Release note**:

```release-note

```

Kubernetes-commit: 773def0194ecc1cd845ea9a01da761d5a0390e6e
This commit is contained in:
Kubernetes Publisher
2018-05-14 05:23:12 -07:00
14 changed files with 1718 additions and 1210 deletions
+70 -33
View File
@@ -22,6 +22,7 @@ import (
"net/url"
"sort"
"strings"
"sync"
"time"
"github.com/golang/protobuf/proto"
@@ -32,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
@@ -191,33 +193,7 @@ func (d *DiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (r
// serverResources returns the supported resources for all groups and versions.
func (d *DiscoveryClient) serverResources() ([]*metav1.APIResourceList, error) {
apiGroups, err := d.ServerGroups()
if err != nil {
return nil, err
}
result := []*metav1.APIResourceList{}
failedGroups := make(map[schema.GroupVersion]error)
for _, apiGroup := range apiGroups.Groups {
for _, version := range apiGroup.Versions {
gv := schema.GroupVersion{Group: apiGroup.Name, Version: version.Version}
resources, err := d.ServerResourcesForGroupVersion(version.GroupVersion)
if err != nil {
// TODO: maybe restrict this to NotFound errors
failedGroups[gv] = err
continue
}
result = append(result, resources)
}
}
if len(failedGroups) == 0 {
return result, nil
}
return result, &ErrGroupDiscoveryFailed{Groups: failedGroups}
return ServerResources(d)
}
// ServerResources returns the supported resources for all groups and versions.
@@ -253,6 +229,33 @@ func (d *DiscoveryClient) serverPreferredResources() ([]*metav1.APIResourceList,
return ServerPreferredResources(d)
}
// ServerResources uses the provided discovery interface to look up supported resources for all groups and versions.
func ServerResources(d DiscoveryInterface) ([]*metav1.APIResourceList, error) {
apiGroups, err := d.ServerGroups()
if err != nil {
return nil, err
}
groupVersionResources, failedGroups := fetchGroupVersionResources(d, apiGroups)
// order results by group/version discovery order
result := []*metav1.APIResourceList{}
for _, apiGroup := range apiGroups.Groups {
for _, version := range apiGroup.Versions {
gv := schema.GroupVersion{Group: apiGroup.Name, Version: version.Version}
if resources, ok := groupVersionResources[gv]; ok {
result = append(result, resources)
}
}
}
if len(failedGroups) == 0 {
return result, nil
}
return result, &ErrGroupDiscoveryFailed{Groups: failedGroups}
}
// ServerPreferredResources uses the provided discovery interface to look up preferred resources
func ServerPreferredResources(d DiscoveryInterface) ([]*metav1.APIResourceList, error) {
serverGroupList, err := d.ServerGroups()
@@ -260,9 +263,9 @@ func ServerPreferredResources(d DiscoveryInterface) ([]*metav1.APIResourceList,
return nil, err
}
result := []*metav1.APIResourceList{}
failedGroups := make(map[schema.GroupVersion]error)
groupVersionResources, failedGroups := fetchGroupVersionResources(d, serverGroupList)
result := []*metav1.APIResourceList{}
grVersions := map[schema.GroupResource]string{} // selected version of a GroupResource
grApiResources := map[schema.GroupResource]*metav1.APIResource{} // selected APIResource for a GroupResource
gvApiResourceLists := map[schema.GroupVersion]*metav1.APIResourceList{} // blueprint for a APIResourceList for later grouping
@@ -270,10 +273,9 @@ func ServerPreferredResources(d DiscoveryInterface) ([]*metav1.APIResourceList,
for _, apiGroup := range serverGroupList.Groups {
for _, version := range apiGroup.Versions {
groupVersion := schema.GroupVersion{Group: apiGroup.Name, Version: version.Version}
apiResourceList, err := d.ServerResourcesForGroupVersion(version.GroupVersion)
if err != nil {
// TODO: maybe restrict this to NotFound errors
failedGroups[groupVersion] = err
apiResourceList, ok := groupVersionResources[groupVersion]
if !ok {
continue
}
@@ -315,6 +317,41 @@ func ServerPreferredResources(d DiscoveryInterface) ([]*metav1.APIResourceList,
return result, &ErrGroupDiscoveryFailed{Groups: failedGroups}
}
// fetchServerResourcesForGroupVersions uses the discovery client to fetch the resources for the specified groups in parallel
func fetchGroupVersionResources(d DiscoveryInterface, apiGroups *metav1.APIGroupList) (map[schema.GroupVersion]*metav1.APIResourceList, map[schema.GroupVersion]error) {
groupVersionResources := make(map[schema.GroupVersion]*metav1.APIResourceList)
failedGroups := make(map[schema.GroupVersion]error)
wg := &sync.WaitGroup{}
resultLock := &sync.Mutex{}
for _, apiGroup := range apiGroups.Groups {
for _, version := range apiGroup.Versions {
groupVersion := schema.GroupVersion{Group: apiGroup.Name, Version: version.Version}
wg.Add(1)
go func() {
defer wg.Done()
defer utilruntime.HandleCrash()
apiResourceList, err := d.ServerResourcesForGroupVersion(groupVersion.String())
// lock to record results
resultLock.Lock()
defer resultLock.Unlock()
if err != nil {
// TODO: maybe restrict this to NotFound errors
failedGroups[groupVersion] = err
} else {
groupVersionResources[groupVersion] = apiResourceList
}
}()
}
}
wg.Wait()
return groupVersionResources, failedGroups
}
// ServerPreferredResources returns the supported resources with the version preferred by the
// server.
func (d *DiscoveryClient) ServerPreferredResources() ([]*metav1.APIResourceList, error) {
+21
View File
@@ -225,6 +225,16 @@ func NewRootDeleteAction(resource schema.GroupVersionResource, name string) Dele
return action
}
func NewRootDeleteSubresourceAction(resource schema.GroupVersionResource, subresource string, name string) DeleteActionImpl {
action := DeleteActionImpl{}
action.Verb = "delete"
action.Resource = resource
action.Subresource = subresource
action.Name = name
return action
}
func NewDeleteAction(resource schema.GroupVersionResource, namespace, name string) DeleteActionImpl {
action := DeleteActionImpl{}
action.Verb = "delete"
@@ -235,6 +245,17 @@ func NewDeleteAction(resource schema.GroupVersionResource, namespace, name strin
return action
}
func NewDeleteSubresourceAction(resource schema.GroupVersionResource, subresource, namespace, name string) DeleteActionImpl {
action := DeleteActionImpl{}
action.Verb = "delete"
action.Resource = resource
action.Subresource = subresource
action.Namespace = namespace
action.Name = name
return action
}
func NewRootDeleteCollectionAction(resource schema.GroupVersionResource, opts interface{}) DeleteCollectionActionImpl {
action := DeleteCollectionActionImpl{}
action.Verb = "delete-collection"
+4 -6
View File
@@ -45,7 +45,7 @@ func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
ret := &delayingType{
Interface: NewNamed(name),
clock: clock,
heartbeat: clock.Tick(maxWait),
heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name),
@@ -67,10 +67,7 @@ type delayingType struct {
stopCh chan struct{}
// heartbeat ensures we wait no more than maxWait before firing
//
// TODO: replace with Ticker (and add to clock) so this can be cleaned up.
// clock.Tick will leak.
heartbeat <-chan time.Time
heartbeat clock.Ticker
// waitingForAddCh is a buffered channel that feeds waitingForAdd
waitingForAddCh chan *waitFor
@@ -138,6 +135,7 @@ func (pq waitForPriorityQueue) Peek() interface{} {
func (q *delayingType) ShutDown() {
q.Interface.ShutDown()
close(q.stopCh)
q.heartbeat.Stop()
}
// AddAfter adds the given item to the work queue after the given delay
@@ -209,7 +207,7 @@ func (q *delayingType) waitingLoop() {
case <-q.stopCh:
return
case <-q.heartbeat:
case <-q.heartbeat.C():
// continue the loop, which will add ready items
case <-nextReadyAt: