Watching Kubernetes Resources in Go

February 27, 2023

Sometimes we might need to perform some action based on the state change of Kubernetes resources. Let’s say we need to dynamically create a Role and a RoleBinding for newly created namespaces. One way to achieve this is to have a component inside the cluster listening for namespace creation events and instructing the Kubernetes API Server to create a Role and bind it to a predefined set of users stored in a ConfigMap inside the cluster.

The core of Kubernetes' control plane is the API server. The API server exposes an HTTP API that lets end users, different parts of your cluster, and external components communicate with one another.

As Kubernetes itself and many applications in its landscape are written in Go (https://go.dev/), let’s see what we can do with this language.

Go has a package that provides higher-level abstractions over the Kubernetes REST API – client-go (https://pkg.go.dev/k8s.io/client-go).

To start we need to create a project and do some coding.

foo@bar:~$ go mod init client-go-demo
foo@bar:~$ vi main.go
package main

import (
    "context"
    "os"

    log "github.com/sirupsen/logrus"
    v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

var (
    config, _    = clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
    clientset, _ = kubernetes.NewForConfig(config)
)

func main() {
    list, _ := clientset.CoreV1().Namespaces().List(context.Background(), v1.ListOptions{})

    for _, item := range list.Items {
        log.Info(item.Name)
    }
}
foo@bar:~$ go mod tidy   # download the required packages
foo@bar:~$ go run main.go

As output, we will see something like:

INFO[0001] cert-manager
INFO[0001] default
INFO[0001] dev-1
INFO[0001] dev-2
INFO[0001] dev-3

So what’s happening here? For each group of resources, client-go provides a separate client, and all of them are available through the clientset. First we need to create it, using the same KUBECONFIG we would use with kubectl:

config, _    = clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
clientset, _ = kubernetes.NewForConfig(config)
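
For brevity, the snippets in this post discard errors with the blank identifier. In a real program you would check them; here is a minimal sketch of the same setup with error handling (placed inside main rather than in package-level var declarations):

// Build the config from KUBECONFIG (clientcmd falls back to the in-cluster
// config when both arguments are empty) and fail fast if anything goes wrong.
config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
if err != nil {
    log.Fatal("failed to build config: ", err)
}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
    log.Fatal("failed to create clientset: ", err)
}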

Now let’s extend the example with a listener for namespace creation events.

package main

import (
    "context"
    "os"
    "sync"

    log "github.com/sirupsen/logrus"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

var (
    config, _    = clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
    clientset, _ = kubernetes.NewForConfig(config)
)

func watchNamespaces() {
    timeOut := int64(60)
    watcher, _ := clientset.CoreV1().Namespaces().Watch(context.Background(), metav1.ListOptions{TimeoutSeconds: &timeOut})

    for event := range watcher.ResultChan() {
        // For watch.Error events the object is a *metav1.Status, so a bare
        // type assertion would panic here; use the checked form instead.
        item, ok := event.Object.(*corev1.Namespace)
        if !ok {
            continue
        }

        switch event.Type {
        case watch.Modified:
        case watch.Bookmark:
        case watch.Error:
        case watch.Deleted:
        case watch.Added:
            processNamespace(item.GetName())
        }
    }
}

func processNamespace(namespace string) {
    log.Info("Some processing for newly created namespace : ", namespace)
}

func main() {
    var wg sync.WaitGroup
    go watchNamespaces()
    wg.Add(1)
    wg.Wait()
}

Now if we create a new namespace ‘test’ in the cluster, we will see output like:

INFO[0016] Some processing for newly created namespace : test

We run the listener in a separate goroutine in case we later want several independent listeners; since the main function does not wait for goroutines to terminate on its own, we block it with a WaitGroup. Because the watcher’s channel can be closed for various reasons (API timeouts, dropped connections, and so on), client-go also provides a way to retry – RetryWatcher.

RetryWatcher will make sure that in case the underlying watcher is closed (e.g. due to API timeout or etcd timeout) it will get restarted from the last point without the consumer even knowing about it. RetryWatcher does that by inspecting events and keeping track of resourceVersion.

Let’s change the code accordingly:

package main

import (
    "context"
    "os"
    "sync"

    log "github.com/sirupsen/logrus"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    toolsWatch "k8s.io/client-go/tools/watch"
)

var (
    config, _    = clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
    clientset, _ = kubernetes.NewForConfig(config)
)

func watchNamespaces() {

    watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
        timeOut := int64(60)
        // Reuse the options passed in by the RetryWatcher: they carry the
        // resourceVersion to resume from after a restart.
        options.TimeoutSeconds = &timeOut
        return clientset.CoreV1().Namespaces().Watch(context.Background(), options)
    }

    watcher, _ := toolsWatch.NewRetryWatcher("1", &cache.ListWatch{WatchFunc: watchFunc})

    for event := range watcher.ResultChan() {
        // For watch.Error events the object is a *metav1.Status, so a bare
        // type assertion would panic here; use the checked form instead.
        item, ok := event.Object.(*corev1.Namespace)
        if !ok {
            continue
        }

        switch event.Type {
        case watch.Modified:
        case watch.Bookmark:
        case watch.Error:
        case watch.Deleted:
        case watch.Added:
            processNamespace(item.GetName())
        }
    }
}

func processNamespace(namespace string) {
    log.Info("Some processing for newly created namespace : ", namespace)
}

func main() {
    var wg sync.WaitGroup
    go watchNamespaces()
    wg.Add(1)
    wg.Wait()
}
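
A side note on the hard-coded "1" passed to NewRetryWatcher: it is the resourceVersion the watch starts from. Instead of a fixed value you could take the current resourceVersion from an initial List call, for example (a minimal sketch, placed inside watchNamespaces):

    // Start the RetryWatcher from the resourceVersion of an initial List
    // instead of the hard-coded "1".
    list, err := clientset.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{})
    if err != nil {
        log.Fatal("failed to list namespaces: ", err)
    }
    watcher, _ := toolsWatch.NewRetryWatcher(list.ResourceVersion, &cache.ListWatch{WatchFunc: watchFunc})

Note that when starting from the List’s resourceVersion, namespaces created before the List call are not replayed as Added events.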

Now let’s add the desired processing:

package main

import (
    "context"
    "os"
    "sync"

    log "github.com/sirupsen/logrus"
    corev1 "k8s.io/api/core/v1"
    rbacv1 "k8s.io/api/rbac/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    toolsWatch "k8s.io/client-go/tools/watch"
)

var (
    config, _    = clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
    clientset, _ = kubernetes.NewForConfig(config)
)

func watchNamespaces() {

    watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
        timeOut := int64(60)
        // Reuse the options passed in by the RetryWatcher: they carry the
        // resourceVersion to resume from after a restart.
        options.TimeoutSeconds = &timeOut
        return clientset.CoreV1().Namespaces().Watch(context.Background(), options)
    }

    watcher, _ := toolsWatch.NewRetryWatcher("1", &cache.ListWatch{WatchFunc: watchFunc})

    for event := range watcher.ResultChan() {
        // For watch.Error events the object is a *metav1.Status, so a bare
        // type assertion would panic here; use the checked form instead.
        item, ok := event.Object.(*corev1.Namespace)
        if !ok {
            continue
        }

        switch event.Type {
        case watch.Modified:
        case watch.Bookmark:
        case watch.Error:
        case watch.Deleted:
        case watch.Added:
            processNamespace(item.GetName())
        }
    }
}

func processNamespace(namespace string) {
    log.Info("Some processing for newly created namespace : ", namespace)

    clientset.RbacV1().Roles(namespace).Update(context.Background(), getRole(namespace), metav1.UpdateOptions{})
    clientset.RbacV1().RoleBindings(namespace).Update(context.Background(), getRoleBinding(namespace), metav1.UpdateOptions{})
}

func getRole(namespace string) *rbacv1.Role {
    return &rbacv1.Role{
        ObjectMeta: metav1.ObjectMeta{
            Name: namespace + "-dev",
        },
        Rules: []rbacv1.PolicyRule{
            {
                APIGroups: []string{rbacv1.APIGroupAll},
                Resources: []string{rbacv1.ResourceAll},
                Verbs:     []string{rbacv1.VerbAll},
            },
        },
    }
}

func getRoleBinding(namespace string) *rbacv1.RoleBinding {
    return &rbacv1.RoleBinding{
        ObjectMeta: metav1.ObjectMeta{
            Name: namespace + "-dev",
        },
        RoleRef: rbacv1.RoleRef{
            APIGroup: rbacv1.GroupName,
            Kind:     "Role",
            Name:     namespace + "-dev",
        },
        // These users should be fetched from a ConfigMap in the cluster or from some external storage
        Subjects: []rbacv1.Subject{
            {
                APIGroup: "rbac.authorization.k8s.io",
                Kind:     "User",
                Name:     "user1@tenant1.com",
            },
            {
                APIGroup: "rbac.authorization.k8s.io",
                Kind:     "User",
                Name:     "user2@tenant1.com",
            },
        },
    }
}

func main() {
    var wg sync.WaitGroup
    go watchNamespaces()
    wg.Add(1)
    wg.Wait()
}
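
As the comment in getRoleBinding notes, the subject list should not be hard-coded. Here is a minimal sketch of fetching it from a ConfigMap instead; the ConfigMap name "rolebinding-users" and the comma-separated "users" key are purely illustrative, not part of the original example:

func getUsers(cmNamespace string) ([]rbacv1.Subject, error) {
    // Read the comma-separated user list from a ConfigMap.
    cm, err := clientset.CoreV1().ConfigMaps(cmNamespace).Get(context.Background(), "rolebinding-users", metav1.GetOptions{})
    if err != nil {
        return nil, err
    }

    var subjects []rbacv1.Subject
    for _, user := range strings.Split(cm.Data["users"], ",") {
        subjects = append(subjects, rbacv1.Subject{
            APIGroup: "rbac.authorization.k8s.io",
            Kind:     "User",
            Name:     strings.TrimSpace(user),
        })
    }
    return subjects, nil
}

This needs the standard library "strings" import, and the returned slice could replace the hard-coded Subjects in getRoleBinding.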

Now if we create a namespace and run

foo@bar:~$ kubectl get role test-dev -n test -o yaml
foo@bar:~$ kubectl get rolebinding test-dev -n test -o yaml

we will see

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  creationTimestamp: "2023-02-25T22:56:37Z"
  name: test-dev
  namespace: test
  resourceVersion: "118728013"
  uid: acb3ec49-3372-412b-8057-3d8b3827a8f0
rules:
- apiGroups:
  - '*'
  resources:
  - '*'
  verbs:
  - '*'

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  creationTimestamp: "2023-02-25T22:56:37Z"
  name: test-dev
  namespace: test
  resourceVersion: "118728014"
  uid: 313190f3-6466-4ff9-92c2-914b1f1a58c5
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: test-dev
subjects:
- apiGroup: rbac.authorization.k8s.io
  kind: User
  name: user1@tenant1.com
- apiGroup: rbac.authorization.k8s.io
  kind: User
  name: user2@tenant1.com

Until now we were running the code from a local PC. To run it as a pod in the cluster, we need to write a Dockerfile, build the image, and push it to an image registry.

FROM golang:1.20-alpine

WORKDIR /app
COPY go.mod ./
COPY go.sum ./

RUN go mod download
COPY *.go .

RUN go build -o /app/k8s-events-demo
CMD [ "/app/k8s-events-demo" ]
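
To build and push the image (the registry prefix below is a placeholder for your own registry):

foo@bar:~$ docker build -t <your-registry>/k8s-events-demo:0.1 .
foo@bar:~$ docker push <your-registry>/k8s-events-demo:0.1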

and create a Kubernetes Deployment:

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: k8s-events-demo
  namespace: dkh
spec:
  replicas: 1
  selector:
    matchLabels:
      app: k8s-events-demo
  template:
    metadata:
      labels:
        app: k8s-events-demo
    spec:
      serviceAccountName: k8s-events-demo
      imagePullSecrets:
        - name: …
      containers:
        - name: k8s-events-demo
          image: k8s-events-demo:0.1
          env:
          - name: …
            value: …
          resources:
            requests:
              cpu: "200m"
              memory: "256Mi"
            limits:
              memory: "256Mi"

As we don’t provide a KUBECONFIG environment variable here, the in-cluster configuration is used and calls to the API Server are made with the pod’s ServiceAccount. Note that this ServiceAccount must be granted RBAC permissions to watch namespaces and to create and update Roles and RoleBindings, otherwise the requests will be rejected.

Wrapping Up

This was a short introduction to using the Kubernetes API from Go with the client-go library.

About the author: Denis Khasbulatov