Watching Kubernetes Resources in Go
Sometimes we need to act on state changes of Kubernetes resources. Say we need to dynamically create a Role and a RoleBinding for every newly created namespace. One way to achieve this is to run a component inside the cluster that listens for namespace creation events and asks the Kubernetes API server to create a Role and bind it to a predefined set of users, which is stored in a ConfigMap inside the cluster.
The core of Kubernetes' control plane is the API server. The API server exposes an HTTP API that lets end users, different parts of your cluster, and external components communicate with one another.
As Kubernetes itself and many applications in its ecosystem are written in Go (https://go.dev/), let’s see what we can do with this language.
Go has a library that provides higher-level abstractions over the Kubernetes REST API: client-go (https://pkg.go.dev/k8s.io/client-go).
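For orientation, the REST calls that client-go wraps can be sketched with the standard library alone. The snippet below only builds the URLs for listing and watching namespaces; the helper name namespacesURL and the server address are assumptions made for illustration, and authentication is left out entirely.

```go
package main

import (
	"fmt"
	"net/url"
)

// namespacesURL is a hypothetical helper that builds the URL of the core/v1
// namespaces collection. With watch=true the API server streams change events
// instead of returning a single list.
func namespacesURL(server string, watch bool, resourceVersion string) string {
	u := url.URL{Scheme: "https", Host: server, Path: "/api/v1/namespaces"}
	q := url.Values{}
	if watch {
		q.Set("watch", "true")
	}
	if resourceVersion != "" {
		q.Set("resourceVersion", resourceVersion)
	}
	u.RawQuery = q.Encode()
	return u.String()
}

func main() {
	// "cluster.example:6443" is a placeholder address.
	fmt.Println(namespacesURL("cluster.example:6443", false, ""))
	fmt.Println(namespacesURL("cluster.example:6443", true, "118728013"))
}
```

client-go issues requests of this shape for us and additionally takes care of authentication, serialization and typed results.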
To start, we need to create a project and write some code.
foo@bar:~$ go mod init client-go-demo
foo@bar:~$ vi main.go
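package main

import (
	"context"
	"os"

	log "github.com/sirupsen/logrus"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build the client configuration from the kubeconfig file that kubectl uses.
	config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		log.Fatalf("building client config: %v", err)
	}
	// The clientset bundles one typed client per API group.
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("creating clientset: %v", err)
	}

	list, err := clientset.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{})
	if err != nil {
		log.Fatalf("listing namespaces: %v", err)
	}
	for _, item := range list.Items {
		log.Info(item.Name)
	}
}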
foo@bar:~$ go mod tidy   # downloads the required packages
foo@bar:~$ go run main.go
As output we will see something like
INFO[0001] cert-manager
INFO[0001] default
INFO[0001] dev-1
INFO[0001] dev-2
INFO[0001] dev-3
So what is happening here? client-go provides a separate typed client for each API group, and all of them are available through the clientset. To create it, we use the same KUBECONFIG file we would use with kubectl. Both calls also return an error, which real code should check.
config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
clientset, err := kubernetes.NewForConfig(config)
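One detail worth keeping in mind: when KUBECONFIG is unset, an empty path is passed on, whereas kubectl would fall back to ~/.kube/config. A minimal sketch of such a fallback is shown below. The helper kubeconfigPath is hypothetical, and taking only the first entry of a KUBECONFIG path list is a simplification (kubectl merges all listed files).

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// kubeconfigPath is a hypothetical helper. It returns the first entry of
// KUBECONFIG (which may hold a list of paths) and otherwise falls back to
// kubectl's default location, <home>/.kube/config.
func kubeconfigPath(envValue, homeDir string) string {
	for _, p := range filepath.SplitList(envValue) {
		if p != "" {
			return p
		}
	}
	return filepath.Join(homeDir, ".kube", "config")
}

func main() {
	home, err := os.UserHomeDir()
	if err != nil {
		fmt.Fprintln(os.Stderr, "cannot determine home directory:", err)
		os.Exit(1)
	}
	fmt.Println(kubeconfigPath(os.Getenv("KUBECONFIG"), home))
}
```

The returned path could then be passed as the second argument of clientcmd.BuildConfigFromFlags.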
Now let’s extend the example with a listener for namespace creation events.
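package main

import (
	"context"
	"os"
	"sync"

	log "github.com/sirupsen/logrus"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// clientset is initialized in main and shared by the functions below.
var clientset *kubernetes.Clientset

func watchNamespaces() {
	// Ask the API server to close the watch after 60 seconds.
	timeOut := int64(60)
	watcher, err := clientset.CoreV1().Namespaces().Watch(context.Background(), metav1.ListOptions{TimeoutSeconds: &timeOut})
	if err != nil {
		log.Errorf("starting namespace watch: %v", err)
		return
	}
	defer watcher.Stop()

	// The channel is closed when the watch ends, which terminates the loop.
	for event := range watcher.ResultChan() {
		switch event.Type {
		case watch.Added:
			// Error events carry a *metav1.Status, so check the type before using the object.
			if ns, ok := event.Object.(*corev1.Namespace); ok {
				processNamespace(ns.GetName())
			}
		case watch.Error:
			log.Errorf("watch error: %v", event.Object)
		case watch.Modified, watch.Deleted, watch.Bookmark:
			// Not relevant for this example.
		}
	}
}

func processNamespace(namespace string) {
	log.Info("Some processing for newly created namespace : ", namespace)
}

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		log.Fatalf("building client config: %v", err)
	}
	clientset, err = kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("creating clientset: %v", err)
	}

	// Wait until the watcher goroutine returns.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		watchNamespaces()
	}()
	wg.Wait()
}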
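Now if we create a new namespace ‘test’ in the cluster, we will see output like the line below. A watch started without a resourceVersion first reports every existing namespace as Added, so the log also contains one such line per namespace that already exists.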
INFO[0016] Some processing for newly created namespace : test
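What arrives on the watcher channel corresponds to a stream of JSON objects in the HTTP response, each with a type (ADDED, MODIFIED, DELETED, BOOKMARK or ERROR) and the affected object. The following sketch decodes such a stream with the standard library only. The sample body is hand-written for illustration, and the names watchEvent, addedNames and addedNamesIn are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// watchEvent mirrors the envelope of one streamed event; only the fields
// needed here are decoded.
type watchEvent struct {
	Type   string `json:"type"`
	Object struct {
		Metadata struct {
			Name            string `json:"name"`
			ResourceVersion string `json:"resourceVersion"`
		} `json:"metadata"`
	} `json:"object"`
}

// addedNames reads events until the stream ends and returns the names of
// the objects reported as ADDED.
func addedNames(r io.Reader) ([]string, error) {
	var names []string
	dec := json.NewDecoder(r)
	for {
		var ev watchEvent
		if err := dec.Decode(&ev); err == io.EOF {
			return names, nil
		} else if err != nil {
			return names, err
		}
		if ev.Type == "ADDED" {
			names = append(names, ev.Object.Metadata.Name)
		}
	}
}

// addedNamesIn is a convenience wrapper for in-memory bodies.
func addedNamesIn(body string) ([]string, error) {
	return addedNames(strings.NewReader(body))
}

func main() {
	// Hand-written sample of a watch response body.
	stream := `{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"test","resourceVersion":"101"}}}
{"type":"MODIFIED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"test","resourceVersion":"102"}}}
{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"dev-4","resourceVersion":"103"}}}`
	names, err := addedNamesIn(stream)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println(names) // prints [test dev-4]
}
```

client-go performs this decoding for us and hands over typed objects such as *corev1.Namespace.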
We run the listener in a separate goroutine in case we later want to have more independent listeners, and since the main function does not wait for goroutines to terminate, we synchronize with a WaitGroup. The watcher channel can be closed for various reasons (here at the latest when the 60-second timeout expires), after which no more events arrive. For this case client-go provides a way to retry: RetryWatcher.
RetryWatcher will make sure that in case the underlying watcher is closed (e.g. due to API timeout or etcd timeout) it will get restarted from the last point without the consumer even knowing about it. RetryWatcher does that by inspecting events and keeping track of resourceVersion.
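The idea of resuming from the last seen resourceVersion can be illustrated without a cluster. The sketch below is a simplified model and not client-go's actual implementation: the types event and watchFunc, the function watchWithResume and the fake server are all hypothetical, and the number of restarts is bounded only so that the example terminates.

```go
package main

import "fmt"

// event is a simplified stand-in for a watch event.
type event struct {
	Name            string
	ResourceVersion string
}

// watchFunc opens a watch that starts after the given resourceVersion. The
// returned channel is closed when the underlying connection ends.
type watchFunc func(resourceVersion string) <-chan event

// watchWithResume opens the watch maxRestarts+1 times, each time from the
// resourceVersion of the last event it delivered, and forwards every event
// to handle. It returns the last resourceVersion seen.
func watchWithResume(start string, maxRestarts int, open watchFunc, handle func(event)) string {
	last := start
	for i := 0; i <= maxRestarts; i++ {
		for ev := range open(last) {
			last = ev.ResourceVersion
			handle(ev)
		}
	}
	return last
}

// fakeWatch simulates a server that holds three events and a connection
// that drops after delivering at most two of them.
func fakeWatch(resourceVersion string) <-chan event {
	all := []event{{"dev-4", "101"}, {"dev-5", "102"}, {"test", "103"}}
	start := 0
	for i, ev := range all {
		if ev.ResourceVersion == resourceVersion {
			start = i + 1
		}
	}
	end := start + 2
	if end > len(all) {
		end = len(all)
	}
	ch := make(chan event, len(all))
	for _, ev := range all[start:end] {
		ch <- ev
	}
	close(ch)
	return ch
}

func main() {
	last := watchWithResume("100", 1, fakeWatch, func(ev event) {
		fmt.Println(ev.Name, ev.ResourceVersion)
	})
	fmt.Println("last seen:", last)
}
```

Although the simulated connection drops after two events, the consumer receives all three exactly once, because the second watch starts after version 102.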
Let’s change the code accordingly
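package main

import (
	"context"
	"os"
	"sync"

	log "github.com/sirupsen/logrus"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	toolsWatch "k8s.io/client-go/tools/watch"
)

// clientset is initialized in main and shared by the functions below.
var clientset *kubernetes.Clientset

func watchNamespaces() {
	ctx := context.Background()
	// List once to obtain a current resourceVersion, so the watch reports only later changes.
	list, err := clientset.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
	if err != nil {
		log.Errorf("listing namespaces: %v", err)
		return
	}

	watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
		// Keep the options set by RetryWatcher: they carry the resourceVersion to resume from.
		timeOut := int64(60)
		options.TimeoutSeconds = &timeOut
		return clientset.CoreV1().Namespaces().Watch(ctx, options)
	}
	watcher, err := toolsWatch.NewRetryWatcher(list.ResourceVersion, &cache.ListWatch{WatchFunc: watchFunc})
	if err != nil {
		log.Errorf("creating retry watcher: %v", err)
		return
	}
	defer watcher.Stop()

	for event := range watcher.ResultChan() {
		switch event.Type {
		case watch.Added:
			// Error events carry a *metav1.Status, so check the type before using the object.
			if ns, ok := event.Object.(*corev1.Namespace); ok {
				processNamespace(ns.GetName())
			}
		case watch.Error:
			log.Errorf("watch error: %v", event.Object)
		case watch.Modified, watch.Deleted, watch.Bookmark:
			// Not relevant for this example.
		}
	}
}

func processNamespace(namespace string) {
	log.Info("Some processing for newly created namespace : ", namespace)
}

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		log.Fatalf("building client config: %v", err)
	}
	clientset, err = kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("creating clientset: %v", err)
	}

	// Wait until the watcher goroutine returns.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		watchNamespaces()
	}()
	wg.Wait()
}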
Now let’s add the desired processing: creating the Role and the RoleBinding in the new namespace.
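package main

import (
	"context"
	"os"
	"sync"

	log "github.com/sirupsen/logrus"
	corev1 "k8s.io/api/core/v1"
	rbacv1 "k8s.io/api/rbac/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	toolsWatch "k8s.io/client-go/tools/watch"
)

// clientset is initialized in main and shared by the functions below.
var clientset *kubernetes.Clientset

func watchNamespaces() {
	ctx := context.Background()
	// List once to obtain a current resourceVersion, so the watch reports only later changes.
	list, err := clientset.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
	if err != nil {
		log.Errorf("listing namespaces: %v", err)
		return
	}

	watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
		// Keep the options set by RetryWatcher: they carry the resourceVersion to resume from.
		timeOut := int64(60)
		options.TimeoutSeconds = &timeOut
		return clientset.CoreV1().Namespaces().Watch(ctx, options)
	}
	watcher, err := toolsWatch.NewRetryWatcher(list.ResourceVersion, &cache.ListWatch{WatchFunc: watchFunc})
	if err != nil {
		log.Errorf("creating retry watcher: %v", err)
		return
	}
	defer watcher.Stop()

	for event := range watcher.ResultChan() {
		switch event.Type {
		case watch.Added:
			// Error events carry a *metav1.Status, so check the type before using the object.
			if ns, ok := event.Object.(*corev1.Namespace); ok {
				processNamespace(ns.GetName())
			}
		case watch.Error:
			log.Errorf("watch error: %v", event.Object)
		case watch.Modified, watch.Deleted, watch.Bookmark:
			// Not relevant for this example.
		}
	}
}

func processNamespace(namespace string) {
	log.Info("Some processing for newly created namespace : ", namespace)
	ctx := context.Background()
	// The objects do not exist yet, so they have to be created rather than updated.
	_, err := clientset.RbacV1().Roles(namespace).Create(ctx, getRole(namespace), metav1.CreateOptions{})
	if err != nil && !apierrors.IsAlreadyExists(err) {
		log.Errorf("creating role in %s: %v", namespace, err)
		return
	}
	_, err = clientset.RbacV1().RoleBindings(namespace).Create(ctx, getRoleBinding(namespace), metav1.CreateOptions{})
	if err != nil && !apierrors.IsAlreadyExists(err) {
		log.Errorf("creating role binding in %s: %v", namespace, err)
	}
}

func getRole(namespace string) *rbacv1.Role {
	return &rbacv1.Role{
		ObjectMeta: metav1.ObjectMeta{
			Name: namespace + "-dev",
		},
		// Full access to everything inside the namespace.
		Rules: []rbacv1.PolicyRule{
			{
				APIGroups: []string{rbacv1.APIGroupAll},
				Resources: []string{rbacv1.ResourceAll},
				Verbs:     []string{rbacv1.VerbAll},
			},
		},
	}
}

func getRoleBinding(namespace string) *rbacv1.RoleBinding {
	return &rbacv1.RoleBinding{
		ObjectMeta: metav1.ObjectMeta{
			Name: namespace + "-dev",
		},
		RoleRef: rbacv1.RoleRef{
			APIGroup: "rbac.authorization.k8s.io",
			Kind:     "Role",
			Name:     namespace + "-dev",
		},
		// These users should be fetched either from a ConfigMap in the cluster or from some external storage.
		Subjects: []rbacv1.Subject{
			{
				APIGroup: "rbac.authorization.k8s.io",
				Kind:     "User",
				Name:     "user1@tenant1.com",
			},
			{
				APIGroup: "rbac.authorization.k8s.io",
				Kind:     "User",
				Name:     "user2@tenant1.com",
			},
		},
	}
}

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		log.Fatalf("building client config: %v", err)
	}
	clientset, err = kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("creating clientset: %v", err)
	}

	// Wait until the watcher goroutine returns.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		watchNamespaces()
	}()
	wg.Wait()
}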
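The hard-coded users above are a placeholder for the ConfigMap mentioned at the beginning. A minimal sketch of turning one ConfigMap entry into a list of user names follows. The entry format (one user per line, '#' for comments) and the helper name parseUsers are assumptions, not something the example above prescribes.

```go
package main

import (
	"fmt"
	"strings"
)

// parseUsers turns the text of a ConfigMap entry into a list of user names.
// Assumed format: one user per line; blank lines and lines starting with '#'
// are ignored.
func parseUsers(data string) []string {
	var users []string
	for _, line := range strings.Split(data, "\n") {
		line = strings.TrimSpace(line)
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		users = append(users, line)
	}
	return users
}

func main() {
	// Example value as it might appear under one key of a ConfigMap's Data map.
	data := "# developers of tenant1\nuser1@tenant1.com\n\nuser2@tenant1.com\n"
	for _, u := range parseUsers(data) {
		fmt.Println(u)
	}
}
```

In the program above, the string would come from the Data map of a ConfigMap fetched with clientset.CoreV1().ConfigMaps(...).Get(...), and each name would become one rbacv1.Subject of kind User.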
Now if we create a namespace named test and run
foo@bar:~$ kubectl get role test-dev -n test -oyaml
foo@bar:~$ kubectl get rolebinding test-dev -n test -oyaml
we will see
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  creationTimestamp: "2023-02-25T22:56:37Z"
  name: test-dev
  namespace: test
  resourceVersion: "118728013"
  uid: acb3ec49-3372-412b-8057-3d8b3827a8f0
rules:
- apiGroups:
  - '*'
  resources:
  - '*'
  verbs:
  - '*'

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  creationTimestamp: "2023-02-25T22:56:37Z"
  name: test-dev
  namespace: test
  resourceVersion: "118728014"
  uid: 313190f3-6466-4ff9-92c2-914b1f1a58c5
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: test-dev
subjects:
- apiGroup: rbac.authorization.k8s.io
  kind: User
  name: user1@tenant1.com
- apiGroup: rbac.authorization.k8s.io
  kind: User
  name: user2@tenant1.com
Until now we were running the code from a local machine. To run it as a pod in the cluster, we need to build an image and push it to an image registry.
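FROM golang:1.20-alpine
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY *.go ./
# /app is the working directory, so the binary needs a different output path.
RUN go build -o /usr/local/bin/k8s-events-demo .
CMD [ "/usr/local/bin/k8s-events-demo" ]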
and create a Kubernetes Deployment
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: k8s-events-demo
  namespace: dkh
spec:
  replicas: 1
  selector:
    matchLabels:
      app: k8s-events-demo
  template:
    metadata:
      labels:
        app: k8s-events-demo
    spec:
      serviceAccountName: k8s-events-demo
      imagePullSecrets:
        - name: …
      containers:
        - name: k8s-events-demo
          image: k8s-events-demo:0.1
          env:
            - name: …
              value: …
          resources:
            requests:
              cpu: "200m"
              memory: "256Mi"
            limits:
              memory: "256Mi"
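As we don’t provide KUBECONFIG as an environment variable, BuildConfigFromFlags falls back to the in-cluster configuration, and the pod’s service account (k8s-events-demo) is used for calls to the API server. This service account must be granted RBAC permissions to list and watch namespaces and to create Roles and RoleBindings. Note that the API server only lets a client create a Role or RoleBinding that grants permissions the client itself holds, unless the client has the escalate or bind verb.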
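The in-cluster configuration relies on two things Kubernetes provides to every pod: the environment variables KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT, and a service account token mounted into the container. The sketch below shows this discovery with the standard library only. It is an illustration of the mechanism, not client-go's code, and the helper name inClusterServer is hypothetical.

```go
package main

import (
	"fmt"
	"net"
	"os"
)

// Path at which Kubernetes mounts the service account token into a pod.
const tokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"

// inClusterServer returns the API server URL derived from the environment
// variables Kubernetes injects into every pod, or false when they are absent.
func inClusterServer(getenv func(string) string) (string, bool) {
	host, port := getenv("KUBERNETES_SERVICE_HOST"), getenv("KUBERNETES_SERVICE_PORT")
	if host == "" || port == "" {
		return "", false
	}
	return "https://" + net.JoinHostPort(host, port), true
}

func main() {
	server, ok := inClusterServer(os.Getenv)
	if !ok {
		fmt.Println("not running inside a pod")
		return
	}
	_, err := os.Stat(tokenPath)
	fmt.Println("API server:", server, "token mounted:", err == nil)
}
```

Run on a local machine, this prints "not running inside a pod"; inside the Deployment above it would report the cluster-internal API server address.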
Wrapping Up
This was a short introduction to using the Kubernetes API from Go with the client-go library.