client-go informers: context-aware Start + WaitForCacheSync

Passing a context to StartWithContext enables context-aware reflector
logging. This is the main remaining source of log spam (output to stderr
instead of per-test logger) in controller unit tests.

WaitForCacheSynceWithContext takes advantage of the new cache.WaitFor +
NamedHasSynced functionality to finish "immediately" (= no virtual time
passed) in a synctest bubble. While at it, the return type gets improved so
that a failure is easier to handle.

Kubernetes-commit: 5ff323de791df88880f6e065f5de4b445e5c90ed
This commit is contained in:
Patrick Ohly
2025-11-21 16:23:44 +01:00
committed by Kubernetes Publisher
parent b5e36fedcd
commit 6c1a55459d

View File

@@ -19,6 +19,7 @@ limitations under the License.
package externalversions
import (
context "context"
reflect "reflect"
sync "sync"
time "time"
@@ -26,6 +27,7 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
schema "k8s.io/apimachinery/pkg/runtime/schema"
wait "k8s.io/apimachinery/pkg/util/wait"
cache "k8s.io/client-go/tools/cache"
versioned "k8s.io/sample-controller/pkg/generated/clientset/versioned"
internalinterfaces "k8s.io/sample-controller/pkg/generated/informers/externalversions/internalinterfaces"
@@ -139,6 +141,10 @@ func NewSharedInformerFactoryWithOptions(client versioned.Interface, defaultResy
}
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.StartWithContext(wait.ContextForChannel(stopCh))
}
func (f *sharedInformerFactory) StartWithContext(ctx context.Context) {
f.lock.Lock()
defer f.lock.Unlock()
@@ -148,15 +154,9 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
f.wg.Add(1)
// We need a new variable in each loop iteration,
// otherwise the goroutine would use the loop variable
// and that keeps changing.
informer := informer
go func() {
defer f.wg.Done()
informer.Run(stopCh)
}()
f.wg.Go(func() {
informer.RunWithContext(ctx)
})
f.startedInformers[informerType] = true
}
}
@@ -173,6 +173,11 @@ func (f *sharedInformerFactory) Shutdown() {
}
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
result := f.WaitForCacheSyncWithContext(wait.ContextForChannel(stopCh))
return result.Synced
}
func (f *sharedInformerFactory) WaitForCacheSyncWithContext(ctx context.Context) cache.SyncResult {
informers := func() map[reflect.Type]cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
@@ -186,10 +191,31 @@ func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[ref
return informers
}()
res := map[reflect.Type]bool{}
for informType, informer := range informers {
res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
// Wait for informers to sync, without polling.
cacheSyncs := make([]cache.DoneChecker, 0, len(informers))
for _, informer := range informers {
cacheSyncs = append(cacheSyncs, informer.HasSyncedChecker())
}
cache.WaitFor(ctx, "" /* no logging */, cacheSyncs...)
res := cache.SyncResult{
Synced: make(map[reflect.Type]bool, len(informers)),
}
failed := false
for informType, informer := range informers {
hasSynced := informer.HasSynced()
if !hasSynced {
failed = true
}
res.Synced[informType] = hasSynced
}
if failed {
// context.Cause is more informative than ctx.Err().
// This must be non-nil, otherwise WaitFor wouldn't have stopped
// prematurely.
res.Err = context.Cause(ctx)
}
return res
}
@@ -228,27 +254,46 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal
// defer factory.WaitForStop() // Returns immediately if nothing was started.
// genericInformer := factory.ForResource(resource)
// typedInformer := factory.SomeAPIGroup().V1().SomeType()
// factory.Start(ctx.Done()) // Start processing these informers.
// synced := factory.WaitForCacheSync(ctx.Done())
// for v, ok := range synced {
// if !ok {
// fmt.Fprintf(os.Stderr, "caches failed to sync: %v", v)
// return
// }
// handle, err := typeInformer.Informer().AddEventHandler(...)
// if err != nil {
// return fmt.Errorf("register event handler: %v", err)
// }
// defer typeInformer.Informer().RemoveEventHandler(handle) // Avoids leaking goroutines.
// factory.StartWithContext(ctx) // Start processing these informers.
// synced := factory.WaitForCacheSyncWithContext(ctx)
// if err := synced.AsError(); err != nil {
// return err
// }
// for v := range synced {
// // Only if desired log some information similar to this.
// fmt.Fprintf(os.Stdout, "cache synced: %s", v)
// }
//
// // Also make sure that all of the initial cache events have been delivered.
// if !WaitFor(ctx, "event handler sync", handle.HasSyncedChecker()) {
// // Must have failed because of context.
// return fmt.Errorf("sync event handler: %w", context.Cause(ctx))
// }
//
// // Creating informers can also be created after Start, but then
// // Start must be called again:
// anotherGenericInformer := factory.ForResource(resource)
// factory.Start(ctx.Done())
// factory.StartWithContext(ctx)
type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
// Start initializes all requested informers. They are handled in goroutines
// which run until the stop channel gets closed.
// Warning: Start does not block. When run in a go-routine, it will race with a later WaitForCacheSync.
//
// Contextual logging: StartWithContext should be used instead of Start in code which supports contextual logging.
Start(stopCh <-chan struct{})
// StartWithContext initializes all requested informers. They are handled in goroutines
// which run until the context gets canceled.
// Warning: StartWithContext does not block. When run in a go-routine, it will race with a later WaitForCacheSync.
StartWithContext(ctx context.Context)
// Shutdown marks a factory as shutting down. At that point no new
// informers can be started anymore and Start will return without
// doing anything.
@@ -263,8 +308,14 @@ type SharedInformerFactory interface {
// WaitForCacheSync blocks until all started informers' caches were synced
// or the stop channel gets closed.
//
// Contextual logging: WaitForCacheSync should be used instead of WaitForCacheSync in code which supports contextual logging. It also returns a more useful result.
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
// WaitForCacheSyncWithContext blocks until all started informers' caches were synced
// or the context gets canceled.
WaitForCacheSyncWithContext(ctx context.Context) cache.SyncResult
// ForResource gives generic access to a shared informer of the matching type.
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)