mirror of
https://github.com/kubernetes/sample-controller.git
synced 2025-03-04 00:02:55 +08:00
Merge pull request #66906 from tnozicka/rename-until
Automatic merge from submit-queue (batch tested with PRs 67071, 66906, 66722, 67276, 67039). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. #50102 Task 1: Move apimachinery/pkg/watch.Until into client-go/tools/watch.UntilWithoutRetry **What this PR does / why we need it**: This is a split off from https://github.com/kubernetes/kubernetes/pull/50102 to go in smaller pieces. Moves `apimachinery/pkg/watch.Until` into `client-go/tools/watch.UntilWithoutRetry` and adds context so it is cancelable. **Release note**: ```release-note NONE ``` **Dev release note**: ```dev-release-note `apimachinery/pkg/watch.Until` has been moved to `client-go/tools/watch.UntilWithoutRetry`. While switching please consider using the new `client-go/tools/watch.UntilWithSync` or `client-go/tools/watch.Until`. ``` /cc @smarterclayton @kubernetes/sig-api-machinery-pr-reviews /milestone v1.12 /priority important-soon /kind bug (bug after the main PR which is this split from) Kubernetes-commit: b6f0aed056ab94fef0b6f54e1ca1d66a5fc228b3
This commit is contained in:
commit
3e2e91bf31
470
Godeps/Godeps.json
generated
470
Godeps/Godeps.json
generated
File diff suppressed because it is too large
Load Diff
2
vendor/k8s.io/apimachinery/pkg/runtime/scheme.go
generated
vendored
2
vendor/k8s.io/apimachinery/pkg/runtime/scheme.go
generated
vendored
@ -70,7 +70,7 @@ type Scheme struct {
|
||||
defaulterFuncs map[reflect.Type]func(interface{})
|
||||
|
||||
// converter stores all registered conversion functions. It also has
|
||||
// default coverting behavior.
|
||||
// default converting behavior.
|
||||
converter *conversion.Converter
|
||||
|
||||
// versionPriority is a map of groups to ordered lists of versions for those groups indicating the
|
||||
|
9
vendor/k8s.io/client-go/tools/cache/listwatch.go
generated
vendored
9
vendor/k8s.io/client-go/tools/cache/listwatch.go
generated
vendored
@ -28,6 +28,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/pager"
|
||||
watchtools "k8s.io/client-go/tools/watch"
|
||||
)
|
||||
|
||||
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
|
||||
@ -116,7 +117,7 @@ func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error)
|
||||
// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout
|
||||
// if timeout is exceeded without all conditions returning true, or an error if an error occurs.
|
||||
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
|
||||
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch.ConditionFunc) (*watch.Event, error) {
|
||||
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watchtools.ConditionFunc) (*watch.Event, error) {
|
||||
if len(conditions) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
@ -178,8 +179,10 @@ func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch
|
||||
return nil, err
|
||||
}
|
||||
|
||||
evt, err := watch.Until(timeout, watchInterface, remainingConditions...)
|
||||
if err == watch.ErrWatchClosed {
|
||||
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
evt, err := watchtools.UntilWithoutRetry(ctx, watchInterface, remainingConditions...)
|
||||
if err == watchtools.ErrWatchClosed {
|
||||
// present a consistent error interface to callers
|
||||
err = wait.ErrWaitTimeout
|
||||
}
|
||||
|
@ -17,38 +17,39 @@ limitations under the License.
|
||||
package watch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
)
|
||||
|
||||
// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet,
|
||||
// or an error if the condition cannot be checked and should terminate. In general, it is better to define
|
||||
// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed
|
||||
// from false to true).
|
||||
type ConditionFunc func(event Event) (bool, error)
|
||||
type ConditionFunc func(event watch.Event) (bool, error)
|
||||
|
||||
// ErrWatchClosed is returned when the watch channel is closed before timeout in Until.
|
||||
var ErrWatchClosed = errors.New("watch closed before Until timeout")
|
||||
// ErrWatchClosed is returned when the watch channel is closed before timeout in UntilWithoutRetry.
|
||||
var ErrWatchClosed = errors.New("watch closed before UntilWithoutRetry timeout")
|
||||
|
||||
// Until reads items from the watch until each provided condition succeeds, and then returns the last watch
|
||||
// UntilWithoutRetry reads items from the watch until each provided condition succeeds, and then returns the last watch
|
||||
// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
|
||||
// If no event has been received, the returned event will be nil.
|
||||
// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
|
||||
// A zero timeout means to wait forever.
|
||||
func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc) (*Event, error) {
|
||||
// Waits until context deadline or until context is canceled.
|
||||
//
|
||||
// Warning: Unless you have a very specific use case (probably a special Watcher) don't use this function!!!
|
||||
// Warning: This will fail e.g. on API timeouts and/or 'too old resource version' error.
|
||||
// Warning: You are most probably looking for a function *Until* or *UntilWithSync* below,
|
||||
// Warning: solving such issues.
|
||||
// TODO: Consider making this function private to prevent misuse when the other occurrences in our codebase are gone.
|
||||
func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...ConditionFunc) (*watch.Event, error) {
|
||||
ch := watcher.ResultChan()
|
||||
defer watcher.Stop()
|
||||
var after <-chan time.Time
|
||||
if timeout > 0 {
|
||||
after = time.After(timeout)
|
||||
} else {
|
||||
ch := make(chan time.Time)
|
||||
defer close(ch)
|
||||
after = ch
|
||||
}
|
||||
var lastEvent *Event
|
||||
var lastEvent *watch.Event
|
||||
for _, condition := range conditions {
|
||||
// check the next condition against the previous event and short circuit waiting for the next watch
|
||||
if lastEvent != nil {
|
||||
@ -69,7 +70,6 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc
|
||||
}
|
||||
lastEvent = &event
|
||||
|
||||
// TODO: check for watch expired error and retry watch from latest point?
|
||||
done, err := condition(event)
|
||||
if err != nil {
|
||||
return lastEvent, err
|
||||
@ -78,10 +78,25 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc
|
||||
break ConditionSucceeded
|
||||
}
|
||||
|
||||
case <-after:
|
||||
case <-ctx.Done():
|
||||
return lastEvent, wait.ErrWaitTimeout
|
||||
}
|
||||
}
|
||||
}
|
||||
return lastEvent, nil
|
||||
}
|
||||
|
||||
// ContextWithOptionalTimeout wraps context.WithTimeout and handles infinite timeouts expressed as 0 duration.
|
||||
func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
|
||||
if timeout < 0 {
|
||||
// This should be handled in validation
|
||||
glog.Errorf("Timeout for context shall not be negative!")
|
||||
timeout = 0
|
||||
}
|
||||
|
||||
if timeout == 0 {
|
||||
return context.WithCancel(parent)
|
||||
}
|
||||
|
||||
return context.WithTimeout(parent, timeout)
|
||||
}
|
Loading…
Reference in New Issue
Block a user