Mirror of https://github.com/kubernetes/sample-controller.git (synced 2025-04-01 12:32:40 +08:00)
Brush up the sample controller

Signed-off-by: Mike Spreitzer <mspreitz@us.ibm.com>
Kubernetes-commit: 7848612de8d0f6e4ac67c180184ee4bed1ac7691

parent 9b42874209
commit 6e60f3495c
controller.go

@@ -61,6 +61,8 @@ const (
 	// MessageResourceSynced is the message used for an Event fired when a Foo
 	// is synced successfully
 	MessageResourceSynced = "Foo synced successfully"
+	// FieldManager distinguishes this controller from other things writing to API objects
+	FieldManager = controllerAgentName
 )

 // Controller is the controller implementation for Foo resources
@@ -80,7 +82,7 @@ type Controller struct {
 	// means we can ensure we only process a fixed amount of resources at a
 	// time, and makes it easy to ensure we are never processing the same item
 	// simultaneously in two different workers.
-	workqueue workqueue.TypedRateLimitingInterface[string]
+	workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName]
 	// recorder is an event recorder for recording Event resources to the
 	// Kubernetes API.
 	recorder record.EventRecorder
@@ -106,8 +108,8 @@ func NewController(
 	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
 	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
 	ratelimiter := workqueue.NewTypedMaxOfRateLimiter(
-		workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second),
-		&workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
+		workqueue.NewTypedItemExponentialFailureRateLimiter[cache.ObjectName](5*time.Millisecond, 1000*time.Second),
+		&workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
 	)

 	controller := &Controller{
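Note: the hunks above change the workqueue's key type from "namespace/name" strings to the structured cache.ObjectName. The following is a minimal, self-contained sketch (our own example, not part of the commit) showing the same typed-queue construction plus an Add/Get/Done round trip:

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Same composition as NewController above: per-item exponential
	// backoff (5ms..1000s) combined with an overall 50 qps / 300 burst
	// token bucket; the more restrictive delay wins for each item.
	ratelimiter := workqueue.NewTypedMaxOfRateLimiter(
		workqueue.NewTypedItemExponentialFailureRateLimiter[cache.ObjectName](5*time.Millisecond, 1000*time.Second),
		&workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
	)
	queue := workqueue.NewTypedRateLimitingQueue[cache.ObjectName](ratelimiter)

	// Keys are now structured references, so a malformed string key can
	// never enter the queue in the first place.
	queue.Add(cache.ObjectName{Namespace: "default", Name: "example-foo"})

	ref, shutdown := queue.Get()
	if !shutdown {
		fmt.Println(ref.Namespace, ref.Name) // default example-foo
		queue.Done(ref)
	}
}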
@@ -196,64 +198,56 @@ func (c *Controller) runWorker(ctx context.Context) {
 // processNextWorkItem will read a single work item off the workqueue and
 // attempt to process it, by calling the syncHandler.
 func (c *Controller) processNextWorkItem(ctx context.Context) bool {
-	obj, shutdown := c.workqueue.Get()
+	objRef, shutdown := c.workqueue.Get()
 	logger := klog.FromContext(ctx)

 	if shutdown {
 		return false
 	}

-	// We wrap this block in a func so we can defer c.workqueue.Done.
-	err := func() error {
-		// We call Done here so the workqueue knows we have finished
-		// processing this item. We also must remember to call Forget if we
-		// do not want this work item being re-queued. For example, we do
-		// not call Forget if a transient error occurs, instead the item is
-		// put back on the workqueue and attempted again after a back-off
-		// period.
-		defer c.workqueue.Done(obj)
-		// Run the syncHandler, passing it the namespace/name string of the
-		// Foo resource to be synced.
-		if err := c.syncHandler(ctx, obj); err != nil {
-			// Put the item back on the workqueue to handle any transient errors.
-			c.workqueue.AddRateLimited(obj)
-			return fmt.Errorf("error syncing '%s': %s, requeuing", obj, err.Error())
-		}
-		// Finally, if no error occurs we Forget this item so it does not
-		// get queued again until another change happens.
-		c.workqueue.Forget(obj)
-		logger.Info("Successfully synced", "resourceName", obj)
-		return nil
-	}()
-
-	if err != nil {
-		utilruntime.HandleError(err)
+	// We call Done at the end of this func so the workqueue knows we have
+	// finished processing this item. We also must remember to call Forget
+	// if we do not want this work item being re-queued. For example, we do
+	// not call Forget if a transient error occurs, instead the item is
+	// put back on the workqueue and attempted again after a back-off
+	// period.
+	defer c.workqueue.Done(objRef)
+
+	// Run the syncHandler, passing it the structured reference to the object to be synced.
+	err := c.syncHandler(ctx, objRef)
+	if err == nil {
+		// If no error occurs then we Forget this item so it does not
+		// get queued again until another change happens.
+		c.workqueue.Forget(objRef)
+		logger.Info("Successfully synced", "objectName", objRef)
 		return true
 	}

+	// there was a failure so be sure to report it. This method allows for
+	// pluggable error handling which can be used for things like
+	// cluster-monitoring.
+	utilruntime.HandleErrorWithContext(ctx, err, "error syncing; requeuing", "objectReference", objRef)
+	// since we failed, we should requeue the item to work on later. This
+	// method will add a backoff to avoid hotlooping on particular items
+	// (they're probably still not going to work right away) and overall
+	// controller protection (everything I've done is broken, this controller
+	// needs to calm down or it can starve other useful work) cases.
+	c.workqueue.AddRateLimited(objRef)
 	return true
 }

 // syncHandler compares the actual state with the desired, and attempts to
 // converge the two. It then updates the Status block of the Foo resource
 // with the current status of the resource.
-func (c *Controller) syncHandler(ctx context.Context, key string) error {
-	// Convert the namespace/name string into a distinct namespace and name
-	logger := klog.LoggerWithValues(klog.FromContext(ctx), "resourceName", key)
-
-	namespace, name, err := cache.SplitMetaNamespaceKey(key)
-	if err != nil {
-		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
-		return nil
-	}
+func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error {
+	logger := klog.LoggerWithValues(klog.FromContext(ctx), "objectRef", objectRef)

 	// Get the Foo resource with this namespace/name
-	foo, err := c.foosLister.Foos(namespace).Get(name)
+	foo, err := c.foosLister.Foos(objectRef.Namespace).Get(objectRef.Name)
 	if err != nil {
 		// The Foo resource may no longer exist, in which case we stop
 		// processing.
 		if errors.IsNotFound(err) {
-			utilruntime.HandleError(fmt.Errorf("foo '%s' in work queue no longer exists", key))
+			utilruntime.HandleError(fmt.Errorf("foo '%#v' in work queue no longer exists", objectRef))
 			return nil
 		}

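Note: with the inner closure gone, processNextWorkItem above follows the plain Get/Done/Forget/AddRateLimited contract. A hedged sketch of that contract in isolation (the processOne helper and the string key are ours, purely illustrative):

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// processOne mirrors the structure processNextWorkItem has after this
// commit: one Get per call, Done deferred at function scope, Forget on
// success to reset the per-item backoff, AddRateLimited on failure.
func processOne[T comparable](queue workqueue.TypedRateLimitingInterface[T], sync func(T) error) bool {
	item, shutdown := queue.Get()
	if shutdown {
		return false
	}
	// Done must run whether or not sync succeeds.
	defer queue.Done(item)

	if err := sync(item); err != nil {
		queue.AddRateLimited(item) // retry later with backoff
		return true
	}
	queue.Forget(item) // success: clear the per-item backoff counter
	return true
}

func main() {
	queue := workqueue.NewTypedRateLimitingQueue[string](workqueue.DefaultTypedControllerRateLimiter[string]())
	queue.Add("default/example")
	queue.ShutDown() // already-queued items still drain, then Get reports shutdown

	for processOne(queue, func(key string) error {
		fmt.Println("synced", key)
		return nil
	}) {
	}
}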
@@ -265,7 +259,7 @@ func (c *Controller) syncHandler(ctx context.Context, key string) error {
 		// We choose to absorb the error here as the worker would requeue the
 		// resource otherwise. Instead, the next time the resource is updated
 		// the resource will be queued again.
-		utilruntime.HandleError(fmt.Errorf("%s: deployment name must be specified", key))
+		utilruntime.HandleError(fmt.Errorf("%#v: deployment name must be specified", objectRef))
 		return nil
 	}

@@ -273,7 +267,7 @@ func (c *Controller) syncHandler(ctx context.Context, key string) error {
 	deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
 	// If the resource doesn't exist, we'll create it
 	if errors.IsNotFound(err) {
-		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(context.TODO(), newDeployment(foo), metav1.CreateOptions{})
+		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(context.TODO(), newDeployment(foo), metav1.CreateOptions{FieldManager: FieldManager})
 	}

 	// If an error occurs during Get/Create, we'll requeue the item so we can
@@ -296,7 +290,7 @@ func (c *Controller) syncHandler(ctx context.Context, key string) error {
 	// should update the Deployment resource.
 	if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
 		logger.V(4).Info("Update deployment resource", "currentReplicas", *foo.Spec.Replicas, "desiredReplicas", *deployment.Spec.Replicas)
-		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(), newDeployment(foo), metav1.UpdateOptions{})
+		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(), newDeployment(foo), metav1.UpdateOptions{FieldManager: FieldManager})
 	}

 	// If an error occurs during Update, we'll requeue the item so we can
@@ -327,7 +321,7 @@ func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1
 	// we must use Update instead of UpdateStatus to update the Status block of the Foo resource.
 	// UpdateStatus will not allow changes to the Spec of the resource,
 	// which is ideal for ensuring nothing other than resource status has been updated.
-	_, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).UpdateStatus(context.TODO(), fooCopy, metav1.UpdateOptions{})
+	_, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).UpdateStatus(context.TODO(), fooCopy, metav1.UpdateOptions{FieldManager: FieldManager})
 	return err
 }

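Note: the three hunks above all thread the new FieldManager constant through the controller's write calls. Passing a FieldManager makes the API server record this manager in each object's .metadata.managedFields, attributing the fields this controller writes. A small self-contained sketch (the constants mirror the diff; the printout is ours):

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Mirrors the constants in the diff: the field manager is simply the
// controller's agent name.
const controllerAgentName = "sample-controller"
const FieldManager = controllerAgentName

func main() {
	// The same manager name is set on every Create/Update/UpdateStatus,
	// so all of this controller's writes are attributed consistently.
	createOpts := metav1.CreateOptions{FieldManager: FieldManager}
	updateOpts := metav1.UpdateOptions{FieldManager: FieldManager}
	fmt.Printf("create manager=%q update manager=%q\n", createOpts.FieldManager, updateOpts.FieldManager)
}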
@@ -335,13 +329,12 @@ func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1
 // string which is then put onto the work queue. This method should *not* be
 // passed resources of any type other than Foo.
 func (c *Controller) enqueueFoo(obj interface{}) {
-	var key string
-	var err error
-	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
+	if objectRef, err := cache.ObjectToName(obj); err != nil {
 		utilruntime.HandleError(err)
 		return
+	} else {
+		c.workqueue.Add(objectRef)
 	}
-	c.workqueue.Add(key)
 }

 // handleObject will take any resource implementing metav1.Object and attempt
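Note: enqueueFoo now converts objects with cache.ObjectToName, and the test fixture below makes the matching switch from cache.DeletionHandlingMetaNamespaceKeyFunc to cache.MetaObjectToName in getRef. A runnable sketch of the two helpers (the ConfigMap is an illustrative stand-in for a Foo):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	cm := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "example"},
	}

	// ObjectToName accepts interface{} and returns an error for values
	// without ObjectMeta, which is why enqueueFoo keeps an error path.
	ref, err := cache.ObjectToName(cm)
	if err != nil {
		panic(err)
	}
	fmt.Println(ref) // "default/example" via ObjectName.String()

	// MetaObjectToName takes a metav1.Object directly and cannot fail,
	// which is why getRef in the tests no longer needs error handling.
	ref = cache.MetaObjectToName(cm)
	fmt.Println(ref.Namespace, ref.Name)
}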
controller_test.go

@@ -108,22 +108,22 @@ func (f *fixture) newController(ctx context.Context) (*Controller, informers.Sha
 	return c, i, k8sI
 }

-func (f *fixture) run(ctx context.Context, fooName string) {
-	f.runController(ctx, fooName, true, false)
+func (f *fixture) run(ctx context.Context, fooRef cache.ObjectName) {
+	f.runController(ctx, fooRef, true, false)
 }

-func (f *fixture) runExpectError(ctx context.Context, fooName string) {
-	f.runController(ctx, fooName, true, true)
+func (f *fixture) runExpectError(ctx context.Context, fooRef cache.ObjectName) {
+	f.runController(ctx, fooRef, true, true)
 }

-func (f *fixture) runController(ctx context.Context, fooName string, startInformers bool, expectError bool) {
+func (f *fixture) runController(ctx context.Context, fooRef cache.ObjectName, startInformers bool, expectError bool) {
 	c, i, k8sI := f.newController(ctx)
 	if startInformers {
 		i.Start(ctx.Done())
 		k8sI.Start(ctx.Done())
 	}

-	err := c.syncHandler(ctx, fooName)
+	err := c.syncHandler(ctx, fooRef)
 	if !expectError && err != nil {
 		f.t.Errorf("error syncing foo: %v", err)
 	} else if expectError && err == nil {
@@ -240,13 +240,9 @@ func (f *fixture) expectUpdateFooStatusAction(foo *samplecontroller.Foo) {
 	f.actions = append(f.actions, action)
 }

-func getKey(foo *samplecontroller.Foo, t *testing.T) string {
-	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(foo)
-	if err != nil {
-		t.Errorf("Unexpected error getting key for foo %v: %v", foo.Name, err)
-		return ""
-	}
-	return key
+func getRef(foo *samplecontroller.Foo, t *testing.T) cache.ObjectName {
+	ref := cache.MetaObjectToName(foo)
+	return ref
 }

 func TestCreatesDeployment(t *testing.T) {
@@ -261,7 +257,7 @@ func TestCreatesDeployment(t *testing.T) {
 	f.expectCreateDeploymentAction(expDeployment)
 	f.expectUpdateFooStatusAction(foo)

-	f.run(ctx, getKey(foo, t))
+	f.run(ctx, getRef(foo, t))
 }

 func TestDoNothing(t *testing.T) {
@@ -277,7 +273,7 @@ func TestDoNothing(t *testing.T) {
 	f.kubeobjects = append(f.kubeobjects, d)

 	f.expectUpdateFooStatusAction(foo)
-	f.run(ctx, getKey(foo, t))
+	f.run(ctx, getRef(foo, t))
 }

 func TestUpdateDeployment(t *testing.T) {
@@ -298,7 +294,7 @@ func TestUpdateDeployment(t *testing.T) {

 	f.expectUpdateFooStatusAction(foo)
 	f.expectUpdateDeploymentAction(expDeployment)
-	f.run(ctx, getKey(foo, t))
+	f.run(ctx, getRef(foo, t))
 }

 func TestNotControlledByUs(t *testing.T) {
@@ -315,7 +311,7 @@ func TestNotControlledByUs(t *testing.T) {
 	f.deploymentLister = append(f.deploymentLister, d)
 	f.kubeobjects = append(f.kubeobjects, d)

-	f.runExpectError(ctx, getKey(foo, t))
+	f.runExpectError(ctx, getRef(foo, t))
 }

 func int32Ptr(i int32) *int32 { return &i }