diff --git a/controller.go b/controller.go index efbd391e..75261b74 100644 --- a/controller.go +++ b/controller.go @@ -61,6 +61,8 @@ const ( // MessageResourceSynced is the message used for an Event fired when a Foo // is synced successfully MessageResourceSynced = "Foo synced successfully" + // FieldManager distinguishes this controller from other things writing to API objects + FieldManager = controllerAgentName ) // Controller is the controller implementation for Foo resources @@ -80,7 +82,7 @@ type Controller struct { // means we can ensure we only process a fixed amount of resources at a // time, and makes it easy to ensure we are never processing the same item // simultaneously in two different workers. - workqueue workqueue.TypedRateLimitingInterface[string] + workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName] // recorder is an event recorder for recording Event resources to the // Kubernetes API. recorder record.EventRecorder @@ -106,8 +108,8 @@ func NewController( eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) ratelimiter := workqueue.NewTypedMaxOfRateLimiter( - workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second), - &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(50), 300)}, + workqueue.NewTypedItemExponentialFailureRateLimiter[cache.ObjectName](5*time.Millisecond, 1000*time.Second), + &workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)}, ) controller := &Controller{ @@ -196,64 +198,56 @@ func (c *Controller) runWorker(ctx context.Context) { // processNextWorkItem will read a single work item off the workqueue and // attempt to process it, by calling the syncHandler. func (c *Controller) processNextWorkItem(ctx context.Context) bool { - obj, shutdown := c.workqueue.Get() + objRef, shutdown := c.workqueue.Get() logger := klog.FromContext(ctx) if shutdown { return false } - // We wrap this block in a func so we can defer c.workqueue.Done. - err := func() error { - // We call Done here so the workqueue knows we have finished - // processing this item. We also must remember to call Forget if we - // do not want this work item being re-queued. For example, we do - // not call Forget if a transient error occurs, instead the item is - // put back on the workqueue and attempted again after a back-off - // period. - defer c.workqueue.Done(obj) - // Run the syncHandler, passing it the namespace/name string of the - // Foo resource to be synced. - if err := c.syncHandler(ctx, obj); err != nil { - // Put the item back on the workqueue to handle any transient errors. - c.workqueue.AddRateLimited(obj) - return fmt.Errorf("error syncing '%s': %s, requeuing", obj, err.Error()) - } - // Finally, if no error occurs we Forget this item so it does not - // get queued again until another change happens. - c.workqueue.Forget(obj) - logger.Info("Successfully synced", "resourceName", obj) - return nil - }() + // We call Done at the end of this func so the workqueue knows we have + // finished processing this item. We also must remember to call Forget + // if we do not want this work item being re-queued. For example, we do + // not call Forget if a transient error occurs, instead the item is + // put back on the workqueue and attempted again after a back-off + // period. + defer c.workqueue.Done(objRef) - if err != nil { - utilruntime.HandleError(err) + // Run the syncHandler, passing it the structured reference to the object to be synced. + err := c.syncHandler(ctx, objRef) + if err == nil { + // If no error occurs then we Forget this item so it does not + // get queued again until another change happens. + c.workqueue.Forget(objRef) + logger.Info("Successfully synced", "objectName", objRef) return true } - + // there was a failure so be sure to report it. This method allows for + // pluggable error handling which can be used for things like + // cluster-monitoring. + utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", objRef) + // since we failed, we should requeue the item to work on later. This + // method will add a backoff to avoid hotlooping on particular items + // (they're probably still not going to work right away) and overall + // controller protection (everything I've done is broken, this controller + // needs to calm down or it can starve other useful work) cases. + c.workqueue.AddRateLimited(objRef) return true } // syncHandler compares the actual state with the desired, and attempts to // converge the two. It then updates the Status block of the Foo resource // with the current status of the resource. -func (c *Controller) syncHandler(ctx context.Context, key string) error { - // Convert the namespace/name string into a distinct namespace and name - logger := klog.LoggerWithValues(klog.FromContext(ctx), "resourceName", key) - - namespace, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key)) - return nil - } +func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error { + logger := klog.LoggerWithValues(klog.FromContext(ctx), "objectRef", objectRef) // Get the Foo resource with this namespace/name - foo, err := c.foosLister.Foos(namespace).Get(name) + foo, err := c.foosLister.Foos(objectRef.Namespace).Get(objectRef.Name) if err != nil { // The Foo resource may no longer exist, in which case we stop // processing. if errors.IsNotFound(err) { - utilruntime.HandleError(fmt.Errorf("foo '%s' in work queue no longer exists", key)) + utilruntime.HandleErrorWithContext(ctx, err, "Foo referenced by item in work queue no longer exists", "objectReference", objectRef) return nil } @@ -265,7 +259,7 @@ func (c *Controller) syncHandler(ctx context.Context, key string) error { // We choose to absorb the error here as the worker would requeue the // resource otherwise. Instead, the next time the resource is updated // the resource will be queued again. - utilruntime.HandleError(fmt.Errorf("%s: deployment name must be specified", key)) + utilruntime.HandleErrorWithContext(ctx, nil, "Deployment name missing from object reference", "objectReference", objectRef) return nil } @@ -273,7 +267,7 @@ func (c *Controller) syncHandler(ctx context.Context, key string) error { deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName) // If the resource doesn't exist, we'll create it if errors.IsNotFound(err) { - deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(context.TODO(), newDeployment(foo), metav1.CreateOptions{}) + deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(context.TODO(), newDeployment(foo), metav1.CreateOptions{FieldManager: FieldManager}) } // If an error occurs during Get/Create, we'll requeue the item so we can @@ -296,7 +290,7 @@ func (c *Controller) syncHandler(ctx context.Context, key string) error { // should update the Deployment resource. if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas { logger.V(4).Info("Update deployment resource", "currentReplicas", *foo.Spec.Replicas, "desiredReplicas", *deployment.Spec.Replicas) - deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(), newDeployment(foo), metav1.UpdateOptions{}) + deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(), newDeployment(foo), metav1.UpdateOptions{FieldManager: FieldManager}) } // If an error occurs during Update, we'll requeue the item so we can @@ -327,7 +321,7 @@ func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1 // we must use Update instead of UpdateStatus to update the Status block of the Foo resource. // UpdateStatus will not allow changes to the Spec of the resource, // which is ideal for ensuring nothing other than resource status has been updated. - _, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).UpdateStatus(context.TODO(), fooCopy, metav1.UpdateOptions{}) + _, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).UpdateStatus(context.TODO(), fooCopy, metav1.UpdateOptions{FieldManager: FieldManager}) return err } @@ -335,13 +329,12 @@ func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1 // string which is then put onto the work queue. This method should *not* be // passed resources of any type other than Foo. func (c *Controller) enqueueFoo(obj interface{}) { - var key string - var err error - if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + if objectRef, err := cache.ObjectToName(obj); err != nil { utilruntime.HandleError(err) return + } else { + c.workqueue.Add(objectRef) } - c.workqueue.Add(key) } // handleObject will take any resource implementing metav1.Object and attempt @@ -356,12 +349,16 @@ func (c *Controller) handleObject(obj interface{}) { if object, ok = obj.(metav1.Object); !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type")) + // If the object value is not too big and does not contain sensitive information then + // it may be useful to include it. + utilruntime.HandleErrorWithContext(context.Background(), nil, "Error decoding object, invalid type", "type", fmt.Sprintf("%T", obj)) return } object, ok = tombstone.Obj.(metav1.Object) if !ok { - utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type")) + // If the object value is not too big and does not contain sensitive information then + // it may be useful to include it. + utilruntime.HandleErrorWithContext(context.Background(), nil, "Error decoding object tombstone, invalid type", "type", fmt.Sprintf("%T", tombstone.Obj)) return } logger.V(4).Info("Recovered deleted object", "resourceName", object.GetName()) diff --git a/controller_test.go b/controller_test.go index 102094dc..191cff1a 100644 --- a/controller_test.go +++ b/controller_test.go @@ -108,22 +108,22 @@ func (f *fixture) newController(ctx context.Context) (*Controller, informers.Sha return c, i, k8sI } -func (f *fixture) run(ctx context.Context, fooName string) { - f.runController(ctx, fooName, true, false) +func (f *fixture) run(ctx context.Context, fooRef cache.ObjectName) { + f.runController(ctx, fooRef, true, false) } -func (f *fixture) runExpectError(ctx context.Context, fooName string) { - f.runController(ctx, fooName, true, true) +func (f *fixture) runExpectError(ctx context.Context, fooRef cache.ObjectName) { + f.runController(ctx, fooRef, true, true) } -func (f *fixture) runController(ctx context.Context, fooName string, startInformers bool, expectError bool) { +func (f *fixture) runController(ctx context.Context, fooRef cache.ObjectName, startInformers bool, expectError bool) { c, i, k8sI := f.newController(ctx) if startInformers { i.Start(ctx.Done()) k8sI.Start(ctx.Done()) } - err := c.syncHandler(ctx, fooName) + err := c.syncHandler(ctx, fooRef) if !expectError && err != nil { f.t.Errorf("error syncing foo: %v", err) } else if expectError && err == nil { @@ -240,13 +240,9 @@ func (f *fixture) expectUpdateFooStatusAction(foo *samplecontroller.Foo) { f.actions = append(f.actions, action) } -func getKey(foo *samplecontroller.Foo, t *testing.T) string { - key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(foo) - if err != nil { - t.Errorf("Unexpected error getting key for foo %v: %v", foo.Name, err) - return "" - } - return key +func getRef(foo *samplecontroller.Foo, t *testing.T) cache.ObjectName { + ref := cache.MetaObjectToName(foo) + return ref } func TestCreatesDeployment(t *testing.T) { @@ -261,7 +257,7 @@ func TestCreatesDeployment(t *testing.T) { f.expectCreateDeploymentAction(expDeployment) f.expectUpdateFooStatusAction(foo) - f.run(ctx, getKey(foo, t)) + f.run(ctx, getRef(foo, t)) } func TestDoNothing(t *testing.T) { @@ -277,7 +273,7 @@ func TestDoNothing(t *testing.T) { f.kubeobjects = append(f.kubeobjects, d) f.expectUpdateFooStatusAction(foo) - f.run(ctx, getKey(foo, t)) + f.run(ctx, getRef(foo, t)) } func TestUpdateDeployment(t *testing.T) { @@ -298,7 +294,7 @@ func TestUpdateDeployment(t *testing.T) { f.expectUpdateFooStatusAction(foo) f.expectUpdateDeploymentAction(expDeployment) - f.run(ctx, getKey(foo, t)) + f.run(ctx, getRef(foo, t)) } func TestNotControlledByUs(t *testing.T) { @@ -315,7 +311,7 @@ func TestNotControlledByUs(t *testing.T) { f.deploymentLister = append(f.deploymentLister, d) f.kubeobjects = append(f.kubeobjects, d) - f.runExpectError(ctx, getKey(foo, t)) + f.runExpectError(ctx, getRef(foo, t)) } func int32Ptr(i int32) *int32 { return &i } diff --git a/go.mod b/go.mod index 4f59c04c..66404069 100644 --- a/go.mod +++ b/go.mod @@ -6,10 +6,10 @@ go 1.22.0 require ( golang.org/x/time v0.3.0 - k8s.io/api v0.0.0-20240628062210-70c01741beda - k8s.io/apimachinery v0.0.0-20240628061934-adf72dd6c5c2 - k8s.io/client-go v0.0.0-20240628062603-ae071bc75ff9 - k8s.io/code-generator v0.0.0-20240628063342-ab86cd677d29 + k8s.io/api v0.0.0-20240630182222-e7b4471d3970 + k8s.io/apimachinery v0.0.0-20240706120253-f813d2809226 + k8s.io/client-go v0.0.0-20240630182625-ab86e03da476 + k8s.io/code-generator v0.0.0-20240629022749-633962a2fc25 k8s.io/klog/v2 v2.130.1 ) diff --git a/go.sum b/go.sum index a1a1fd8b..030b820b 100644 --- a/go.sum +++ b/go.sum @@ -146,14 +146,14 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.0.0-20240628062210-70c01741beda h1:wQ8ASuAScWceD6yT8709GBxfyv/qevE66b3Rsi7ZJJg= -k8s.io/api v0.0.0-20240628062210-70c01741beda/go.mod h1:Nlysps0zLruvKc0cfR9dtXQd95f5Y2XOtNtA52gutiw= -k8s.io/apimachinery v0.0.0-20240628061934-adf72dd6c5c2 h1:Y2rD1bgpNFe3HJgKqSMzkHcOlafP3jcQLfLqADLHu3g= -k8s.io/apimachinery v0.0.0-20240628061934-adf72dd6c5c2/go.mod h1:HaB7jl7MnnH0C8g+t13Fw226p3U88ZDog/Dt8pQRZUI= -k8s.io/client-go v0.0.0-20240628062603-ae071bc75ff9 h1:O0BS42GzkKUruh8bq75c6mrkg5Ee7rsazpZdwsNToSU= -k8s.io/client-go v0.0.0-20240628062603-ae071bc75ff9/go.mod h1:xdnfcLQaxsfDggWlUrix2Dps0Z9BFoIQyjtSLVk3n/s= -k8s.io/code-generator v0.0.0-20240628063342-ab86cd677d29 h1:a06GYY32cjnLpFlUP/uxHAHhwqCEEe5wAqUV1fmyeJ4= -k8s.io/code-generator v0.0.0-20240628063342-ab86cd677d29/go.mod h1:JOwLjDkOXIew/hfDbC+yThU169aSl8aKtLzsbx1H/AQ= +k8s.io/api v0.0.0-20240630182222-e7b4471d3970 h1:rml67zMN0uvq6wSHy0DeI6jGjOm/saaA8kIKY1kIOmY= +k8s.io/api v0.0.0-20240630182222-e7b4471d3970/go.mod h1:4FMNrXJxgv377lr0HMOJOqwqiNREPvoveZYRveA//xU= +k8s.io/apimachinery v0.0.0-20240706120253-f813d2809226 h1:pN3A5pOLKKsMIiqH2q+vgPCDY7UfhGFiQ5hlUwRM4A8= +k8s.io/apimachinery v0.0.0-20240706120253-f813d2809226/go.mod h1:HaB7jl7MnnH0C8g+t13Fw226p3U88ZDog/Dt8pQRZUI= +k8s.io/client-go v0.0.0-20240630182625-ab86e03da476 h1:bQXvOf4YaEdCr1ae9T59T1phBzjRg9RFwfPUBADbWRA= +k8s.io/client-go v0.0.0-20240630182625-ab86e03da476/go.mod h1:yb2+w7QWMmJdmBX95WUisVco4gwzroT3QTg/G8QdpAc= +k8s.io/code-generator v0.0.0-20240629022749-633962a2fc25 h1:Sa57oBAPw1r0Z/HP6UbnsQlr0cG8Y6R0Q7nfzqy8YXA= +k8s.io/code-generator v0.0.0-20240629022749-633962a2fc25/go.mod h1:lwIBj5yAwOBoRznnhJNmii0jSwM62VyQ/jYmSOgh7dM= k8s.io/gengo/v2 v2.0.0-20240228010128-51d4e06bde70 h1:NGrVE502P0s0/1hudf8zjgwki1X/TByhmAoILTarmzo= k8s.io/gengo/v2 v2.0.0-20240228010128-51d4e06bde70/go.mod h1:VH3AT8AaQOqiGjMF9p0/IM1Dj+82ZwjfxUP1IxaHE+8= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=