Merge pull request #135395 from pohly/apimachinery-wait-for-cache-sync

apimachinery + client-go + device taint eviction unit test: context-aware Start/WaitFor, waiting through channels

Kubernetes-commit: eb09a3c23e3c3905c89e996fcec2c02ba8c4bb0e
This commit is contained in:
Kubernetes Publisher
2026-02-11 06:11:59 +05:30
3 changed files with 80 additions and 29 deletions
+3 -3
View File
@@ -8,10 +8,10 @@ godebug default=go1.25
require (
golang.org/x/time v0.14.0
k8s.io/api v0.0.0-20260210200506-f5f249b4abf6
k8s.io/api v0.0.0-20260210234759-a7e57c53516a
k8s.io/apimachinery v0.0.0-20260210195147-ac0aa3101e5c
k8s.io/client-go v0.0.0-20260210202119-8fcd3c218f43
k8s.io/code-generator v0.0.0-20260210204710-e6283fe0aaba
k8s.io/client-go v0.0.0-20260211085141-82ed9ba53ec3
k8s.io/code-generator v0.0.0-20260211085639-b08e4a5cb980
k8s.io/klog/v2 v2.130.1
k8s.io/kube-openapi v0.0.0-20260127142750-a19766b6e2d4
k8s.io/utils v0.0.0-20260108192941-914a6e750570
+6 -6
View File
@@ -115,14 +115,14 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
k8s.io/api v0.0.0-20260210200506-f5f249b4abf6 h1:vrS1xkpCN3NEXqTSfNyG20R/YHZfofsyJ6CvQqh2JwA=
k8s.io/api v0.0.0-20260210200506-f5f249b4abf6/go.mod h1:B0NIq8pBNVioUSHt/kOyu7VRGgymqYRM18NNuzch9pA=
k8s.io/api v0.0.0-20260210234759-a7e57c53516a h1:i6v7tCHuRarRfUqNl/XftxKFqDQBUvwJ8PaipCLPXVM=
k8s.io/api v0.0.0-20260210234759-a7e57c53516a/go.mod h1:B0NIq8pBNVioUSHt/kOyu7VRGgymqYRM18NNuzch9pA=
k8s.io/apimachinery v0.0.0-20260210195147-ac0aa3101e5c h1:IV922sFFodkSQRrni7sw7qoqNjDCY7bbHn/xW0tPc6k=
k8s.io/apimachinery v0.0.0-20260210195147-ac0aa3101e5c/go.mod h1:MOYgM5v441TyeB214ckvgCU9rYGX/Mw6Z9ZXIp/F4sU=
k8s.io/client-go v0.0.0-20260210202119-8fcd3c218f43 h1:UxKImWNaX2egRmvUMePqs6cRrKty65O0twPt3QuhRmg=
k8s.io/client-go v0.0.0-20260210202119-8fcd3c218f43/go.mod h1:XHtcerQ6f1y0wX4sO56YEotGD2aPMPif6BvTPPq3FFM=
k8s.io/code-generator v0.0.0-20260210204710-e6283fe0aaba h1:epBegFu4A2ZcikWMU4poGWPLnwOTC/JEpgXxGlRmR4s=
k8s.io/code-generator v0.0.0-20260210204710-e6283fe0aaba/go.mod h1:Fdiz+gDk+6tPjJTPT/j95qPFdtDmpeFo81IzOlvwSfA=
k8s.io/client-go v0.0.0-20260211085141-82ed9ba53ec3 h1:e28KUIYgkYQfzXlM1D6Z2LwQC1Stw8al5dDpckJgFLo=
k8s.io/client-go v0.0.0-20260211085141-82ed9ba53ec3/go.mod h1:f2xHunFU2cSpYz5N+8FYnme8PpwosdY8DVtozbPPsLY=
k8s.io/code-generator v0.0.0-20260211085639-b08e4a5cb980 h1:9UFDpuK/bnEzzmQeOy30vznhWbixuBwmMISvAUzt1V0=
k8s.io/code-generator v0.0.0-20260211085639-b08e4a5cb980/go.mod h1:Fdiz+gDk+6tPjJTPT/j95qPFdtDmpeFo81IzOlvwSfA=
k8s.io/gengo/v2 v2.0.0-20250922181213-ec3ebc5fd46b h1:gMplByicHV/TJBizHd9aVEsTYoJBnnUAT5MHlTkbjhQ=
k8s.io/gengo/v2 v2.0.0-20250922181213-ec3ebc5fd46b/go.mod h1:CgujABENc3KuTrcsdpGmrrASjtQsWCT7R99mEV4U/fM=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
@@ -19,6 +19,7 @@ limitations under the License.
package externalversions
import (
context "context"
reflect "reflect"
sync "sync"
time "time"
@@ -26,6 +27,7 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
schema "k8s.io/apimachinery/pkg/runtime/schema"
wait "k8s.io/apimachinery/pkg/util/wait"
cache "k8s.io/client-go/tools/cache"
versioned "k8s.io/sample-controller/pkg/generated/clientset/versioned"
internalinterfaces "k8s.io/sample-controller/pkg/generated/informers/externalversions/internalinterfaces"
@@ -139,6 +141,10 @@ func NewSharedInformerFactoryWithOptions(client versioned.Interface, defaultResy
}
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.StartWithContext(wait.ContextForChannel(stopCh))
}
func (f *sharedInformerFactory) StartWithContext(ctx context.Context) {
f.lock.Lock()
defer f.lock.Unlock()
@@ -148,15 +154,9 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
f.wg.Add(1)
// We need a new variable in each loop iteration,
// otherwise the goroutine would use the loop variable
// and that keeps changing.
informer := informer
go func() {
defer f.wg.Done()
informer.Run(stopCh)
}()
f.wg.Go(func() {
informer.RunWithContext(ctx)
})
f.startedInformers[informerType] = true
}
}
@@ -173,6 +173,11 @@ func (f *sharedInformerFactory) Shutdown() {
}
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
result := f.WaitForCacheSyncWithContext(wait.ContextForChannel(stopCh))
return result.Synced
}
func (f *sharedInformerFactory) WaitForCacheSyncWithContext(ctx context.Context) cache.SyncResult {
informers := func() map[reflect.Type]cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
@@ -186,10 +191,31 @@ func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[ref
return informers
}()
res := map[reflect.Type]bool{}
for informType, informer := range informers {
res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
// Wait for informers to sync, without polling.
cacheSyncs := make([]cache.DoneChecker, 0, len(informers))
for _, informer := range informers {
cacheSyncs = append(cacheSyncs, informer.HasSyncedChecker())
}
cache.WaitFor(ctx, "" /* no logging */, cacheSyncs...)
res := cache.SyncResult{
Synced: make(map[reflect.Type]bool, len(informers)),
}
failed := false
for informType, informer := range informers {
hasSynced := informer.HasSynced()
if !hasSynced {
failed = true
}
res.Synced[informType] = hasSynced
}
if failed {
// context.Cause is more informative than ctx.Err().
// This must be non-nil, otherwise WaitFor wouldn't have stopped
// prematurely.
res.Err = context.Cause(ctx)
}
return res
}
@@ -228,27 +254,46 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal
// defer factory.WaitForStop() // Returns immediately if nothing was started.
// genericInformer := factory.ForResource(resource)
// typedInformer := factory.SomeAPIGroup().V1().SomeType()
// factory.Start(ctx.Done()) // Start processing these informers.
// synced := factory.WaitForCacheSync(ctx.Done())
// for v, ok := range synced {
// if !ok {
// fmt.Fprintf(os.Stderr, "caches failed to sync: %v", v)
// return
// }
// handle, err := typeInformer.Informer().AddEventHandler(...)
// if err != nil {
// return fmt.Errorf("register event handler: %v", err)
// }
// defer typeInformer.Informer().RemoveEventHandler(handle) // Avoids leaking goroutines.
// factory.StartWithContext(ctx) // Start processing these informers.
// synced := factory.WaitForCacheSyncWithContext(ctx)
// if err := synced.AsError(); err != nil {
// return err
// }
// for v := range synced.Synced {
// // If desired, log some information similar to this.
// fmt.Fprintf(os.Stdout, "cache synced: %s", v)
// }
//
// // Also make sure that all of the initial cache events have been delivered.
// if !WaitFor(ctx, "event handler sync", handle.HasSyncedChecker()) {
// // Must have failed because of context.
// return fmt.Errorf("sync event handler: %w", context.Cause(ctx))
// }
//
// // Informers can also be created after Start, but then
// // Start must be called again:
// anotherGenericInformer := factory.ForResource(resource)
// factory.Start(ctx.Done())
// factory.StartWithContext(ctx)
type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
// Start initializes all requested informers. They are handled in goroutines
// which run until the stop channel gets closed.
// Warning: Start does not block. When run in a go-routine, it will race with a later WaitForCacheSync.
//
// Contextual logging: StartWithContext should be used instead of Start in code which supports contextual logging.
Start(stopCh <-chan struct{})
// StartWithContext initializes all requested informers. They are handled in goroutines
// which run until the context gets canceled.
// Warning: StartWithContext does not block. When run in a go-routine, it will race with a later WaitForCacheSync.
StartWithContext(ctx context.Context)
// Shutdown marks a factory as shutting down. At that point no new
// informers can be started anymore and Start will return without
// doing anything.
@@ -263,8 +308,14 @@ type SharedInformerFactory interface {
// WaitForCacheSync blocks until all started informers' caches were synced
// or the stop channel gets closed.
//
// Contextual logging: WaitForCacheSyncWithContext should be used instead of WaitForCacheSync in code which supports contextual logging. It also returns a more useful result.
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
// WaitForCacheSyncWithContext blocks until all started informers' caches were synced
// or the context gets canceled.
WaitForCacheSyncWithContext(ctx context.Context) cache.SyncResult
// ForResource gives generic access to a shared informer of the matching type.
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)