Merge pull request #58394 from deads2k/controller-08-redeliver

Automatic merge from submit-queue (batch tested with PRs 58412, 56132, 58506, 58542, 58394). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

don't stop informer delivery on error

If an informer delivery fails today, we stop delivering to it entirely.  The pull updates the code to skip that particular notification, delay, and continue delivery with the next time.

/assign derekwaynecarr
/assign ncdc
/assign ash2k

@derekwaynecarr This would change the "the controller isn't doing anything?!" to "the controller missed my (individual) resource!"

```release-note
NONE
```

Kubernetes-commit: 71426ba59fd4a37e5da7deac6298ab33101bb5b6
This commit is contained in:
Kubernetes Publisher
2018-01-22 22:57:47 -08:00
7 changed files with 326 additions and 172 deletions
+12 -2
View File
@@ -18,6 +18,8 @@ package net
import (
"net"
"net/url"
"os"
"reflect"
"syscall"
)
@@ -38,8 +40,16 @@ func IPNetEqual(ipnet1, ipnet2 *net.IPNet) bool {
// Returns if the given err is "connection reset by peer" error.
func IsConnectionReset(err error) bool {
opErr, ok := err.(*net.OpError)
if ok && opErr.Err.Error() == syscall.ECONNRESET.Error() {
if urlErr, ok := err.(*url.Error); ok {
err = urlErr.Err
}
if opErr, ok := err.(*net.OpError); ok {
err = opErr.Err
}
if osErr, ok := err.(*os.SyscallError); ok {
err = osErr.Err
}
if errno, ok := err.(syscall.Errno); ok && errno == syscall.ECONNRESET {
return true
}
return false
+1
View File
@@ -83,6 +83,7 @@ go_library(
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/pager:go_default_library",
"//vendor/k8s.io/client-go/util/buffer:go_default_library",
"//vendor/k8s.io/client-go/util/retry:go_default_library",
],
)
+28 -12
View File
@@ -26,6 +26,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/buffer"
"k8s.io/client-go/util/retry"
"github.com/golang/glog"
)
@@ -540,20 +541,35 @@ func (p *processorListener) pop() {
}
func (p *processorListener) run() {
defer utilruntime.HandleCrash()
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make(chan struct{})
wait.Until(func() {
// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
return true, nil
})
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
// the only way to get here is if the p.nextCh is empty and closed
if err == nil {
close(stopCh)
}
}
}, 1*time.Minute, stopCh)
}
// shouldResync deterimines if the listener needs a resync. If the listener's resyncPeriod is 0,
+42
View File
@@ -0,0 +1,42 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = ["util.go"],
importpath = "k8s.io/client-go/util/retry",
deps = [
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["util_test.go"],
embed = [":go_default_library"],
importpath = "k8s.io/client-go/util/retry",
deps = [
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)
Generated Vendored Executable
+2
View File
@@ -0,0 +1,2 @@
reviewers:
- caesarxuchao
+79
View File
@@ -0,0 +1,79 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package retry
import (
"time"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
)
// DefaultRetry is the recommended retry for a conflict where multiple clients
// are making changes to the same resource.
var DefaultRetry = wait.Backoff{
Steps: 5,
Duration: 10 * time.Millisecond,
Factor: 1.0,
Jitter: 0.1,
}
// DefaultBackoff is the recommended backoff for a conflict where a client
// may be attempting to make an unrelated modification to a resource under
// active management by one or more controllers.
var DefaultBackoff = wait.Backoff{
Steps: 4,
Duration: 10 * time.Millisecond,
Factor: 5.0,
Jitter: 0.1,
}
// RetryConflict executes the provided function repeatedly, retrying if the server returns a conflicting
// write. Callers should preserve previous executions if they wish to retry changes. It performs an
// exponential backoff.
//
// var pod *api.Pod
// err := RetryOnConflict(DefaultBackoff, func() (err error) {
// pod, err = c.Pods("mynamespace").UpdateStatus(podStatus)
// return
// })
// if err != nil {
// // may be conflict if max retries were hit
// return err
// }
// ...
//
// TODO: Make Backoff an interface?
func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
var lastConflictErr error
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
err := fn()
switch {
case err == nil:
return true, nil
case errors.IsConflict(err):
lastConflictErr = err
return false, nil
default:
return false, err
}
})
if err == wait.ErrWaitTimeout {
err = lastConflictErr
}
return err
}