From 6e60f3495c8c890de6dcb82b5b3c580c6bbad541 Mon Sep 17 00:00:00 2001
From: Mike Spreitzer
Date: Fri, 2 Feb 2024 14:30:25 -0500
Subject: [PATCH] Brush up the sample controller

Signed-off-by: Mike Spreitzer

Kubernetes-commit: 7848612de8d0f6e4ac67c180184ee4bed1ac7691
---
 controller.go      | 89 +++++++++++++++++++++-------------
 controller_test.go | 30 +++++++---------
 2 files changed, 54 insertions(+), 65 deletions(-)

diff --git a/controller.go b/controller.go
index efbd391e..189a7b59 100644
--- a/controller.go
+++ b/controller.go
@@ -61,6 +61,8 @@ const (
 	// MessageResourceSynced is the message used for an Event fired when a Foo
 	// is synced successfully
 	MessageResourceSynced = "Foo synced successfully"
+	// FieldManager distinguishes this controller from other things writing to API objects
+	FieldManager = controllerAgentName
 )

 // Controller is the controller implementation for Foo resources
@@ -80,7 +82,7 @@ type Controller struct {
 	// means we can ensure we only process a fixed amount of resources at a
 	// time, and makes it easy to ensure we are never processing the same item
 	// simultaneously in two different workers.
-	workqueue workqueue.TypedRateLimitingInterface[string]
+	workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName]
 	// recorder is an event recorder for recording Event resources to the
 	// Kubernetes API.
 	recorder record.EventRecorder
@@ -106,8 +108,8 @@ func NewController(
 	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
 	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
 	ratelimiter := workqueue.NewTypedMaxOfRateLimiter(
-		workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second),
-		&workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
+		workqueue.NewTypedItemExponentialFailureRateLimiter[cache.ObjectName](5*time.Millisecond, 1000*time.Second),
+		&workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
 	)

 	controller := &Controller{
@@ -196,64 +198,56 @@ func (c *Controller) runWorker(ctx context.Context) {
 // processNextWorkItem will read a single work item off the workqueue and
 // attempt to process it, by calling the syncHandler.
 func (c *Controller) processNextWorkItem(ctx context.Context) bool {
-	obj, shutdown := c.workqueue.Get()
+	objRef, shutdown := c.workqueue.Get()
 	logger := klog.FromContext(ctx)

 	if shutdown {
 		return false
 	}

-	// We wrap this block in a func so we can defer c.workqueue.Done.
-	err := func() error {
-		// We call Done here so the workqueue knows we have finished
-		// processing this item. We also must remember to call Forget if we
-		// do not want this work item being re-queued. For example, we do
-		// not call Forget if a transient error occurs, instead the item is
-		// put back on the workqueue and attempted again after a back-off
-		// period.
-		defer c.workqueue.Done(obj)
-		// Run the syncHandler, passing it the namespace/name string of the
-		// Foo resource to be synced.
-		if err := c.syncHandler(ctx, obj); err != nil {
-			// Put the item back on the workqueue to handle any transient errors.
-			c.workqueue.AddRateLimited(obj)
-			return fmt.Errorf("error syncing '%s': %s, requeuing", obj, err.Error())
-		}
-		// Finally, if no error occurs we Forget this item so it does not
-		// get queued again until another change happens.
-		c.workqueue.Forget(obj)
-		logger.Info("Successfully synced", "resourceName", obj)
-		return nil
-	}()
+	// We call Done at the end of this func so the workqueue knows we have
+	// finished processing this item. We also must remember to call Forget
+	// if we do not want this work item being re-queued. For example, we do
+	// not call Forget if a transient error occurs, instead the item is
+	// put back on the workqueue and attempted again after a back-off
+	// period.
+	defer c.workqueue.Done(objRef)

-	if err != nil {
-		utilruntime.HandleError(err)
+	// Run the syncHandler, passing it the structured reference to the object to be synced.
+	err := c.syncHandler(ctx, objRef)
+	if err == nil {
+		// If no error occurs then we Forget this item so it does not
+		// get queued again until another change happens.
+		c.workqueue.Forget(objRef)
+		logger.Info("Successfully synced", "objectName", objRef)
 		return true
 	}
-
+	// There was a failure, so be sure to report it. This method allows for
+	// pluggable error handling, which can be used for things like
+	// cluster monitoring.
+	utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing", "objectReference", objRef)
+	// Since we failed, we should requeue the item to work on later. This
+	// method adds a backoff to avoid hotlooping on particular items
+	// (they're probably still not going to work right away) and provides
+	// overall controller protection (if everything is broken, this
+	// controller needs to calm down or it can starve other useful work).
+	c.workqueue.AddRateLimited(objRef)
 	return true
 }

 // syncHandler compares the actual state with the desired, and attempts to
 // converge the two. It then updates the Status block of the Foo resource
 // with the current status of the resource.
-func (c *Controller) syncHandler(ctx context.Context, key string) error {
-	// Convert the namespace/name string into a distinct namespace and name
-	logger := klog.LoggerWithValues(klog.FromContext(ctx), "resourceName", key)
-
-	namespace, name, err := cache.SplitMetaNamespaceKey(key)
-	if err != nil {
-		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
-		return nil
-	}
+func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error {
+	logger := klog.LoggerWithValues(klog.FromContext(ctx), "objectRef", objectRef)

 	// Get the Foo resource with this namespace/name
-	foo, err := c.foosLister.Foos(namespace).Get(name)
+	foo, err := c.foosLister.Foos(objectRef.Namespace).Get(objectRef.Name)
 	if err != nil {
 		// The Foo resource may no longer exist, in which case we stop
 		// processing.
 		if errors.IsNotFound(err) {
-			utilruntime.HandleError(fmt.Errorf("foo '%s' in work queue no longer exists", key))
+			utilruntime.HandleError(fmt.Errorf("foo '%#v' in work queue no longer exists", objectRef))
 			return nil
 		}

@@ -265,7 +259,7 @@ func (c *Controller) syncHandler(ctx context.Context, key string) error {
 		// We choose to absorb the error here as the worker would requeue the
 		// resource otherwise. Instead, the next time the resource is updated
 		// the resource will be queued again.
-		utilruntime.HandleError(fmt.Errorf("%s: deployment name must be specified", key))
+		utilruntime.HandleError(fmt.Errorf("%#v: deployment name must be specified", objectRef))
 		return nil
 	}

@@ -273,7 +267,7 @@ func (c *Controller) syncHandler(ctx context.Context, key string) error {
 	deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
 	// If the resource doesn't exist, we'll create it
 	if errors.IsNotFound(err) {
-		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(context.TODO(), newDeployment(foo), metav1.CreateOptions{})
+		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(context.TODO(), newDeployment(foo), metav1.CreateOptions{FieldManager: FieldManager})
 	}

 	// If an error occurs during Get/Create, we'll requeue the item so we can
@@ -296,7 +290,7 @@ func (c *Controller) syncHandler(ctx context.Context, key string) error {
 	// should update the Deployment resource.
 	if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
-		logger.V(4).Info("Update deployment resource", "currentReplicas", *foo.Spec.Replicas, "desiredReplicas", *deployment.Spec.Replicas)
-		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(), newDeployment(foo), metav1.UpdateOptions{})
+		logger.V(4).Info("Update deployment resource", "currentReplicas", *deployment.Spec.Replicas, "desiredReplicas", *foo.Spec.Replicas)
+		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(), newDeployment(foo), metav1.UpdateOptions{FieldManager: FieldManager})
 	}

 	// If an error occurs during Update, we'll requeue the item so we can
@@ -327,7 +321,7 @@ func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1
 	// we must use Update instead of UpdateStatus to update the Status block of the Foo resource.
 	// UpdateStatus will not allow changes to the Spec of the resource,
 	// which is ideal for ensuring nothing other than resource status has been updated.
-	_, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).UpdateStatus(context.TODO(), fooCopy, metav1.UpdateOptions{})
+	_, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).UpdateStatus(context.TODO(), fooCopy, metav1.UpdateOptions{FieldManager: FieldManager})
 	return err
 }

@@ -335,13 +329,12 @@ func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1
-// string which is then put onto the work queue. This method should *not* be
+// reference which is then put onto the work queue. This method should *not* be
 // passed resources of any type other than Foo.
 func (c *Controller) enqueueFoo(obj interface{}) {
-	var key string
-	var err error
-	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
+	objectRef, err := cache.ObjectToName(obj)
+	if err != nil {
 		utilruntime.HandleError(err)
 		return
 	}
-	c.workqueue.Add(key)
+	c.workqueue.Add(objectRef)
 }

 // handleObject will take any resource implementing metav1.Object and attempt
diff --git a/controller_test.go b/controller_test.go
index 102094dc..191cff1a 100644
--- a/controller_test.go
+++ b/controller_test.go
@@ -108,22 +108,22 @@ func (f *fixture) newController(ctx context.Context) (*Controller, informers.Sha
 	return c, i, k8sI
 }

-func (f *fixture) run(ctx context.Context, fooName string) {
-	f.runController(ctx, fooName, true, false)
+func (f *fixture) run(ctx context.Context, fooRef cache.ObjectName) {
+	f.runController(ctx, fooRef, true, false)
 }

-func (f *fixture) runExpectError(ctx context.Context, fooName string) {
-	f.runController(ctx, fooName, true, true)
+func (f *fixture) runExpectError(ctx context.Context, fooRef cache.ObjectName) {
+	f.runController(ctx, fooRef, true, true)
 }

-func (f *fixture) runController(ctx context.Context, fooName string, startInformers bool, expectError bool) {
+func (f *fixture) runController(ctx context.Context, fooRef cache.ObjectName, startInformers bool, expectError bool) {
 	c, i, k8sI := f.newController(ctx)
 	if startInformers {
 		i.Start(ctx.Done())
 		k8sI.Start(ctx.Done())
 	}

-	err := c.syncHandler(ctx, fooName)
+	err := c.syncHandler(ctx, fooRef)
 	if !expectError && err != nil {
 		f.t.Errorf("error syncing foo: %v", err)
 	} else if expectError && err == nil {
@@ -240,13 +240,9 @@ func (f *fixture) expectUpdateFooStatusAction(foo *samplecontroller.Foo) {
 	f.actions = append(f.actions, action)
 }

-func getKey(foo *samplecontroller.Foo, t *testing.T) string {
-	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(foo)
-	if err != nil {
-		t.Errorf("Unexpected error getting key for foo %v: %v", foo.Name, err)
-		return ""
-	}
-	return key
+func getRef(foo *samplecontroller.Foo, t *testing.T) cache.ObjectName {
+	ref := cache.MetaObjectToName(foo)
+	return ref
 }

 func TestCreatesDeployment(t *testing.T) {
@@ -261,7 +257,7 @@ func TestCreatesDeployment(t *testing.T) {
 	f.expectCreateDeploymentAction(expDeployment)
 	f.expectUpdateFooStatusAction(foo)

-	f.run(ctx, getKey(foo, t))
+	f.run(ctx, getRef(foo, t))
 }

 func TestDoNothing(t *testing.T) {
@@ -277,7 +273,7 @@ func TestDoNothing(t *testing.T) {
 	f.kubeobjects = append(f.kubeobjects, d)

 	f.expectUpdateFooStatusAction(foo)
-	f.run(ctx, getKey(foo, t))
+	f.run(ctx, getRef(foo, t))
 }

 func TestUpdateDeployment(t *testing.T) {
@@ -298,7 +294,7 @@ func TestUpdateDeployment(t *testing.T) {
 	f.expectUpdateFooStatusAction(foo)
 	f.expectUpdateDeploymentAction(expDeployment)

-	f.run(ctx, getKey(foo, t))
+	f.run(ctx, getRef(foo, t))
 }

 func TestNotControlledByUs(t *testing.T) {
@@ -315,7 +311,7 @@ func TestNotControlledByUs(t *testing.T) {
 	f.deploymentLister = append(f.deploymentLister, d)
 	f.kubeobjects = append(f.kubeobjects, d)

-	f.runExpectError(ctx, getKey(foo, t))
+	f.runExpectError(ctx, getRef(foo, t))
 }

 func int32Ptr(i int32) *int32 { return &i }
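
A minimal, self-contained sketch of the key type this patch switches to, for reviewers who have not used it yet. It assumes a client-go release that ships cache.ObjectName and the generic (Typed*) workqueue used above; the Pod and the standalone queue are illustrative stand-ins, not code from this repository.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "demo"}}

	// cache.ObjectName is a struct with Namespace and Name fields; it replaces
	// the "namespace/name" string that cache.MetaNamespaceKeyFunc produced.
	ref := cache.MetaObjectToName(pod)

	// Instantiating the workqueue with the key type means consumers get
	// cache.ObjectName values back and no longer re-parse strings with
	// cache.SplitMetaNamespaceKey, as the old syncHandler did.
	queue := workqueue.NewTypedRateLimitingQueue(
		workqueue.DefaultTypedControllerRateLimiter[cache.ObjectName]())
	queue.Add(ref)

	item, shutdown := queue.Get()
	if !shutdown {
		fmt.Println(item.Namespace, item.Name) // prints: default demo
		queue.Forget(item)
		queue.Done(item)
	}
	queue.ShutDown()
}

DefaultTypedControllerRateLimiter builds the same MaxOf(per-item exponential back-off, token bucket) combination that NewController assembles by hand in this patch, only with different bucket parameters. The FieldManager option added to the Create/Update/UpdateStatus calls is independent of the queue change: it sets the manager name recorded in each object's managedFields, so writes made by this controller can be told apart from writes made by other clients.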