sample-controller/vendor/github.com/modern-go/concurrent/unbounded_executor.go
Kubernetes Publisher c0feae0701 Merge pull request #63059 from ceshihao/upgrade_json_package_fix_base64_newline
Automatic merge from submit-queue (batch tested with PRs 59965, 59115, 63076, 63059). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Upgrade dep json-iterator/go to fix base64 decode bug

**What this PR does / why we need it**:
upgrade dep `json-iterator/go` to fix base64 decode bug #62742

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #62742

**Special notes for your reviewer**:
Just upgrade `json-iterator/go` to latest which includes base64 decode fix https://github.com/json-iterator/go/pull/266
No other code changes

**Release note**:

```release-note
None
```

Kubernetes-commit: 3dbcd1ddcee786f443f89a82514bbd9c6ad06c99
2018-04-26 07:24:17 +00:00

120 lines
3.6 KiB
Go

package concurrent
import (
"context"
"fmt"
"runtime"
"runtime/debug"
"sync"
"time"
"reflect"
)
// HandlePanic logs goroutine panic by default
var HandlePanic = func(recovered interface{}, funcName string) {
ErrorLogger.Println(fmt.Sprintf("%s panic: %v", funcName, recovered))
ErrorLogger.Println(string(debug.Stack()))
}
// UnboundedExecutor is a executor without limits on counts of alive goroutines
// it tracks the goroutine started by it, and can cancel them when shutdown
type UnboundedExecutor struct {
ctx context.Context
cancel context.CancelFunc
activeGoroutinesMutex *sync.Mutex
activeGoroutines map[string]int
HandlePanic func(recovered interface{}, funcName string)
}
// GlobalUnboundedExecutor has the life cycle of the program itself
// any goroutine want to be shutdown before main exit can be started from this executor
// GlobalUnboundedExecutor expects the main function to call stop
// it does not magically knows the main function exits
var GlobalUnboundedExecutor = NewUnboundedExecutor()
// NewUnboundedExecutor creates a new UnboundedExecutor,
// UnboundedExecutor can not be created by &UnboundedExecutor{}
// HandlePanic can be set with a callback to override global HandlePanic
func NewUnboundedExecutor() *UnboundedExecutor {
ctx, cancel := context.WithCancel(context.TODO())
return &UnboundedExecutor{
ctx: ctx,
cancel: cancel,
activeGoroutinesMutex: &sync.Mutex{},
activeGoroutines: map[string]int{},
}
}
// Go starts a new goroutine and tracks its lifecycle.
// Panic will be recovered and logged automatically, except for StopSignal
func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
pc := reflect.ValueOf(handler).Pointer()
f := runtime.FuncForPC(pc)
funcName := f.Name()
file, line := f.FileLine(pc)
executor.activeGoroutinesMutex.Lock()
defer executor.activeGoroutinesMutex.Unlock()
startFrom := fmt.Sprintf("%s:%d", file, line)
executor.activeGoroutines[startFrom] += 1
go func() {
defer func() {
recovered := recover()
// if you want to quit a goroutine without trigger HandlePanic
// use runtime.Goexit() to quit
if recovered != nil {
if executor.HandlePanic == nil {
HandlePanic(recovered, funcName)
} else {
executor.HandlePanic(recovered, funcName)
}
}
executor.activeGoroutinesMutex.Lock()
executor.activeGoroutines[startFrom] -= 1
executor.activeGoroutinesMutex.Unlock()
}()
handler(executor.ctx)
}()
}
// Stop cancel all goroutines started by this executor without wait
func (executor *UnboundedExecutor) Stop() {
executor.cancel()
}
// StopAndWaitForever cancel all goroutines started by this executor and
// wait until all goroutines exited
func (executor *UnboundedExecutor) StopAndWaitForever() {
executor.StopAndWait(context.Background())
}
// StopAndWait cancel all goroutines started by this executor and wait.
// Wait can be cancelled by the context passed in.
func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) {
executor.cancel()
for {
oneHundredMilliseconds := time.NewTimer(time.Millisecond * 100)
select {
case <-oneHundredMilliseconds.C:
if executor.checkNoActiveGoroutines() {
return
}
case <-ctx.Done():
return
}
}
}
func (executor *UnboundedExecutor) checkNoActiveGoroutines() bool {
executor.activeGoroutinesMutex.Lock()
defer executor.activeGoroutinesMutex.Unlock()
for startFrom, count := range executor.activeGoroutines {
if count > 0 {
InfoLogger.Println("UnboundedExecutor is still waiting goroutines to quit",
"startFrom", startFrom,
"count", count)
return false
}
}
return true
}