Use the generic/typed workqueue throughout

This change makes us use the generic workqueue throughout the project in
order to improve type safety and readability of the code.

Kubernetes-commit: 6d0ac8c561a7ac66c21e4ee7bd1976c2ecedbf32
This commit is contained in:
Alvaro Aleman 2024-04-28 18:26:18 +02:00 committed by Kubernetes Publisher
parent 77cf3aaeee
commit 3272c300d3

View File

@ -80,7 +80,7 @@ type Controller struct {
// means we can ensure we only process a fixed amount of resources at a // means we can ensure we only process a fixed amount of resources at a
// time, and makes it easy to ensure we are never processing the same item // time, and makes it easy to ensure we are never processing the same item
// simultaneously in two different workers. // simultaneously in two different workers.
workqueue workqueue.RateLimitingInterface workqueue workqueue.TypedRateLimitingInterface[string]
// recorder is an event recorder for recording Event resources to the // recorder is an event recorder for recording Event resources to the
// Kubernetes API. // Kubernetes API.
recorder record.EventRecorder recorder record.EventRecorder
@ -105,9 +105,9 @@ func NewController(
eventBroadcaster.StartStructuredLogging(0) eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
ratelimiter := workqueue.NewMaxOfRateLimiter( ratelimiter := workqueue.NewTypedMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second), workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(50), 300)}, &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
) )
controller := &Controller{ controller := &Controller{
@ -117,7 +117,7 @@ func NewController(
deploymentsSynced: deploymentInformer.Informer().HasSynced, deploymentsSynced: deploymentInformer.Informer().HasSynced,
foosLister: fooInformer.Lister(), foosLister: fooInformer.Lister(),
foosSynced: fooInformer.Informer().HasSynced, foosSynced: fooInformer.Informer().HasSynced,
workqueue: workqueue.NewRateLimitingQueue(ratelimiter), workqueue: workqueue.NewTypedRateLimitingQueue(ratelimiter),
recorder: recorder, recorder: recorder,
} }
@ -204,29 +204,14 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool {
} }
// We wrap this block in a func so we can defer c.workqueue.Done. // We wrap this block in a func so we can defer c.workqueue.Done.
err := func(obj interface{}) error { err := func(key string) error {
// We call Done here so the workqueue knows we have finished // We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we // processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do // do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is // not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off // put back on the workqueue and attempted again after a back-off
// period. // period.
defer c.workqueue.Done(obj) defer c.workqueue.Done(key)
var key string
var ok bool
// We expect strings to come off the workqueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workqueue means the items in the informer cache may actually be
// more up to date that when the item was initially put onto the
// workqueue.
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.workqueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
// Run the syncHandler, passing it the namespace/name string of the // Run the syncHandler, passing it the namespace/name string of the
// Foo resource to be synced. // Foo resource to be synced.
if err := c.syncHandler(ctx, key); err != nil { if err := c.syncHandler(ctx, key); err != nil {