mirror of
https://github.com/kubernetes/sample-controller.git
synced 2025-01-31 17:02:52 +08:00
client-go: support waiting for SharedInformerFactory shutdown
SharedInformerFactory starts goroutines in Start and those can be stopped by closing the stop channel. However, there was no API that waits for the goroutines. This is a problem for unit testing. A test has to return while the informers are still running, which may get flagged by tools like https://github.com/uber-go/goleak or by klog/ktesting when those informers lead to log output. While at it, more documentation gets added to address https://github.com/kubernetes/kubernetes/issues/65036. Kubernetes-commit: e89d1d47e8365cad31600b17dab662d3c8a359dd
This commit is contained in:
parent
17031b84be
commit
17c7f29b31
@ -47,6 +47,11 @@ type sharedInformerFactory struct {
|
||||
// startedInformers is used for tracking which informers have been started.
|
||||
// This allows Start() to be called multiple times safely.
|
||||
startedInformers map[reflect.Type]bool
|
||||
// wg tracks how many goroutines were started.
|
||||
wg sync.WaitGroup
|
||||
// shuttingDown is true when Shutdown has been called. It may still be running
|
||||
// because it needs to wait for goroutines.
|
||||
shuttingDown bool
|
||||
}
|
||||
|
||||
// WithCustomResyncConfig sets a custom resync period for the specified informer types.
|
||||
@ -107,20 +112,39 @@ func NewSharedInformerFactoryWithOptions(client versioned.Interface, defaultResy
|
||||
return factory
|
||||
}
|
||||
|
||||
// Start initializes all requested informers.
|
||||
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
||||
if f.shuttingDown {
|
||||
return
|
||||
}
|
||||
|
||||
for informerType, informer := range f.informers {
|
||||
if !f.startedInformers[informerType] {
|
||||
go informer.Run(stopCh)
|
||||
f.wg.Add(1)
|
||||
// We need a new variable in each loop iteration,
|
||||
// otherwise the goroutine would use the loop variable
|
||||
// and that keeps changing.
|
||||
informer := informer
|
||||
go func() {
|
||||
defer f.wg.Done()
|
||||
informer.Run(stopCh)
|
||||
}()
|
||||
f.startedInformers[informerType] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WaitForCacheSync waits for all started informers' cache were synced.
|
||||
func (f *sharedInformerFactory) Shutdown() {
|
||||
f.lock.Lock()
|
||||
f.shuttingDown = true
|
||||
f.lock.Unlock()
|
||||
|
||||
// Will return immediately if there is nothing to wait for.
|
||||
f.wg.Wait()
|
||||
}
|
||||
|
||||
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
|
||||
informers := func() map[reflect.Type]cache.SharedIndexInformer {
|
||||
f.lock.Lock()
|
||||
@ -167,11 +191,58 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal
|
||||
|
||||
// SharedInformerFactory provides shared informers for resources in all known
|
||||
// API group versions.
|
||||
//
|
||||
// It is typically used like this:
|
||||
//
|
||||
// ctx, cancel := context.Background()
|
||||
// defer cancel()
|
||||
// factory := NewSharedInformerFactory(client, resyncPeriod)
|
||||
// defer factory.WaitForStop() // Returns immediately if nothing was started.
|
||||
// genericInformer := factory.ForResource(resource)
|
||||
// typedInformer := factory.SomeAPIGroup().V1().SomeType()
|
||||
// factory.Start(ctx.Done()) // Start processing these informers.
|
||||
// synced := factory.WaitForCacheSync(ctx.Done())
|
||||
// for v, ok := range synced {
|
||||
// if !ok {
|
||||
// fmt.Fprintf(os.Stderr, "caches failed to sync: %v", v)
|
||||
// return
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // Creating informers can also be created after Start, but then
|
||||
// // Start must be called again:
|
||||
// anotherGenericInformer := factory.ForResource(resource)
|
||||
// factory.Start(ctx.Done())
|
||||
type SharedInformerFactory interface {
|
||||
internalinterfaces.SharedInformerFactory
|
||||
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
|
||||
|
||||
// Start initializes all requested informers. They are handled in goroutines
|
||||
// which run until the stop channel gets closed.
|
||||
Start(stopCh <-chan struct{})
|
||||
|
||||
// Shutdown marks a factory as shutting down. At that point no new
|
||||
// informers can be started anymore and Start will return without
|
||||
// doing anything.
|
||||
//
|
||||
// In addition, Shutdown blocks until all goroutines have terminated. For that
|
||||
// to happen, the close channel(s) that they were started with must be closed,
|
||||
// either before Shutdown gets called or while it is waiting.
|
||||
//
|
||||
// Shutdown may be called multiple times, even concurrently. All such calls will
|
||||
// block until all goroutines have terminated.
|
||||
Shutdown()
|
||||
|
||||
// WaitForCacheSync blocks until all started informers' caches were synced
|
||||
// or the stop channel gets closed.
|
||||
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
|
||||
|
||||
// ForResource gives generic access to a shared informer of the matching type.
|
||||
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
|
||||
|
||||
// InternalInformerFor returns the SharedIndexInformer for obj using an internal
|
||||
// client.
|
||||
InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer
|
||||
|
||||
Samplecontroller() samplecontroller.Interface
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user