2017-10-19 17:32:12 +08:00
/ *
Copyright 2015 The Kubernetes Authors .
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
package cache
import (
"fmt"
"sync"
"time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/buffer"
2018-01-23 14:57:47 +08:00
"k8s.io/client-go/util/retry"
2017-10-19 17:32:12 +08:00
"github.com/golang/glog"
)
// SharedInformer has a shared data cache and is capable of distributing notifications for changes
// to the cache to multiple listeners who registered via AddEventHandler. If you use this, there is
// one behavior change compared to a standard Informer. When you receive a notification, the cache
// will be AT LEAST as fresh as the notification, but it MAY be more fresh. You should NOT depend
// on the contents of the cache exactly matching the notification you've received in handler
// functions. If there was a create, followed by a delete, the cache may NOT have your item. This
// has advantages over the broadcaster since it allows us to share a common cache across many
// controllers. Extending the broadcaster would have required us to keep duplicate caches for each
// watch.
type SharedInformer interface {
// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
// period. Events to a single handler are delivered sequentially, but there is no coordination
// between different handlers.
AddEventHandler ( handler ResourceEventHandler )
// AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
// specified resync period. Events to a single handler are delivered sequentially, but there is
// no coordination between different handlers.
AddEventHandlerWithResyncPeriod ( handler ResourceEventHandler , resyncPeriod time . Duration )
// GetStore returns the Store.
GetStore ( ) Store
// GetController gives back a synthetic interface that "votes" to start the informer.
GetController ( ) Controller
// Run starts the shared informer, which will be stopped when stopCh is closed.
Run ( stopCh <- chan struct { } )
// HasSynced returns true if the shared informer's store has synced.
HasSynced ( ) bool
// LastSyncResourceVersion is the resource version observed when last synced with the underlying
// store. The value returned is not synchronized with access to the underlying store and is not
// thread-safe.
LastSyncResourceVersion ( ) string
}
// SharedIndexInformer is a SharedInformer that additionally gives access to the Indexer backing
// its cache and allows indexers to be registered on it.
type SharedIndexInformer interface {
SharedInformer
// AddIndexers adds indexers to the informer before it starts.
AddIndexers ( indexers Indexers ) error
// GetIndexer returns the Indexer that backs the informer's cache.
GetIndexer ( ) Indexer
}
// NewSharedInformer builds a SharedInformer for the given list-watcher and object type. It is a
// thin wrapper around NewSharedIndexInformer that supplies an empty set of indexers; resyncPeriod
// is passed through as the default event handler resync period.
func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
	noIndexers := Indexers{}
	return NewSharedIndexInformer(lw, objType, resyncPeriod, noIndexers)
}
// NewSharedIndexInformer builds a SharedIndexInformer for the given list-watcher and object type.
//
// defaultEventHandlerResyncPeriod is used both as the initial resync check period and as the
// resync period for handlers registered through AddEventHandler. indexers are installed on the
// backing Indexer, which is keyed by DeletionHandlingMetaNamespaceKeyFunc. The informer and its
// processor share a single real clock.
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	rc := &clock.RealClock{}

	informer := &sharedIndexInformer{
		listerWatcher:                   lw,
		objectType:                      objType,
		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
		clock:                           rc,
	}
	informer.processor = &sharedProcessor{clock: rc}
	informer.indexer = NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)

	// The mutation detector is named after the concrete Go type of the watched object.
	detectorName := fmt.Sprintf("%T", objType)
	informer.cacheMutationDetector = NewCacheMutationDetector(detectorName)

	return informer
}
// InformerSynced is a function that can be used to determine if an informer has synced. This is
// useful for determining if caches have synced.
type InformerSynced func ( ) bool
const (
// syncedPollPeriod controls how often you look at the status of your sync funcs.
// It is the polling interval used by WaitForCacheSync.
syncedPollPeriod = 100 * time . Millisecond
// initialBufferSize is the initial number of event notifications that can be buffered.
// It is the buffer size passed to newProcessListener for every registered handler.
initialBufferSize = 1024
)
// WaitForCacheSync polls the supplied sync functions every syncedPollPeriod until all of them
// report true in the same pass. It returns true once that happens, and false if the poll ends
// with an error (logged as "stop requested"), in which case the controller should shut down.
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
	// allSynced reports whether every sync function currently returns true; it stops at the
	// first one that does not.
	allSynced := func() (bool, error) {
		for i := range cacheSyncs {
			if synced := cacheSyncs[i](); !synced {
				return false, nil
			}
		}
		return true, nil
	}

	if err := wait.PollUntil(syncedPollPeriod, allSynced, stopCh); err != nil {
		glog.V(2).Infof("stop requested")
		return false
	}

	glog.V(4).Infof("caches populated")
	return true
}
// sharedIndexInformer is the implementation of SharedIndexInformer in this file. It keeps an
// Indexer up to date from the deltas handed to HandleDeltas and fans the resulting notifications
// out to registered handlers through its sharedProcessor.
type sharedIndexInformer struct {
// indexer is the cache returned by GetStore and GetIndexer and updated by HandleDeltas.
indexer Indexer
// controller is created in Run; it is nil until then.
controller Controller
// processor distributes notifications to the registered listeners.
processor * sharedProcessor
// cacheMutationDetector is handed every added/updated object in HandleDeltas and is run
// alongside the processor in Run.
cacheMutationDetector CacheMutationDetector
// This block is tracked to handle late initialization of the controller
listerWatcher ListerWatcher
objectType runtime . Object
// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
// shouldResync to check if any of our listeners need a resync.
resyncCheckPeriod time . Duration
// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
// value).
defaultEventHandlerResyncPeriod time . Duration
// clock allows for testability
clock clock . Clock
// started is set once Run has created the controller; stopped is set when Run returns.
// Both, together with controller, are accessed under startedLock.
started , stopped bool
startedLock sync . Mutex
// blockDeltas gives a way to stop all event distribution so that a late event handler
// can safely join the shared informer.
blockDeltas sync . Mutex
}
// dummyController hides the fact that a SharedInformer is different from a dedicated one
// where a caller can `Run`. The run method is disconnected in this case, because higher
// level logic will decide when to start the SharedInformer and related controller.
// Because returning information back is always asynchronous, the legacy callers shouldn't
// notice any change in behavior.
type dummyController struct {
// informer is the shared informer whose sync state HasSynced reports.
informer * sharedIndexInformer
}
// Run is deliberately a no-op: the dummy controller never starts anything itself, and stopCh is
// ignored.
func (v *dummyController) Run(stopCh <-chan struct{}) {}
// HasSynced reports the sync state of the wrapped shared informer.
func (v *dummyController) HasSynced() bool {
	synced := v.informer.HasSynced()
	return synced
}
// LastSyncResourceVersion always returns the empty string; the dummy controller does not expose
// the wrapped informer's resource version.
func (v *dummyController) LastSyncResourceVersion() string {
	const noVersion = ""
	return noVersion
}
// updateNotification is queued for listeners when an object already present in the indexer is
// updated; it is delivered to the handler's OnUpdate.
type updateNotification struct {
// oldObj is the object as it was stored in the indexer before the update.
oldObj interface { }
// newObj is the object carried by the delta that triggered the update.
newObj interface { }
}
// addNotification is queued for listeners when an object is added to the indexer, and for each
// existing item when a handler joins an already-started informer; it is delivered to the
// handler's OnAdd.
type addNotification struct {
// newObj is the added object.
newObj interface { }
}
// deleteNotification is queued for listeners when an object is deleted from the indexer; it is
// delivered to the handler's OnDelete.
type deleteNotification struct {
// oldObj is the object carried by the delete delta.
oldObj interface { }
}
// Run builds the controller for this informer, starts the cache mutation detector and the
// processor, and then runs the controller with stopCh. When the controller's Run returns, the
// deferred calls execute in reverse order of registration: the informer is marked stopped, the
// processor's stop channel is closed, and Run waits for the processor goroutines to finish.
func ( s * sharedIndexInformer ) Run ( stopCh <- chan struct { } ) {
defer utilruntime . HandleCrash ( )
// The DeltaFIFO is given the indexer as its known-objects source.
fifo := NewDeltaFIFO ( MetaNamespaceKeyFunc , nil , s . indexer )
cfg := & Config {
Queue : fifo ,
ListerWatcher : s . listerWatcher ,
ObjectType : s . objectType ,
FullResyncPeriod : s . resyncCheckPeriod ,
RetryOnError : false ,
ShouldResync : s . processor . shouldResync ,
Process : s . HandleDeltas ,
}
// Create the controller and flip started under startedLock so that HasSynced, AddIndexers and
// AddEventHandlerWithResyncPeriod observe a consistent state.
func ( ) {
s . startedLock . Lock ( )
defer s . startedLock . Unlock ( )
s . controller = New ( cfg )
// Share the informer's clock with the controller (allows a fake clock in tests).
s . controller . ( * controller ) . clock = s . clock
s . started = true
} ( )
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make ( chan struct { } )
var wg wait . Group
defer wg . Wait ( ) // Wait for Processor to stop
defer close ( processorStopCh ) // Tell Processor to stop
wg . StartWithChannel ( processorStopCh , s . cacheMutationDetector . Run )
wg . StartWithChannel ( processorStopCh , s . processor . run )
// Registered last, so it runs first once the controller returns: no new handlers are accepted
// from that point on.
defer func ( ) {
s . startedLock . Lock ( )
defer s . startedLock . Unlock ( )
s . stopped = true // Don't want any new listeners
} ( )
s . controller . Run ( stopCh )
}
// HasSynced reports whether the underlying controller has synced. It returns false while no
// controller exists yet, i.e. before Run has created one.
func (s *sharedIndexInformer) HasSynced() bool {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if ctrl := s.controller; ctrl != nil {
		return ctrl.HasSynced()
	}
	return false
}
// LastSyncResourceVersion returns the underlying controller's last synced resource version, or
// the empty string while no controller exists yet.
func (s *sharedIndexInformer) LastSyncResourceVersion() string {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if ctrl := s.controller; ctrl != nil {
		return ctrl.LastSyncResourceVersion()
	}
	return ""
}
// GetStore returns the informer's indexer as a Store.
func (s *sharedIndexInformer) GetStore() Store {
	var store Store = s.indexer
	return store
}
// GetIndexer returns the Indexer backing the informer's cache.
func (s *sharedIndexInformer) GetIndexer() Indexer {
	idx := s.indexer
	return idx
}
// AddIndexers registers additional indexers on the backing Indexer. It is only permitted before
// the informer has started; afterwards an error is returned and nothing is added.
func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if !s.started {
		return s.indexer.AddIndexers(indexers)
	}
	return fmt.Errorf("informer has already started")
}
// GetController returns a fresh dummyController wrapping this informer.
func (s *sharedIndexInformer) GetController() Controller {
	dc := dummyController{informer: s}
	return &dc
}
// AddEventHandler registers handler using the informer's default event handler resync period.
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
	period := s.defaultEventHandlerResyncPeriod
	s.AddEventHandlerWithResyncPeriod(handler, period)
}
// determineResyncPeriod reconciles a listener's desired resync period with the informer's resync
// check period and returns the period the listener will actually use:
//   - a desired period of zero (no resync) is returned unchanged;
//   - if check is zero the informer cannot resync, so zero is returned with a warning;
//   - a desired period shorter than check is raised to check with a warning;
//   - otherwise desired is returned as is.
func determineResyncPeriod(desired, check time.Duration) time.Duration {
	switch {
	case desired == 0:
		return desired
	case check == 0:
		glog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
		return 0
	case desired < check:
		glog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
		return check
	default:
		return desired
	}
}
// minimumResyncPeriod is the smallest non-zero resync period a handler may request; smaller
// positive values are raised to it in AddEventHandlerWithResyncPeriod.
const minimumResyncPeriod = 1 * time . Second
// AddEventHandlerWithResyncPeriod registers handler with the requested resync period.
//
// If the informer has already stopped, the handler is not added and the call only logs. A
// positive resyncPeriod below minimumResyncPeriod is raised to that minimum. If the resulting
// period is shorter than the informer's resyncCheckPeriod, then either the handler's period is
// raised to resyncCheckPeriod (informer already started) or resyncCheckPeriod is lowered to match
// and every existing listener's period is recomputed (informer not started yet).
//
// When the informer is already running, the new handler is started immediately and receives a
// synthetic add notification for every item currently in the indexer; delta distribution is
// blocked while that happens.
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.stopped {
		glog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
		return
	}

	if resyncPeriod > 0 {
		// Durations are logged with %v so they read as "1s" rather than a raw nanosecond count.
		if resyncPeriod < minimumResyncPeriod {
			glog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)
			resyncPeriod = minimumResyncPeriod
		}

		if resyncPeriod < s.resyncCheckPeriod {
			if s.started {
				glog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
				resyncPeriod = s.resyncCheckPeriod
			} else {
				// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
				// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
				// accordingly
				s.resyncCheckPeriod = resyncPeriod
				s.processor.resyncCheckPeriodChanged(resyncPeriod)
			}
		}
	}

	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

	if !s.started {
		// Not running yet: the listener's goroutines are started later by the processor's run.
		s.processor.addListener(listener)
		return
	}

	// in order to safely join, we have to
	// 1. stop sending add/update/delete notifications
	// 2. do a list against the store
	// 3. send synthetic "Add" events to the new handler
	// 4. unblock
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	s.processor.addAndStartListener(listener)
	for _, item := range s.indexer.List() {
		listener.add(addNotification{newObj: item})
	}
}
// HandleDeltas applies a batch of deltas to the indexer, oldest first, and distributes a matching
// notification to the listeners for each one. obj must be a Deltas value; the type assertion is
// unchecked and panics otherwise. Distribution is serialized with late handler registration
// through blockDeltas. The first indexer error aborts processing and is returned.
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, delta := range obj.(Deltas) {
		switch delta.Type {
		case Sync, Added, Updated:
			// Only Sync deltas are restricted to the listeners that are currently resyncing.
			isSync := delta.Type == Sync
			s.cacheMutationDetector.AddObject(delta.Object)

			previous, found, getErr := s.indexer.Get(delta.Object)
			if getErr != nil || !found {
				// Unknown object (or lookup failed): treat it as an add.
				if err := s.indexer.Add(delta.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: delta.Object}, isSync)
				continue
			}

			if err := s.indexer.Update(delta.Object); err != nil {
				return err
			}
			s.processor.distribute(updateNotification{oldObj: previous, newObj: delta.Object}, isSync)

		case Deleted:
			if err := s.indexer.Delete(delta.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: delta.Object}, false)
		}
	}
	return nil
}
// sharedProcessor holds the set of processorListeners of a shared informer and fans
// notifications out to them.
type sharedProcessor struct {
// listenersLock guards listeners and syncingListeners.
listenersLock sync . RWMutex
// listeners is every registered listener.
listeners [ ] * processorListener
// syncingListeners is the set of listeners that receive sync notifications; it is rebuilt by
// shouldResync and also receives every newly added listener.
syncingListeners [ ] * processorListener
// clock supplies the current time for resync decisions.
clock clock . Clock
// wg tracks the run and pop goroutines of all started listeners.
wg wait . Group
}
// addAndStartListener registers listener and immediately starts its run and pop goroutines. It is
// used when a handler joins an informer that is already running.
func (p *sharedProcessor) addAndStartListener(listener *processorListener) {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.addListenerLocked(listener)

	p.wg.Start(listener.run)
	p.wg.Start(listener.pop)
}
// addListener registers listener without starting its goroutines; they are started later by run.
func (p *sharedProcessor) addListener(listener *processorListener) {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.addListenerLocked(listener)
}
// addListenerLocked appends listener to both the full listener list and the syncing list. The
// caller must hold listenersLock for writing.
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
	p.syncingListeners = append(p.syncingListeners, listener)
	p.listeners = append(p.listeners, listener)
}
// distribute hands obj to the listeners. When sync is true only the listeners currently marked as
// syncing receive it; otherwise every registered listener does.
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	targets := p.listeners
	if sync {
		targets = p.syncingListeners
	}
	for i := range targets {
		targets[i].add(obj)
	}
}
// run starts the run and pop goroutines of every listener registered so far, blocks until stopCh
// is closed, then closes each listener's addCh and waits for all listener goroutines to exit.
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	// Start phase: the read lock is released again before blocking on stopCh.
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for i := range p.listeners {
			l := p.listeners[i]
			p.wg.Start(l.run)
			p.wg.Start(l.pop)
		}
	}()

	<-stopCh

	// Shutdown phase: the read lock is held until every listener goroutine has finished.
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for i := range p.listeners {
		close(p.listeners[i].addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}
// shouldResync asks every listener whether its resync is due at the current time. Listeners that
// are due become the new syncingListeners set and have their next resync time advanced. It
// returns true if at least one listener is due.
func (p *sharedProcessor) shouldResync() bool {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	now := p.clock.Now()
	due := []*processorListener{}

	// Every listener must be visited, not just the first one that is due, so that all resyncing
	// listeners are prepared.
	for _, l := range p.listeners {
		if !l.shouldResync(now) {
			continue
		}
		due = append(due, l)
		l.determineNextResync(now)
	}

	p.syncingListeners = due
	return len(due) > 0
}
// resyncCheckPeriodChanged recomputes the effective resync period of every listener from its
// originally requested period and the new resyncCheckPeriod.
func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	for i := range p.listeners {
		l := p.listeners[i]
		l.setResyncPeriod(determineResyncPeriod(l.requestedResyncPeriod, resyncCheckPeriod))
	}
}
// processorListener relays notifications to a single ResourceEventHandler. Notifications enter
// through add (addCh), are buffered by pop, and are delivered to the handler by run (nextCh).
type processorListener struct {
// nextCh carries notifications from pop to run; pop closes it on exit.
nextCh chan interface { }
// addCh receives notifications from add; closing it tells pop to stop.
addCh chan interface { }
// handler receives the notifications delivered by run.
handler ResourceEventHandler
// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
// added until we OOM.
// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
// we should try to do something better.
pendingNotifications buffer . RingGrowing
// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
requestedResyncPeriod time . Duration
// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
// informer's overall resync check period.
resyncPeriod time . Duration
// nextResync is the earliest time the listener should get a full resync
nextResync time . Time
// resyncLock guards access to resyncPeriod and nextResync
resyncLock sync . Mutex
}
// newProcessListener creates a listener for handler with unbuffered add/next channels and a
// growing ring buffer of initial size bufferSize for pending notifications. requestedResyncPeriod
// is what the handler asked for, resyncPeriod is the period actually in effect, and the first
// resync is scheduled relative to now.
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
	listener := &processorListener{
		nextCh:                make(chan interface{}),
		addCh:                 make(chan interface{}),
		handler:               handler,
		pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
		requestedResyncPeriod: requestedResyncPeriod,
		resyncPeriod:          resyncPeriod,
	}

	listener.determineNextResync(now)

	return listener
}
// add sends notification on addCh, where pop picks it up. addCh is created unbuffered, so the
// send blocks until pop receives it.
func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}
// pop moves notifications from addCh to nextCh, buffering them in pendingNotifications whenever
// one is already waiting to be dispatched. It returns when addCh is closed, closing nextCh on the
// way out so that run stops as well.
func ( p * processorListener ) pop ( ) {
defer utilruntime . HandleCrash ( )
defer close ( p . nextCh ) // Tell .run() to stop
// nextCh is nil while there is nothing to dispatch; a send on a nil channel never proceeds, so
// the first select case is effectively disabled in that state.
var nextCh chan <- interface { }
// notification is the item currently waiting to be sent on nextCh (nil if none).
var notification interface { }
for {
select {
case nextCh <- notification :
// Notification dispatched
var ok bool
notification , ok = p . pendingNotifications . ReadOne ( )
if ! ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd , ok := <- p . addCh :
if ! ok {
// addCh was closed: stop (the deferred close of p.nextCh then stops run).
return
}
if notification == nil { // No notification to pop (and pendingNotifications is empty)
// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
nextCh = p . nextCh
} else { // There is already a notification waiting to be dispatched
p . pendingNotifications . WriteOne ( notificationToAdd )
}
}
}
}
func ( p * processorListener ) run ( ) {
2018-01-23 14:57:47 +08:00
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make ( chan struct { } )
wait . Until ( func ( ) {
// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait . ExponentialBackoff ( retry . DefaultRetry , func ( ) ( bool , error ) {
for next := range p . nextCh {
switch notification := next . ( type ) {
case updateNotification :
p . handler . OnUpdate ( notification . oldObj , notification . newObj )
case addNotification :
p . handler . OnAdd ( notification . newObj )
case deleteNotification :
p . handler . OnDelete ( notification . oldObj )
default :
utilruntime . HandleError ( fmt . Errorf ( "unrecognized notification: %#v" , next ) )
}
}
// the only way to get here is if the p.nextCh is empty and closed
return true , nil
} )
2017-10-19 17:32:12 +08:00
2018-01-23 14:57:47 +08:00
// the only way to get here is if the p.nextCh is empty and closed
if err == nil {
close ( stopCh )
2017-10-19 17:32:12 +08:00
}
2018-01-23 14:57:47 +08:00
} , 1 * time . Minute , stopCh )
2017-10-19 17:32:12 +08:00
}
// shouldResync determines whether the listener is due for a resync at time now, i.e. whether now
// is at or past nextResync. If the listener's resyncPeriod is 0, this always returns false.
func (p *processorListener) shouldResync(now time.Time) bool {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	resyncEnabled := p.resyncPeriod != 0
	if !resyncEnabled {
		return false
	}

	// "not before" is the same as "after or equal".
	return !now.Before(p.nextResync)
}
// determineNextResync schedules the listener's next resync one resyncPeriod after now.
func (p *processorListener) determineNextResync(now time.Time) {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	period := p.resyncPeriod
	p.nextResync = now.Add(period)
}
// setResyncPeriod replaces the listener's effective resync period. nextResync is left untouched.
func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) {
	p.resyncLock.Lock()
	p.resyncPeriod = resyncPeriod
	p.resyncLock.Unlock()
}