From 3272c300d37b73910a2e2779dbe86a084e7b65ea Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Sun, 28 Apr 2024 18:26:18 +0200 Subject: [PATCH] Use the generic/typed workqueue throughout This change makes us use the generic workqueue throughout the project in order to improve type safety and readability of the code. Kubernetes-commit: 6d0ac8c561a7ac66c21e4ee7bd1976c2ecedbf32 --- controller.go | 29 +++++++---------------------- 1 file changed, 7 insertions(+), 22 deletions(-) diff --git a/controller.go b/controller.go index 3c7936ae..f2e66c30 100644 --- a/controller.go +++ b/controller.go @@ -80,7 +80,7 @@ type Controller struct { // means we can ensure we only process a fixed amount of resources at a // time, and makes it easy to ensure we are never processing the same item // simultaneously in two different workers. - workqueue workqueue.RateLimitingInterface + workqueue workqueue.TypedRateLimitingInterface[string] // recorder is an event recorder for recording Event resources to the // Kubernetes API. recorder record.EventRecorder @@ -105,9 +105,9 @@ func NewController( eventBroadcaster.StartStructuredLogging(0) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) - ratelimiter := workqueue.NewMaxOfRateLimiter( - workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second), - &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(50), 300)}, + ratelimiter := workqueue.NewTypedMaxOfRateLimiter( + workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second), + &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(50), 300)}, ) controller := &Controller{ @@ -117,7 +117,7 @@ func NewController( deploymentsSynced: deploymentInformer.Informer().HasSynced, foosLister: fooInformer.Lister(), foosSynced: fooInformer.Informer().HasSynced, - workqueue: workqueue.NewRateLimitingQueue(ratelimiter), + workqueue: workqueue.NewTypedRateLimitingQueue(ratelimiter), recorder: recorder, } @@ -204,29 +204,14 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool { } // We wrap this block in a func so we can defer c.workqueue.Done. - err := func(obj interface{}) error { + err := func(key string) error { // We call Done here so the workqueue knows we have finished // processing this item. We also must remember to call Forget if we // do not want this work item being re-queued. For example, we do // not call Forget if a transient error occurs, instead the item is // put back on the workqueue and attempted again after a back-off // period. - defer c.workqueue.Done(obj) - var key string - var ok bool - // We expect strings to come off the workqueue. These are of the - // form namespace/name. We do this as the delayed nature of the - // workqueue means the items in the informer cache may actually be - // more up to date that when the item was initially put onto the - // workqueue. - if key, ok = obj.(string); !ok { - // As the item in the workqueue is actually invalid, we call - // Forget here else we'd go into a loop of attempting to - // process a work item that is invalid. - c.workqueue.Forget(obj) - utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) - return nil - } + defer c.workqueue.Done(key) // Run the syncHandler, passing it the namespace/name string of the // Foo resource to be synced. if err := c.syncHandler(ctx, key); err != nil {