Migrate sample-controller to contextual logging

Kubernetes-commit: e346475822604fc41bd38e24a331fc7a8314876a
Prasad Chandrasekaran 2022-11-14 09:16:04 +05:30 committed by Kubernetes Publisher
parent 32a619188a
commit ab7425d650
4 changed files with 76 additions and 54 deletions
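The change follows the contextual logging pattern from KEP-3077: instead of calling package-level klog functions, code pulls a logr.Logger out of a context.Context and passes the context down the call chain. A minimal, self-contained sketch of the pattern (the function names here are illustrative, not part of this commit):

    package main

    import (
        "context"
        "flag"

        "k8s.io/klog/v2"
    )

    // reconcile stands in for any function migrated to contextual logging.
    func reconcile(ctx context.Context) {
        // FromContext returns the logger attached to ctx, or the global
        // klog logger when none is attached.
        logger := klog.FromContext(ctx)
        logger.Info("Reconciling", "attempt", 1)

        // Attach extra key/value pairs before handing the context down.
        ctx = klog.NewContext(ctx, logger.WithValues("component", "reconciler"))
        cleanup(ctx)
    }

    func cleanup(ctx context.Context) {
        // Inherits component=reconciler from the caller's context.
        klog.FromContext(ctx).Info("Cleaning up")
    }

    func main() {
        klog.InitFlags(nil)
        flag.Parse()
        reconcile(context.Background())
    }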

controller.go

@@ -86,16 +86,19 @@ type Controller struct {
 // NewController returns a new sample controller
 func NewController(
+    ctx context.Context,
     kubeclientset kubernetes.Interface,
     sampleclientset clientset.Interface,
     deploymentInformer appsinformers.DeploymentInformer,
     fooInformer informers.FooInformer) *Controller {
+    logger := klog.FromContext(ctx)
     // Create event broadcaster
     // Add sample-controller types to the default Kubernetes Scheme so Events can be
     // logged for sample-controller types.
     utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))
-    klog.V(4).Info("Creating event broadcaster")
+    logger.V(4).Info("Creating event broadcaster")
     eventBroadcaster := record.NewBroadcaster()
     eventBroadcaster.StartStructuredLogging(0)
     eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
@@ -112,7 +115,7 @@ func NewController(
         recorder: recorder,
     }
-    klog.Info("Setting up event handlers")
+    logger.Info("Setting up event handlers")
     // Set up an event handler for when Foo resources change
     fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
         AddFunc: controller.enqueueFoo,
@@ -148,28 +151,30 @@ func NewController(
 // as syncing informer caches and starting workers. It will block until stopCh
 // is closed, at which point it will shutdown the workqueue and wait for
 // workers to finish processing their current work items.
-func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
+func (c *Controller) Run(ctx context.Context, workers int) error {
     defer utilruntime.HandleCrash()
     defer c.workqueue.ShutDown()
+    logger := klog.FromContext(ctx)
     // Start the informer factories to begin populating the informer caches
-    klog.Info("Starting Foo controller")
+    logger.Info("Starting Foo controller")
     // Wait for the caches to be synced before starting workers
-    klog.Info("Waiting for informer caches to sync")
-    if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
+    logger.Info("Waiting for informer caches to sync")
+    if ok := cache.WaitForCacheSync(ctx.Done(), c.deploymentsSynced, c.foosSynced); !ok {
         return fmt.Errorf("failed to wait for caches to sync")
     }
-    klog.Info("Starting workers")
+    logger.Info("Starting workers", "count", workers)
     // Launch two workers to process Foo resources
     for i := 0; i < workers; i++ {
-        go wait.Until(c.runWorker, time.Second, stopCh)
+        go wait.UntilWithContext(ctx, c.runWorker, time.Second)
     }
-    klog.Info("Started workers")
-    <-stopCh
-    klog.Info("Shutting down workers")
+    logger.Info("Started workers")
+    <-ctx.Done()
+    logger.Info("Shutting down workers")
     return nil
 }
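Note two details in the Run migration: wait.UntilWithContext (from k8s.io/apimachinery/pkg/util/wait) passes the context into the worker and stops looping once the context is cancelled, and ctx.Done() supplies the <-chan struct{} that cache.WaitForCacheSync still expects. A standalone sketch of the loop shape (the timeout value is arbitrary, not controller code):

    package main

    import (
        "context"
        "fmt"
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
    )

    func main() {
        // Stand-in for the signal-handler context; cancelled after 3 seconds.
        ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
        defer cancel()

        // Replaces wait.Until(f, period, stopCh): the worker now receives ctx
        // and the loop ends when ctx is cancelled.
        wait.UntilWithContext(ctx, func(ctx context.Context) {
            fmt.Println("worker tick")
        }, time.Second)

        // <-ctx.Done() is the contextual replacement for <-stopCh.
        <-ctx.Done()
        fmt.Println("shutting down")
    }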
@@ -177,15 +182,16 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
 // runWorker is a long-running function that will continually call the
 // processNextWorkItem function in order to read and process a message on the
 // workqueue.
-func (c *Controller) runWorker() {
-    for c.processNextWorkItem() {
+func (c *Controller) runWorker(ctx context.Context) {
+    for c.processNextWorkItem(ctx) {
     }
 }
 // processNextWorkItem will read a single work item off the workqueue and
 // attempt to process it, by calling the syncHandler.
-func (c *Controller) processNextWorkItem() bool {
+func (c *Controller) processNextWorkItem(ctx context.Context) bool {
     obj, shutdown := c.workqueue.Get()
+    logger := klog.FromContext(ctx)
     if shutdown {
         return false
@@ -217,7 +223,7 @@ func (c *Controller) processNextWorkItem() bool {
         }
         // Run the syncHandler, passing it the namespace/name string of the
         // Foo resource to be synced.
-        if err := c.syncHandler(key); err != nil {
+        if err := c.syncHandler(ctx, key); err != nil {
             // Put the item back on the workqueue to handle any transient errors.
             c.workqueue.AddRateLimited(key)
             return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
@@ -225,7 +231,7 @@ func (c *Controller) processNextWorkItem() bool {
         // Finally, if no error occurs we Forget this item so it does not
         // get queued again until another change happens.
         c.workqueue.Forget(obj)
-        klog.Infof("Successfully synced '%s'", key)
+        logger.Info("Successfully synced", "resourceName", key)
         return nil
     }(obj)
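The Infof-to-Info change above is more than a rename: structured calls take a fixed message plus alternating key/value pairs rather than a format string, which keeps log output machine-parseable. A small sketch of the two styles side by side (the key value is an example):

    package main

    import (
        "flag"

        "k8s.io/klog/v2"
    )

    func main() {
        klog.InitFlags(nil)
        flag.Parse()

        key := "default/example-foo"

        // Old style: the variable is interpolated into the message.
        klog.Infof("Successfully synced '%s'", key)

        // New style: constant message, structured key/value pairs.
        // klog.Background() exposes the global logger as a logr.Logger.
        klog.Background().Info("Successfully synced", "resourceName", key)
    }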
@@ -240,8 +246,10 @@ func (c *Controller) processNextWorkItem() bool {
 // syncHandler compares the actual state with the desired, and attempts to
 // converge the two. It then updates the Status block of the Foo resource
 // with the current status of the resource.
-func (c *Controller) syncHandler(key string) error {
+func (c *Controller) syncHandler(ctx context.Context, key string) error {
     // Convert the namespace/name string into a distinct namespace and name
+    logger := klog.LoggerWithValues(klog.FromContext(ctx), "resourceName", key)
     namespace, name, err := cache.SplitMetaNamespaceKey(key)
     if err != nil {
         utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
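klog.LoggerWithValues returns a logger that stamps the given key/value pairs onto every subsequent entry, so syncHandler tags resourceName once instead of repeating it at each call site. A sketch under assumed values:

    package main

    import (
        "flag"

        "k8s.io/klog/v2"
    )

    func main() {
        klog.InitFlags(nil)
        flag.Parse()

        // Every entry written through this logger carries the resourceName pair.
        logger := klog.LoggerWithValues(klog.Background(), "resourceName", "default/example-foo")
        logger.Info("Getting deployment")   // includes resourceName automatically
        logger.V(4).Info("Updating status") // verbosity-gated entries too
    }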
@@ -296,7 +304,7 @@ func (c *Controller) syncHandler(key string) error {
     // number does not equal the current desired replicas on the Deployment, we
     // should update the Deployment resource.
     if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
-        klog.V(4).Infof("Foo %s replicas: %d, deployment replicas: %d", name, *foo.Spec.Replicas, *deployment.Spec.Replicas)
+        logger.V(4).Info("Update deployment resource", "currentReplicas", *foo.Spec.Replicas, "desiredReplicas", *deployment.Spec.Replicas)
         deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(), newDeployment(foo), metav1.UpdateOptions{})
     }
@@ -353,6 +361,7 @@ func (c *Controller) enqueueFoo(obj interface{}) {
 func (c *Controller) handleObject(obj interface{}) {
     var object metav1.Object
     var ok bool
+    logger := klog.FromContext(context.Background())
     if object, ok = obj.(metav1.Object); !ok {
         tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
         if !ok {
@@ -364,9 +373,9 @@ func (c *Controller) handleObject(obj interface{}) {
             utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
             return
         }
-        klog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
+        logger.V(4).Info("Recovered deleted object", "resourceName", object.GetName())
     }
-    klog.V(4).Infof("Processing object: %s", object.GetName())
+    logger.V(4).Info("Processing object", "object", klog.KObj(object))
     if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
         // If this object is not owned by a Foo, we should not do anything more
         // with it.
@@ -376,7 +385,7 @@ func (c *Controller) handleObject(obj interface{}) {
         foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)
         if err != nil {
-            klog.V(4).Infof("ignoring orphaned object '%s/%s' of foo '%s'", object.GetNamespace(), object.GetName(), ownerRef.Name)
+            logger.V(4).Info("Ignore orphaned object", "object", klog.KObj(object), "foo", ownerRef.Name)
             return
         }
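handleObject falls back to klog.FromContext(context.Background()), i.e. the global logger, because informer event handlers are not handed a context. klog.KObj formats an object reference as namespace/name, which keeps the key set uniform across controllers. A sketch using an arbitrary ConfigMap (example object only):

    package main

    import (
        "flag"

        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/klog/v2"
    )

    func main() {
        klog.InitFlags(nil)
        flag.Parse()

        cm := &corev1.ConfigMap{
            ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "example"},
        }

        // Rendered as object="default/example" in the log entry.
        klog.Background().Info("Processing object", "object", klog.KObj(cm))
    }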

controller_test.go

@@ -17,6 +17,7 @@ limitations under the License.
 package main
 import (
+    "context"
     "fmt"
     "reflect"
     "testing"
@@ -32,6 +33,7 @@ import (
     core "k8s.io/client-go/testing"
     "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/tools/record"
+    "k8s.io/klog/v2/ktesting"
     samplecontroller "k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1"
     "k8s.io/sample-controller/pkg/generated/clientset/versioned/fake"
@@ -81,14 +83,14 @@ func newFoo(name string, replicas *int32) *samplecontroller.Foo {
     }
 }
-func (f *fixture) newController() (*Controller, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory) {
+func (f *fixture) newController(ctx context.Context) (*Controller, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory) {
     f.client = fake.NewSimpleClientset(f.objects...)
     f.kubeclient = k8sfake.NewSimpleClientset(f.kubeobjects...)
     i := informers.NewSharedInformerFactory(f.client, noResyncPeriodFunc())
     k8sI := kubeinformers.NewSharedInformerFactory(f.kubeclient, noResyncPeriodFunc())
-    c := NewController(f.kubeclient, f.client,
+    c := NewController(ctx, f.kubeclient, f.client,
         k8sI.Apps().V1().Deployments(), i.Samplecontroller().V1alpha1().Foos())
     c.foosSynced = alwaysReady
@@ -106,24 +108,22 @@ func (f *fixture) newController() (*Controller, informers.SharedInformerFactory,
     return c, i, k8sI
 }
-func (f *fixture) run(fooName string) {
-    f.runController(fooName, true, false)
+func (f *fixture) run(ctx context.Context, fooName string) {
+    f.runController(ctx, fooName, true, false)
 }
-func (f *fixture) runExpectError(fooName string) {
-    f.runController(fooName, true, true)
+func (f *fixture) runExpectError(ctx context.Context, fooName string) {
+    f.runController(ctx, fooName, true, true)
 }
-func (f *fixture) runController(fooName string, startInformers bool, expectError bool) {
-    c, i, k8sI := f.newController()
+func (f *fixture) runController(ctx context.Context, fooName string, startInformers bool, expectError bool) {
+    c, i, k8sI := f.newController(ctx)
     if startInformers {
-        stopCh := make(chan struct{})
-        defer close(stopCh)
-        i.Start(stopCh)
-        k8sI.Start(stopCh)
+        i.Start(ctx.Done())
+        k8sI.Start(ctx.Done())
     }
-    err := c.syncHandler(fooName)
+    err := c.syncHandler(ctx, fooName)
     if !expectError && err != nil {
         f.t.Errorf("error syncing foo: %v", err)
     } else if expectError && err == nil {
@@ -252,6 +252,7 @@ func getKey(foo *samplecontroller.Foo, t *testing.T) string {
 func TestCreatesDeployment(t *testing.T) {
     f := newFixture(t)
     foo := newFoo("test", int32Ptr(1))
+    _, ctx := ktesting.NewTestContext(t)
     f.fooLister = append(f.fooLister, foo)
     f.objects = append(f.objects, foo)
@@ -260,12 +261,14 @@ func TestCreatesDeployment(t *testing.T) {
     f.expectCreateDeploymentAction(expDeployment)
     f.expectUpdateFooStatusAction(foo)
-    f.run(getKey(foo, t))
+    f.run(ctx, getKey(foo, t))
 }
 func TestDoNothing(t *testing.T) {
     f := newFixture(t)
     foo := newFoo("test", int32Ptr(1))
+    _, ctx := ktesting.NewTestContext(t)
     d := newDeployment(foo)
     f.fooLister = append(f.fooLister, foo)
@@ -274,12 +277,14 @@ func TestDoNothing(t *testing.T) {
     f.kubeobjects = append(f.kubeobjects, d)
     f.expectUpdateFooStatusAction(foo)
-    f.run(getKey(foo, t))
+    f.run(ctx, getKey(foo, t))
 }
 func TestUpdateDeployment(t *testing.T) {
     f := newFixture(t)
     foo := newFoo("test", int32Ptr(1))
+    _, ctx := ktesting.NewTestContext(t)
     d := newDeployment(foo)
     // Update replicas
@@ -293,12 +298,14 @@ func TestUpdateDeployment(t *testing.T) {
     f.expectUpdateFooStatusAction(foo)
     f.expectUpdateDeploymentAction(expDeployment)
-    f.run(getKey(foo, t))
+    f.run(ctx, getKey(foo, t))
 }
 func TestNotControlledByUs(t *testing.T) {
     f := newFixture(t)
     foo := newFoo("test", int32Ptr(1))
+    _, ctx := ktesting.NewTestContext(t)
     d := newDeployment(foo)
     d.ObjectMeta.OwnerReferences = []metav1.OwnerReference{}
@@ -308,7 +315,7 @@ func TestNotControlledByUs(t *testing.T) {
     f.deploymentLister = append(f.deploymentLister, d)
     f.kubeobjects = append(f.kubeobjects, d)
-    f.runExpectError(getKey(foo, t))
+    f.runExpectError(ctx, getKey(foo, t))
 }
 func int32Ptr(i int32) *int32 { return &i }
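ktesting.NewTestContext routes klog output through t.Log, so controller log lines land next to the test that produced them instead of on stderr. It returns both the logger and a context carrying it; the tests above only need the context. A sketch of a test using both return values (the test body is illustrative):

    package main

    import (
        "testing"

        "k8s.io/klog/v2"
        "k8s.io/klog/v2/ktesting"
    )

    func TestContextualLogging(t *testing.T) {
        // logger writes via t.Log; ctx carries the same logger downward.
        logger, ctx := ktesting.NewTestContext(t)
        logger.Info("starting test")

        // Code under test that calls klog.FromContext(ctx) logs via t.Log too.
        klog.FromContext(ctx).V(4).Info("shown only at high verbosity")
    }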

main.go

@@ -24,12 +24,12 @@ import (
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/tools/clientcmd"
     "k8s.io/klog/v2"
+    "k8s.io/sample-controller/pkg/signals"
     // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
     // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
     clientset "k8s.io/sample-controller/pkg/generated/clientset/versioned"
     informers "k8s.io/sample-controller/pkg/generated/informers/externalversions"
-    "k8s.io/sample-controller/pkg/signals"
 )
 var (
@@ -41,38 +41,43 @@ func main() {
     klog.InitFlags(nil)
     flag.Parse()
-    // set up signals so we handle the first shutdown signal gracefully
-    stopCh := signals.SetupSignalHandler()
+    // set up signals so we handle the shutdown signal gracefully
+    ctx := signals.SetupSignalHandler()
+    logger := klog.FromContext(ctx)
     cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
     if err != nil {
-        klog.Fatalf("Error building kubeconfig: %s", err.Error())
+        logger.Error(err, "Error building kubeconfig")
+        klog.FlushAndExit(klog.ExitFlushTimeout, 1)
     }
     kubeClient, err := kubernetes.NewForConfig(cfg)
     if err != nil {
-        klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
+        logger.Error(err, "Error building kubernetes clientset")
+        klog.FlushAndExit(klog.ExitFlushTimeout, 1)
     }
     exampleClient, err := clientset.NewForConfig(cfg)
     if err != nil {
-        klog.Fatalf("Error building example clientset: %s", err.Error())
+        logger.Error(err, "Error building example clientset")
+        klog.FlushAndExit(klog.ExitFlushTimeout, 1)
     }
     kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
     exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)
-    controller := NewController(kubeClient, exampleClient,
+    controller := NewController(ctx, kubeClient, exampleClient,
         kubeInformerFactory.Apps().V1().Deployments(),
         exampleInformerFactory.Samplecontroller().V1alpha1().Foos())
-    // notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
+    // notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(ctx.Done())
     // Start method is non-blocking and runs all registered informers in a dedicated goroutine.
-    kubeInformerFactory.Start(stopCh)
-    exampleInformerFactory.Start(stopCh)
-    if err = controller.Run(2, stopCh); err != nil {
-        klog.Fatalf("Error running controller: %s", err.Error())
+    kubeInformerFactory.Start(ctx.Done())
+    exampleInformerFactory.Start(ctx.Done())
+    if err = controller.Run(ctx, 2); err != nil {
+        logger.Error(err, "Error running controller")
+        klog.FlushAndExit(klog.ExitFlushTimeout, 1)
     }
 }
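klog.Fatalf logged and exited in one call; the structured replacement splits that into logger.Error, which records the error object as a field, and klog.FlushAndExit, which flushes buffered output before terminating with the given code. klog.ExitFlushTimeout is the package's default flush deadline. A sketch of the pattern with a fabricated error:

    package main

    import (
        "errors"
        "flag"

        "k8s.io/klog/v2"
    )

    func main() {
        klog.InitFlags(nil)
        flag.Parse()

        if err := errors.New("no kubeconfig found"); err != nil {
            // Error attaches err as a structured field; FlushAndExit makes the
            // exit explicit and flushes pending log lines first.
            klog.Background().Error(err, "Error building kubeconfig")
            klog.FlushAndExit(klog.ExitFlushTimeout, 1)
        }
    }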

pkg/signals/signal.go

@@ -17,6 +17,7 @@ limitations under the License.
 package signals
 import (
+    "context"
     "os"
     "os/signal"
 )
@@ -26,18 +27,18 @@ var onlyOneSignalHandler = make(chan struct{})
-// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
-// which is closed on one of these signals. If a second signal is caught, the program
-// is terminated with exit code 1.
-func SetupSignalHandler() (stopCh <-chan struct{}) {
+// SetupSignalHandler registers for SIGTERM and SIGINT. A context is returned
+// which is cancelled on one of these signals. If a second signal is caught, the program
+// is terminated with exit code 1.
+func SetupSignalHandler() context.Context {
     close(onlyOneSignalHandler) // panics when called twice
-    stop := make(chan struct{})
     c := make(chan os.Signal, 2)
+    ctx, cancel := context.WithCancel(context.Background())
     signal.Notify(c, shutdownSignals...)
     go func() {
         <-c
-        close(stop)
+        cancel()
         <-c
         os.Exit(1) // second signal. Exit directly.
     }()
-    return stop
+    return ctx
 }
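With SetupSignalHandler returning a context, callers derive everything else from it: pass it down to Run, hand ctx.Done() to APIs that still want a channel, and block on it for shutdown. A usage sketch (assuming the package import path used by this repo):

    package main

    import (
        "fmt"

        "k8s.io/sample-controller/pkg/signals"
    )

    func main() {
        // Cancelled on the first SIGTERM/SIGINT; a second signal exits directly.
        ctx := signals.SetupSignalHandler()

        // ctx.Done() still yields the <-chan struct{} that older APIs expect.
        <-ctx.Done()
        fmt.Println("received shutdown signal, exiting")
    }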