From ab7425d650b2da722e4e87b2679fa31f5f4d2689 Mon Sep 17 00:00:00 2001 From: Prasad Chandrasekaran Date: Mon, 14 Nov 2022 09:16:04 +0530 Subject: [PATCH] Migrate sample-controller to contextual logging Kubernetes-commit: e346475822604fc41bd38e24a331fc7a8314876a --- controller.go | 51 +++++++++++++++++++++++++------------------ controller_test.go | 41 +++++++++++++++++++--------------- main.go | 29 ++++++++++++++---------- pkg/signals/signal.go | 9 ++++---- 4 files changed, 76 insertions(+), 54 deletions(-) diff --git a/controller.go b/controller.go index 3d2dbaf1..13a5c33a 100644 --- a/controller.go +++ b/controller.go @@ -86,16 +86,19 @@ type Controller struct { // NewController returns a new sample controller func NewController( + ctx context.Context, kubeclientset kubernetes.Interface, sampleclientset clientset.Interface, deploymentInformer appsinformers.DeploymentInformer, fooInformer informers.FooInformer) *Controller { + logger := klog.FromContext(ctx) // Create event broadcaster // Add sample-controller types to the default Kubernetes Scheme so Events can be // logged for sample-controller types. utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme)) - klog.V(4).Info("Creating event broadcaster") + logger.V(4).Info("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartStructuredLogging(0) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) @@ -112,7 +115,7 @@ func NewController( recorder: recorder, } - klog.Info("Setting up event handlers") + logger.Info("Setting up event handlers") // Set up an event handler for when Foo resources change fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueFoo, @@ -148,28 +151,30 @@ func NewController( // as syncing informer caches and starting workers. It will block until stopCh // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. -func (c *Controller) Run(workers int, stopCh <-chan struct{}) error { +func (c *Controller) Run(ctx context.Context, workers int) error { defer utilruntime.HandleCrash() defer c.workqueue.ShutDown() + logger := klog.FromContext(ctx) // Start the informer factories to begin populating the informer caches - klog.Info("Starting Foo controller") + logger.Info("Starting Foo controller") // Wait for the caches to be synced before starting workers - klog.Info("Waiting for informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok { + logger.Info("Waiting for informer caches to sync") + + if ok := cache.WaitForCacheSync(ctx.Done(), c.deploymentsSynced, c.foosSynced); !ok { return fmt.Errorf("failed to wait for caches to sync") } - klog.Info("Starting workers") + logger.Info("Starting workers", "count", workers) // Launch two workers to process Foo resources for i := 0; i < workers; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.UntilWithContext(ctx, c.runWorker, time.Second) } - klog.Info("Started workers") - <-stopCh - klog.Info("Shutting down workers") + logger.Info("Started workers") + <-ctx.Done() + logger.Info("Shutting down workers") return nil } @@ -177,15 +182,16 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) error { // runWorker is a long-running function that will continually call the // processNextWorkItem function in order to read and process a message on the // workqueue. -func (c *Controller) runWorker() { - for c.processNextWorkItem() { +func (c *Controller) runWorker(ctx context.Context) { + for c.processNextWorkItem(ctx) { } } // processNextWorkItem will read a single work item off the workqueue and // attempt to process it, by calling the syncHandler. -func (c *Controller) processNextWorkItem() bool { +func (c *Controller) processNextWorkItem(ctx context.Context) bool { obj, shutdown := c.workqueue.Get() + logger := klog.FromContext(ctx) if shutdown { return false @@ -217,7 +223,7 @@ func (c *Controller) processNextWorkItem() bool { } // Run the syncHandler, passing it the namespace/name string of the // Foo resource to be synced. - if err := c.syncHandler(key); err != nil { + if err := c.syncHandler(ctx, key); err != nil { // Put the item back on the workqueue to handle any transient errors. c.workqueue.AddRateLimited(key) return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) @@ -225,7 +231,7 @@ func (c *Controller) processNextWorkItem() bool { // Finally, if no error occurs we Forget this item so it does not // get queued again until another change happens. c.workqueue.Forget(obj) - klog.Infof("Successfully synced '%s'", key) + logger.Info("Successfully synced", "resourceName", key) return nil }(obj) @@ -240,8 +246,10 @@ func (c *Controller) processNextWorkItem() bool { // syncHandler compares the actual state with the desired, and attempts to // converge the two. It then updates the Status block of the Foo resource // with the current status of the resource. -func (c *Controller) syncHandler(key string) error { +func (c *Controller) syncHandler(ctx context.Context, key string) error { // Convert the namespace/name string into a distinct namespace and name + logger := klog.LoggerWithValues(klog.FromContext(ctx), "resourceName", key) + namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key)) @@ -296,7 +304,7 @@ func (c *Controller) syncHandler(key string) error { // number does not equal the current desired replicas on the Deployment, we // should update the Deployment resource. if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas { - klog.V(4).Infof("Foo %s replicas: %d, deployment replicas: %d", name, *foo.Spec.Replicas, *deployment.Spec.Replicas) + logger.V(4).Info("Update deployment resource", "currentReplicas", *foo.Spec.Replicas, "desiredReplicas", *deployment.Spec.Replicas) deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(), newDeployment(foo), metav1.UpdateOptions{}) } @@ -353,6 +361,7 @@ func (c *Controller) enqueueFoo(obj interface{}) { func (c *Controller) handleObject(obj interface{}) { var object metav1.Object var ok bool + logger := klog.FromContext(context.Background()) if object, ok = obj.(metav1.Object); !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { @@ -364,9 +373,9 @@ func (c *Controller) handleObject(obj interface{}) { utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type")) return } - klog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName()) + logger.V(4).Info("Recovered deleted object", "resourceName", object.GetName()) } - klog.V(4).Infof("Processing object: %s", object.GetName()) + logger.V(4).Info("Processing object", "object", klog.KObj(object)) if ownerRef := metav1.GetControllerOf(object); ownerRef != nil { // If this object is not owned by a Foo, we should not do anything more // with it. @@ -376,7 +385,7 @@ func (c *Controller) handleObject(obj interface{}) { foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name) if err != nil { - klog.V(4).Infof("ignoring orphaned object '%s/%s' of foo '%s'", object.GetNamespace(), object.GetName(), ownerRef.Name) + logger.V(4).Info("Ignore orphaned object", "object", klog.KObj(object), "foo", ownerRef.Name) return } diff --git a/controller_test.go b/controller_test.go index 75741a72..102094dc 100644 --- a/controller_test.go +++ b/controller_test.go @@ -17,6 +17,7 @@ limitations under the License. package main import ( + "context" "fmt" "reflect" "testing" @@ -32,6 +33,7 @@ import ( core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/klog/v2/ktesting" samplecontroller "k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1" "k8s.io/sample-controller/pkg/generated/clientset/versioned/fake" @@ -81,14 +83,14 @@ func newFoo(name string, replicas *int32) *samplecontroller.Foo { } } -func (f *fixture) newController() (*Controller, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory) { +func (f *fixture) newController(ctx context.Context) (*Controller, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory) { f.client = fake.NewSimpleClientset(f.objects...) f.kubeclient = k8sfake.NewSimpleClientset(f.kubeobjects...) i := informers.NewSharedInformerFactory(f.client, noResyncPeriodFunc()) k8sI := kubeinformers.NewSharedInformerFactory(f.kubeclient, noResyncPeriodFunc()) - c := NewController(f.kubeclient, f.client, + c := NewController(ctx, f.kubeclient, f.client, k8sI.Apps().V1().Deployments(), i.Samplecontroller().V1alpha1().Foos()) c.foosSynced = alwaysReady @@ -106,24 +108,22 @@ func (f *fixture) newController() (*Controller, informers.SharedInformerFactory, return c, i, k8sI } -func (f *fixture) run(fooName string) { - f.runController(fooName, true, false) +func (f *fixture) run(ctx context.Context, fooName string) { + f.runController(ctx, fooName, true, false) } -func (f *fixture) runExpectError(fooName string) { - f.runController(fooName, true, true) +func (f *fixture) runExpectError(ctx context.Context, fooName string) { + f.runController(ctx, fooName, true, true) } -func (f *fixture) runController(fooName string, startInformers bool, expectError bool) { - c, i, k8sI := f.newController() +func (f *fixture) runController(ctx context.Context, fooName string, startInformers bool, expectError bool) { + c, i, k8sI := f.newController(ctx) if startInformers { - stopCh := make(chan struct{}) - defer close(stopCh) - i.Start(stopCh) - k8sI.Start(stopCh) + i.Start(ctx.Done()) + k8sI.Start(ctx.Done()) } - err := c.syncHandler(fooName) + err := c.syncHandler(ctx, fooName) if !expectError && err != nil { f.t.Errorf("error syncing foo: %v", err) } else if expectError && err == nil { @@ -252,6 +252,7 @@ func getKey(foo *samplecontroller.Foo, t *testing.T) string { func TestCreatesDeployment(t *testing.T) { f := newFixture(t) foo := newFoo("test", int32Ptr(1)) + _, ctx := ktesting.NewTestContext(t) f.fooLister = append(f.fooLister, foo) f.objects = append(f.objects, foo) @@ -260,12 +261,14 @@ func TestCreatesDeployment(t *testing.T) { f.expectCreateDeploymentAction(expDeployment) f.expectUpdateFooStatusAction(foo) - f.run(getKey(foo, t)) + f.run(ctx, getKey(foo, t)) } func TestDoNothing(t *testing.T) { f := newFixture(t) foo := newFoo("test", int32Ptr(1)) + _, ctx := ktesting.NewTestContext(t) + d := newDeployment(foo) f.fooLister = append(f.fooLister, foo) @@ -274,12 +277,14 @@ func TestDoNothing(t *testing.T) { f.kubeobjects = append(f.kubeobjects, d) f.expectUpdateFooStatusAction(foo) - f.run(getKey(foo, t)) + f.run(ctx, getKey(foo, t)) } func TestUpdateDeployment(t *testing.T) { f := newFixture(t) foo := newFoo("test", int32Ptr(1)) + _, ctx := ktesting.NewTestContext(t) + d := newDeployment(foo) // Update replicas @@ -293,12 +298,14 @@ func TestUpdateDeployment(t *testing.T) { f.expectUpdateFooStatusAction(foo) f.expectUpdateDeploymentAction(expDeployment) - f.run(getKey(foo, t)) + f.run(ctx, getKey(foo, t)) } func TestNotControlledByUs(t *testing.T) { f := newFixture(t) foo := newFoo("test", int32Ptr(1)) + _, ctx := ktesting.NewTestContext(t) + d := newDeployment(foo) d.ObjectMeta.OwnerReferences = []metav1.OwnerReference{} @@ -308,7 +315,7 @@ func TestNotControlledByUs(t *testing.T) { f.deploymentLister = append(f.deploymentLister, d) f.kubeobjects = append(f.kubeobjects, d) - f.runExpectError(getKey(foo, t)) + f.runExpectError(ctx, getKey(foo, t)) } func int32Ptr(i int32) *int32 { return &i } diff --git a/main.go b/main.go index 082eb385..cda6ca19 100644 --- a/main.go +++ b/main.go @@ -24,12 +24,12 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" + "k8s.io/sample-controller/pkg/signals" // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" clientset "k8s.io/sample-controller/pkg/generated/clientset/versioned" informers "k8s.io/sample-controller/pkg/generated/informers/externalversions" - "k8s.io/sample-controller/pkg/signals" ) var ( @@ -41,38 +41,43 @@ func main() { klog.InitFlags(nil) flag.Parse() - // set up signals so we handle the first shutdown signal gracefully - stopCh := signals.SetupSignalHandler() + // set up signals so we handle the shutdown signal gracefully + ctx := signals.SetupSignalHandler() + logger := klog.FromContext(ctx) cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) if err != nil { - klog.Fatalf("Error building kubeconfig: %s", err.Error()) + logger.Error(err, "Error building kubeconfig") + klog.FlushAndExit(klog.ExitFlushTimeout, 1) } kubeClient, err := kubernetes.NewForConfig(cfg) if err != nil { - klog.Fatalf("Error building kubernetes clientset: %s", err.Error()) + logger.Error(err, "Error building kubernetes clientset") + klog.FlushAndExit(klog.ExitFlushTimeout, 1) } exampleClient, err := clientset.NewForConfig(cfg) if err != nil { - klog.Fatalf("Error building example clientset: %s", err.Error()) + logger.Error(err, "Error building kubernetes clientset") + klog.FlushAndExit(klog.ExitFlushTimeout, 1) } kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30) exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30) - controller := NewController(kubeClient, exampleClient, + controller := NewController(ctx, kubeClient, exampleClient, kubeInformerFactory.Apps().V1().Deployments(), exampleInformerFactory.Samplecontroller().V1alpha1().Foos()) - // notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh) + // notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(ctx.done()) // Start method is non-blocking and runs all registered informers in a dedicated goroutine. - kubeInformerFactory.Start(stopCh) - exampleInformerFactory.Start(stopCh) + kubeInformerFactory.Start(ctx.Done()) + exampleInformerFactory.Start(ctx.Done()) - if err = controller.Run(2, stopCh); err != nil { - klog.Fatalf("Error running controller: %s", err.Error()) + if err = controller.Run(ctx, 2); err != nil { + logger.Error(err, "Error running controller") + klog.FlushAndExit(klog.ExitFlushTimeout, 1) } } diff --git a/pkg/signals/signal.go b/pkg/signals/signal.go index 6bddfddb..e0d5c1b9 100644 --- a/pkg/signals/signal.go +++ b/pkg/signals/signal.go @@ -17,6 +17,7 @@ limitations under the License. package signals import ( + "context" "os" "os/signal" ) @@ -26,18 +27,18 @@ var onlyOneSignalHandler = make(chan struct{}) // SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned // which is closed on one of these signals. If a second signal is caught, the program // is terminated with exit code 1. -func SetupSignalHandler() (stopCh <-chan struct{}) { +func SetupSignalHandler() context.Context { close(onlyOneSignalHandler) // panics when called twice - stop := make(chan struct{}) c := make(chan os.Signal, 2) + ctx, cancel := context.WithCancel(context.Background()) signal.Notify(c, shutdownSignals...) go func() { <-c - close(stop) + cancel() <-c os.Exit(1) // second signal. Exit directly. }() - return stop + return ctx }