External Access of Apache Kafka on OpenShift
Containers solve the problem of getting software to run reliably when moved from one computing environment to another. Docker in particular, as a container solution, started a new DevOps trend. We, as software engineers, leverage this technology to ship our software to any environment.
With container technology, Kubernetes (K8S) became the de-facto industry standard for container orchestration. K8S is an open-source system for automating deployment, scaling, and management of containerized applications. Red Hat built an enterprise Kubernetes on top of it, the OpenShift Container Platform. OpenShift is a container platform that runs your software on top of K8S and adds further concepts of its own.
Many of our software architectures rely on Apache Kafka, a distributed messaging system. How do we install Kafka on OpenShift? And once it runs on OpenShift, how do we access the data? The answer to these questions is Strimzi.
Strimzi is an open-source project that provides container images and operators for running Apache Kafka on Kubernetes and Red Hat OpenShift.
Motivation
Who is that article for?
This article will explain how Strimzi makes Kafka accessible for clients running outside of Kubernetes. We will deploy a Kafka cluster with the Strimzi Operator on minishift (OKD/OpenShift). This is for all (DevOps) engineers who need a concise example or wrap-up of how Strimzi works.
What basic knowledge do I need to understand this article?
You do not need all of this knowledge upfront, but it helps tremendously if you are familiar with the concepts of
- Apache Kafka
- Kubernetes in short K8S
- Containers (Docker, Podman) and their orchestration
What you will learn about in this article:
- Kubernetes
- Operator Concept
- The need for Stateful Sets
- Strimzi - the Kafka Operator for K8S
- Installing Kafka on minishift
- How Kafka works on K8S
- Connect with an external client to Kafka on OpenShift
Credits
This article is based upon the work of Accessing Apache Kafka in Strimzi: Part 1 and Accessing Apache Kafka in Strimzi: Part 3.
Differentiation
Apache Kafka is open source; the Confluent Platform is a professional community and enterprise offering built around Apache Kafka. Strimzi brings Apache Kafka to Kubernetes and OpenShift, so we won't deal with the Confluent Platform in this article. mimacom is also a Confluent Partner and can provide assistance in this matter.
Benefits of Kafka on Kubernetes
Apache Kafka is an essential data infrastructure backbone. As a distributed system, Kafka promises high availability and decoupling of system dependencies: producers (data suppliers) and consumers can process the data at their own pace. Kafka is therefore installed and operated on several machines.
Kubernetes is a system for deploying applications that can save money because it takes less IT personnel to manage. It also makes your applications more portable, so you can move them more easily between different clouds and internal environments. The key is automation; the price to pay is complexity and the further education of your DevOps engineers.
Running Kafka on Kubernetes lets Kubernetes schedule Kafka onto the available resources, with the goal of keeping disruption and downtime of the Kafka cluster to a minimum.
Do I need Kafka on K8S? It depends on your scenario and environment. In my opinion, if you have no experience with Apache Kafka at all, learn the product and its basics first before operating it on K8S.
The Environment
To understand our example, we provide the environment details for this article.
- We use minishift
- We deploy the Strimzi Operator on minishift
We use a Linux system for demonstration.
Minishift
Minishift is a tool that helps you run OpenShift locally by running a single-node OpenShift cluster inside a VM. For further information about installation, refer to the product page. It is a robust solution to test OpenShift concepts locally before going to the production cluster.
We start minishift with this command in a Linux terminal:
minishift start
minishift prints its startup progress and, once it is done, the URL of the OpenShift web console.
After the start, we need to add the oc client to the PATH. Simply run this command:
eval $(minishift oc-env)
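Note that the default minishift VM can be tight for Kafka and ZooKeeper together. If pods later fail to schedule, restarting minishift with more resources usually helps; a minimal sketch, assuming your host machine can spare them:
minishift stop
minishift start --cpus 4 --memory 8GB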
Query the OpenShift/OKD version to test if the oc client is working.
oc version
oc v3.11.0+0cbc58b
kubernetes v1.11.0+d4cacc0
features: Basic-Auth GSSAPI Kerberos SPNEGO
Server https://192.168.42.24:8443
kubernetes v1.11.0+d4cacc0
Our infrastructure is now running. OpenShift is an enterprise Kubernetes offering by Red Hat; OKD is the community distribution of Kubernetes that powers Red Hat OpenShift.
Installing the Strimzi Operator
This section covers how we install the Strimzi Operator on minishift.
Log in to minishift as admin:
oc login -u system:admin
Logged into "https://192.168.42.24:8443" as "system:admin" using existing credentials.
You have access to the following projects and can switch between them with 'oc project <projectname>':
* default
kube-dns
kube-proxy
kube-public
kube-system
openshift
openshift-apiserver
openshift-controller-manager
openshift-core-operators
openshift-infra
openshift-node
openshift-service-cert-signer
openshift-web-console
Using project "default".
Create a new project with the name streaming.
oc new-project streaming
Now using project "streaming" on server "https://192.168.42.24:8443".
You can add applications to this project with the 'new-app' command. For example, try:
oc new-app centos/ruby-25-centos7~https://github.com/sclorg/ruby-ex.git
to build a new example application in Ruby.
Install the Strimzi Operator. The namespace option is not strictly required, but we declare it for completeness:
oc apply -f 'https://strimzi.io/install/latest?namespace=streaming' -n streaming
The minishift output:
customresourcedefinition.apiextensions.k8s.io/kafkas.kafka.strimzi.io created
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-entity-operator-delegation created
clusterrolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator created
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-topic-operator-delegation created
customresourcedefinition.apiextensions.k8s.io/kafkausers.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkarebalances.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkamirrormaker2s.kafka.strimzi.io created
clusterrole.rbac.authorization.k8s.io/strimzi-entity-operator created
clusterrole.rbac.authorization.k8s.io/strimzi-cluster-operator-global created
clusterrolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-kafka-broker-delegation created
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator created
clusterrole.rbac.authorization.k8s.io/strimzi-cluster-operator-namespaced created
clusterrole.rbac.authorization.k8s.io/strimzi-topic-operator created
serviceaccount/strimzi-cluster-operator created
clusterrole.rbac.authorization.k8s.io/strimzi-kafka-broker created
customresourcedefinition.apiextensions.k8s.io/kafkatopics.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkabridges.kafka.strimzi.io created
deployment.apps/strimzi-cluster-operator created
customresourcedefinition.apiextensions.k8s.io/kafkaconnectors.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkaconnects2is.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkaconnects.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkamirrormakers.kafka.strimzi.io created
If you cannot access the Strimzi page, add a proxy or check out the GitHub repo.
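Before continuing, it is worth verifying that the operator pod is up. A quick check (the generated pod name suffix will differ in your environment):
oc -n streaming get pods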
The Operator Concept
Operators are software extensions to Kubernetes that make use of custom resources to manage applications and their components. As you can see in the installation output above, Strimzi creates custom resource definitions (CRDs).
The Operator pattern aims to capture the key aim of a human operator who is managing a service or set of services. Human operators who look after specific applications and services have deep knowledge of how the system ought to behave, how to deploy it, and how to react if there are problems.
An Operator is essentially a custom controller. The Strimzi Operator has in-depth knowledge about Apache Kafka clusters.
An Operator watches for these custom resource types and is notified about their presence or modification. With the custom resource kafkas.kafka.strimzi.io you describe the Kafka cluster, and the Strimzi operator deploys the Kafka cluster for you in an automated way.
The Kafka Resource
When the operator receives this notification, it starts a reconciliation loop to ensure that all the objects required for the application service are actually available and configured as the user specified in the object's spec.
We take Strimzi's example Kafka cluster specification and customize it for our testing scenario.
curl -s https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml \
| sed 's/name: .*/name: logs/' \
| sed 's/size: .*/size: 100Mi/' \
| cat
The above command takes the example file, replaces the cluster name with logs, and requests only 100 megabytes of storage instead of gigabytes for our testing scenario. You will see the following output on your terminal:
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: logs
spec:
  kafka:
    version: 2.5.0
    replicas: 1
    listeners:
      plain: {}
      tls: {}
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.5"
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 100Mi
          deleteClaim: false
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 100Mi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
We now have a sufficient description of our Kafka cluster. Note that it is purely declarative: we describe the desired state, and the operator makes it happen.
Now that we have the description of our desired cluster, we deploy it in the created namespace streaming in minishift.
curl -s https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml \
| sed 's/name: .*/name: logs/' \
| sed 's/size: .*/size: 100Mi/' \
| oc -n streaming apply -f -
minishift will yield:
kafka.kafka.strimzi.io/logs created
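Because the cluster is described declaratively, later changes are just edits to the Kafka resource. As an illustrative sketch only (a single-node minishift will not actually fit three brokers), scaling the brokers would be a patch of the spec:
oc -n streaming patch kafka logs --type merge -p '{"spec":{"kafka":{"replicas":3}}}'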
Strimzi Operator in Action
The Strimzi operator creates all necessary objects to fulfil the description. The operator itself runs in a pod; by showing the logs of that pod, you can see what Strimzi does for you.
oc logs -f strimzi-cluster-operator-bf978cf5c-rc5l2
+ shift
+ export MALLOC_ARENA_MAX=2
+ MALLOC_ARENA_MAX=2
+ JAVA_OPTS=' -Dvertx.cacheDirBase=/tmp -Djava.security.egd=file:/dev/./urandom'
++ get_gc_opts
++ '[' '' == true ']'
++ echo ''
+ JAVA_OPTS=' -Dvertx.cacheDirBase=/tmp -Djava.security.egd=file:/dev/./urandom '
+ exec /usr/bin/tini -w -e 143 -- java -Dvertx.cacheDirBase=/tmp -Djava.security.egd=file:/dev/./urandom -classpath lib/io.strimzi.cluster-operator-0.18.0.jar:lib/io.prometheus.simpleclient_common-0.7.0.jar:lib/io.strimzi.kafka-oauth-client-0.5.0.jar:lib/io.netty.netty-handler-4.1.45.Final.jar:lib/io.netty.netty-codec-http-4.1.45.Final.jar:lib/org.quartz-scheduler.quartz-2.2.1.jar:lib/org.bouncycastle.bcprov-jdk15on-1.60.jar:lib/com.squareup.okio.okio-1.15.0.jar:lib/org.keycloak.keycloak-core-10.0.0.jar:lib/io.netty.netty-buffer-4.1.45.Final.jar:lib/org.yaml.snakeyaml-1.24.jar:lib/io.fabric8.openshift-client-4.6.4.jar:lib/io.netty.netty-common-4.1.45.Final.jar:lib/org.apache.logging.log4j.log4j-api-2.13.0.jar:lib/org.xerial.snappy.snappy-java-1.1.7.3.jar:lib/org.hdrhistogram.HdrHistogram-2.1.11.jar:lib/io.prometheus.simpleclient-0.7.0.jar:lib/com.sun.activation.jakarta.activation-1.2.1.jar:lib/org.apache.yetus.audience-annotations-0.5.0.jar:lib/com.fasterxml.jackson.dataformat.jackson-dataformat-yaml-2.10.2.jar:lib/io.micrometer.micrometer-core-1.3.1.jar:lib/io.netty.netty-codec-4.1.45.Final.jar:lib/org.keycloak.keycloak-common-10.0.0.jar:lib/jakarta.activation.jakarta.activation-api-1.2.1.jar:lib/io.vertx.vertx-core-3.8.5.jar:lib/io.strimzi.certificate-manager-0.18.0.jar:lib/io.strimzi.kafka-oauth-common-0.5.0.jar:lib/io.strimzi.kafka-oauth-server-0.5.0.jar:lib/io.netty.netty-codec-dns-4.1.45.Final.jar:lib/io.fabric8.kubernetes-model-4.6.4.jar:lib/io.netty.netty-codec-socks-4.1.45.Final.jar:lib/com.github.mifmif.generex-1.0.2.jar:lib/io.netty.netty-resolver-4.1.45.Final.jar:lib/com.github.luben.zstd-jni-1.4.4-7.jar:lib/io.netty.netty-handler-proxy-4.1.45.Final.jar:lib/com.squareup.okhttp3.logging-interceptor-3.12.6.jar:lib/io.strimzi.operator-common-0.18.0.jar:lib/org.bouncycastle.bcpkix-jdk15on-1.62.jar:lib/org.lz4.lz4-java-1.7.1.jar:lib/io.netty.netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:lib/io.netty.netty-transport-native-unix-common-4.1.45.Final.jar:lib/dk.brics.automaton.automaton-1.11-8.jar:lib/io.vertx.vertx-micrometer-metrics-3.8.5.jar:lib/org.apache.kafka.kafka-clients-2.5.0.jar:lib/com.fasterxml.jackson.core.jackson-core-2.10.2.jar:lib/org.apache.zookeeper.zookeeper-jute-3.5.7.jar:lib/io.netty.netty-transport-4.1.45.Final.jar:lib/io.netty.netty-transport-native-epoll-4.1.45.Final.jar:lib/jakarta.xml.bind.jakarta.xml.bind-api-2.3.2.jar:lib/org.apache.logging.log4j.log4j-slf4j-impl-2.13.0.jar:lib/com.fasterxml.jackson.core.jackson-annotations-2.10.2.jar:lib/io.fabric8.zjsonpatch-0.3.0.jar:lib/org.apache.zookeeper.zookeeper-3.5.7.jar:lib/io.strimzi.api-0.18.0.jar:lib/io.fabric8.kubernetes-client-4.6.4.jar:lib/com.fasterxml.jackson.module.jackson-module-jaxb-annotations-2.10.2.jar:lib/com.squareup.okhttp3.okhttp-3.12.6.jar:lib/io.netty.netty-codec-http2-4.1.45.Final.jar:lib/io.strimzi.config-model-0.18.0.jar:lib/org.apache.logging.log4j.log4j-core-2.13.0.jar:lib/io.fabric8.kubernetes-model-common-4.6.4.jar:lib/com.fasterxml.jackson.core.jackson-databind-2.10.2.jar:lib/io.strimzi.crd-annotations-0.18.0.jar:lib/io.netty.netty-resolver-dns-4.1.45.Final.jar:lib/org.slf4j.slf4j-api-1.7.25.jar:lib/org.latencyutils.LatencyUtils-2.0.3.jar:lib/io.micrometer.micrometer-registry-prometheus-1.3.1.jar io.strimzi.operator.cluster.Main
2020-05-27 11:09:16 INFO Main:60 - ClusterOperator 0.18.0 is starting
2020-05-27 11:09:20 INFO Main:85 - Environment facts gathered: ClusterOperatorConfig(KubernetesVersion=1.11,OpenShiftRoutes=true,OpenShiftBuilds=true,OpenShiftImageStreams=true,OpenShiftDeploymentConfigs=true)
2020-05-27 11:09:20 INFO Util:237 - Using config:
PATH: /usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
STRIMZI_DEFAULT_KAFKA_BRIDGE_IMAGE: strimzi/kafka-bridge:0.16.0
TINI_SHA256: 12d20136605531b09a2c2dac02ccee85e1b874eb322ef6baf7561cd93f93c855
STRIMZI_KAFKA_MIRROR_MAKER_IMAGES: 2.4.0=strimzi/kafka:0.18.0-kafka-2.4.0
2.4.1=strimzi/kafka:0.18.0-kafka-2.4.1
2.5.0=strimzi/kafka:0.18.0-kafka-2.5.0
KUBERNETES_PORT: tcp://172.30.0.1:443
STRIMZI_DEFAULT_TOPIC_OPERATOR_IMAGE: strimzi/operator:0.18.0
STRIMZI_FULL_RECONCILIATION_INTERVAL_MS: 120000
KUBERNETES_SERVICE_HOST: 172.30.0.1
JAVA_CLASSPATH: lib/io.strimzi.cluster-operator-0.18.0.jar:lib/io.prometheus.simpleclient_common-0.7.0.jar:lib/io.strimzi.kafka-oauth-client-0.5.0.jar:lib/io.netty.netty-handler-4.1.45.Final.jar:lib/io.netty.netty-codec-http-4.1.45.Final.jar:lib/org.quartz-scheduler.quartz-2.2.1.jar:lib/org.bouncycastle.bcprov-jdk15on-1.60.jar:lib/com.squareup.okio.okio-1.15.0.jar:lib/org.keycloak.keycloak-core-10.0.0.jar:lib/io.netty.netty-buffer-4.1.45.Final.jar:lib/org.yaml.snakeyaml-1.24.jar:lib/io.fabric8.openshift-client-4.6.4.jar:lib/io.netty.netty-common-4.1.45.Final.jar:lib/org.apache.logging.log4j.log4j-api-2.13.0.jar:lib/org.xerial.snappy.snappy-java-1.1.7.3.jar:lib/org.hdrhistogram.HdrHistogram-2.1.11.jar:lib/io.prometheus.simpleclient-0.7.0.jar:lib/com.sun.activation.jakarta.activation-1.2.1.jar:lib/org.apache.yetus.audience-annotations-0.5.0.jar:lib/com.fasterxml.jackson.dataformat.jackson-dataformat-yaml-2.10.2.jar:lib/io.micrometer.micrometer-core-1.3.1.jar:lib/io.netty.netty-codec-4.1.45.Final.jar:lib/org.keycloak.keycloak-common-10.0.0.jar:lib/jakarta.activation.jakarta.activation-api-1.2.1.jar:lib/io.vertx.vertx-core-3.8.5.jar:lib/io.strimzi.certificate-manager-0.18.0.jar:lib/io.strimzi.kafka-oauth-common-0.5.0.jar:lib/io.strimzi.kafka-oauth-server-0.5.0.jar:lib/io.netty.netty-codec-dns-4.1.45.Final.jar:lib/io.fabric8.kubernetes-model-4.6.4.jar:lib/io.netty.netty-codec-socks-4.1.45.Final.jar:lib/com.github.mifmif.generex-1.0.2.jar:lib/io.netty.netty-resolver-4.1.45.Final.jar:lib/com.github.luben.zstd-jni-1.4.4-7.jar:lib/io.netty.netty-handler-proxy-4.1.45.Final.jar:lib/com.squareup.okhttp3.logging-interceptor-3.12.6.jar:lib/io.strimzi.operator-common-0.18.0.jar:lib/org.bouncycastle.bcpkix-jdk15on-1.62.jar:lib/org.lz4.lz4-java-1.7.1.jar:lib/io.netty.netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:lib/io.netty.netty-transport-native-unix-common-4.1.45.Final.jar:lib/dk.brics.automaton.automaton-1.11-8.jar:lib/io.vertx.vertx-micrometer-metrics-3.8.5.jar:lib/org.apache.kafka.kafka-clients-2.5.0.jar:lib/com.fasterxml.jackson.core.jackson-core-2.10.2.jar:lib/org.apache.zookeeper.zookeeper-jute-3.5.7.jar:lib/io.netty.netty-transport-4.1.45.Final.jar:lib/io.netty.netty-transport-native-epoll-4.1.45.Final.jar:lib/jakarta.xml.bind.jakarta.xml.bind-api-2.3.2.jar:lib/org.apache.logging.log4j.log4j-slf4j-impl-2.13.0.jar:lib/com.fasterxml.jackson.core.jackson-annotations-2.10.2.jar:lib/io.fabric8.zjsonpatch-0.3.0.jar:lib/org.apache.zookeeper.zookeeper-3.5.7.jar:lib/io.strimzi.api-0.18.0.jar:lib/io.fabric8.kubernetes-client-4.6.4.jar:lib/com.fasterxml.jackson.module.jackson-module-jaxb-annotations-2.10.2.jar:lib/com.squareup.okhttp3.okhttp-3.12.6.jar:lib/io.netty.netty-codec-http2-4.1.45.Final.jar:lib/io.strimzi.config-model-0.18.0.jar:lib/org.apache.logging.log4j.log4j-core-2.13.0.jar:lib/io.fabric8.kubernetes-model-common-4.6.4.jar:lib/com.fasterxml.jackson.core.jackson-databind-2.10.2.jar:lib/io.strimzi.crd-annotations-0.18.0.jar:lib/io.netty.netty-resolver-dns-4.1.45.Final.jar:lib/org.slf4j.slf4j-api-1.7.25.jar:lib/org.latencyutils.LatencyUtils-2.0.3.jar:lib/io.micrometer.micrometer-registry-prometheus-1.3.1.jar
STRIMZI_KAFKA_CONNECT_S2I_IMAGES: 2.4.0=strimzi/kafka:0.18.0-kafka-2.4.0
2.4.1=strimzi/kafka:0.18.0-kafka-2.4.1
2.5.0=strimzi/kafka:0.18.0-kafka-2.5.0
STRIMZI_DEFAULT_TLS_SIDECAR_KAFKA_IMAGE: strimzi/kafka:0.18.0-kafka-2.5.0
TINI_VERSION: v0.18.0
STRIMZI_HOME: /opt/strimzi
STRIMZI_DEFAULT_TLS_SIDECAR_CRUISE_CONTROL_IMAGE: strimzi/kafka:0.18.0-kafka-2.5.0
PWD: /opt/strimzi
STRIMZI_DEFAULT_CRUISE_CONTROL_IMAGE: strimzi/kafka:0.18.0-kafka-2.5.0
KUBERNETES_PORT_443_TCP: tcp://172.30.0.1:443
STRIMZI_OPERATION_TIMEOUT_MS: 300000
JAVA_MAIN: io.strimzi.operator.cluster.Main
KUBERNETES_PORT_443_TCP_ADDR: 172.30.0.1
STRIMZI_VERSION: 0.18.0
STRIMZI_DEFAULT_JMXTRANS_IMAGE: strimzi/jmxtrans:0.18.0
STRIMZI_NAMESPACE: streaming
STRIMZI_KAFKA_MIRROR_MAKER_2_IMAGES: 2.4.0=strimzi/kafka:0.18.0-kafka-2.4.0
2.4.1=strimzi/kafka:0.18.0-kafka-2.4.1
2.5.0=strimzi/kafka:0.18.0-kafka-2.5.0
KUBERNETES_PORT_443_TCP_PROTO: tcp
STRIMZI_DEFAULT_USER_OPERATOR_IMAGE: strimzi/operator:0.18.0
KUBERNETES_SERVICE_PORT: 443
STRIMZI_DEFAULT_KAFKA_EXPORTER_IMAGE: strimzi/kafka:0.18.0-kafka-2.5.0
STRIMZI_DEFAULT_KAFKA_INIT_IMAGE: strimzi/operator:0.18.0
STRIMZI_LOG_LEVEL: INFO
HOSTNAME: strimzi-cluster-operator-bf978cf5c-rc5l2
STRIMZI_KAFKA_CONNECT_IMAGES: 2.4.0=strimzi/kafka:0.18.0-kafka-2.4.0
2.4.1=strimzi/kafka:0.18.0-kafka-2.4.1
2.5.0=strimzi/kafka:0.18.0-kafka-2.5.0
STRIMZI_KAFKA_IMAGES: 2.4.0=strimzi/kafka:0.18.0-kafka-2.4.0
2.4.1=strimzi/kafka:0.18.0-kafka-2.4.1
2.5.0=strimzi/kafka:0.18.0-kafka-2.5.0
KUBERNETES_PORT_443_TCP_PORT: 443
STRIMZI_DEFAULT_TLS_SIDECAR_ENTITY_OPERATOR_IMAGE: strimzi/kafka:0.18.0-kafka-2.5.0
KUBERNETES_SERVICE_PORT_HTTPS: 443
SHLVL: 0
HOME: /
MALLOC_ARENA_MAX: 2
2020-05-27 11:09:20 INFO ClusterOperator:87 - Creating ClusterOperator for namespace streaming
2020-05-27 11:09:20 INFO ClusterOperator:105 - Starting ClusterOperator for namespace streaming
2020-05-27 11:09:21 INFO ClusterOperator:119 - Opened watch for Kafka operator
2020-05-27 11:09:21 INFO ClusterOperator:119 - Opened watch for KafkaMirrorMaker operator
2020-05-27 11:09:21 INFO ClusterOperator:119 - Opened watch for KafkaConnect operator
2020-05-27 11:09:21 INFO ClusterOperator:119 - Opened watch for KafkaBridge operator
2020-05-27 11:09:21 INFO ClusterOperator:119 - Opened watch for KafkaMirrorMaker2 operator
2020-05-27 11:09:21 INFO ClusterOperator:119 - Opened watch for KafkaConnectS2I operator
2020-05-27 11:09:21 INFO ClusterOperator:130 - Setting up periodic reconciliation for namespace streaming
2020-05-27 11:09:22 INFO ClusterOperator:192 - ClusterOperator is now ready (health server listening on 8080)
2020-05-27 11:09:22 INFO Main:159 - Cluster Operator verticle started in namespace streaming
2020-05-27 11:11:21 INFO ClusterOperator:132 - Triggering periodic reconciliation for namespace streaming...
2020-05-27 11:11:21 WARN VersionUsageUtils:60 - The client is using resource type 'kafkas' with unstable version 'v1beta1'
2020-05-27 11:11:22 WARN VersionUsageUtils:60 - The client is using resource type 'kafkamirrormakers' with unstable version 'v1beta1'
2020-05-27 11:11:22 WARN VersionUsageUtils:60 - The client is using resource type 'kafkaconnects' with unstable version 'v1beta1'
2020-05-27 11:11:22 WARN VersionUsageUtils:60 - The client is using resource type 'kafkamirrormaker2s' with unstable version 'v1alpha1'
2020-05-27 11:11:22 WARN VersionUsageUtils:60 - The client is using resource type 'kafkabridges' with unstable version 'v1alpha1'
2020-05-27 11:11:22 WARN VersionUsageUtils:60 - The client is using resource type 'kafkarebalances' with unstable version 'v1alpha1'
2020-05-27 11:11:22 WARN VersionUsageUtils:60 - The client is using resource type 'kafkaconnects2is' with unstable version 'v1beta1'
2020-05-27 11:12:27 INFO OperatorWatcher:40 - Reconciliation #0(watch) Kafka(streaming/logs): Kafka logs in namespace streaming was ADDED
2020-05-27 11:12:27 INFO AbstractOperator:173 - Reconciliation #0(watch) Kafka(streaming/logs): Kafka logs should be created or updated
2020-05-27 11:12:28 INFO OperatorWatcher:40 - Reconciliation #1(watch) Kafka(streaming/logs): Kafka logs in namespace streaming was MODIFIED
2020-05-27 11:12:31 WARN VersionUsageUtils:60 - The client is using resource type 'poddisruptionbudgets' with unstable version 'v1beta1'
2020-05-27 11:12:38 WARN AbstractOperator:247 - Reconciliation #1(watch) Kafka(streaming/logs): Failed to acquire lock lock::streaming::Kafka::logs within 10000ms.
2020-05-27 11:13:21 INFO ClusterOperator:132 - Triggering periodic reconciliation for namespace streaming...
2020-05-27 11:13:31 WARN AbstractOperator:247 - Reconciliation #2(timer) Kafka(streaming/logs): Failed to acquire lock lock::streaming::Kafka::logs within 10000ms.
2020-05-27 11:14:20 INFO OperatorWatcher:40 - Reconciliation #3(watch) Kafka(streaming/logs): Kafka logs in namespace streaming was MODIFIED
2020-05-27 11:14:20 INFO AbstractOperator:318 - Reconciliation #0(watch) Kafka(streaming/logs): reconciled
2020-05-27 11:14:20 INFO AbstractOperator:173 - Reconciliation #3(watch) Kafka(streaming/logs): Kafka logs should be created or updated
2020-05-27 11:14:23 INFO AbstractOperator:318 - Reconciliation #3(watch) Kafka(streaming/logs): reconciled
After the operator does its work, you have the cluster at your disposal. To check the Kafka resources in minishift:
~ oc get kafka
NAME DESIRED KAFKA REPLICAS DESIRED ZK REPLICAS
logs 1 1
If you remove the Kafka resource, Strimzi deletes all dependent objects for you. This is very important: if you delete single objects, such as the Apache ZooKeeper stateful set, Strimzi reconciles and auto-creates them again. The only way to remove all created resources is to delete the Kafka resource itself.
Do not do this yet if you want to follow the example!
~ oc delete kafka logs
kafka.kafka.strimzi.io "logs" deleted
Stateful Sets
Strimzi creates stateful sets for our Kafka cluster. What is a StatefulSet?
A StatefulSet is the workload API object used to manage stateful applications. It controls the deployment and scaling of a set of Pods and provides guarantees about the ordering and uniqueness of these Pods. Every Kafka broker and ZooKeeper node needs a stable, unique identity; this is mandatory for clusters that aim for high availability.
The name of the Kafka cluster is the prefix for the following stateful sets.
oc get statefulset
NAME DESIRED CURRENT AGE
logs-kafka 1 1 15m
logs-zookeeper 1 1 16m
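The StatefulSet gives each broker a stable name of the form <cluster>-kafka-<ordinal>. You can list all pods belonging to the cluster through the strimzi.io/cluster label, the same label we attach to the KafkaTopic below:
oc get pods -l strimzi.io/cluster=logs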
Deployments
The Strimzi operator also deploys further operators itself.
~ oc get deployment
NAME DESIRED CURRENT UP-TO-DATE AVAILABLE AGE
logs-entity-operator 1 1 1 1 15m
strimzi-cluster-operator 1 1 1 1 56m
In this case, it is the Entity Operator.
The Entity Operator comprises the Topic Operator and User Operator.
- The Topic Operator manages Kafka topics.
- The User Operator manages Kafka users.
These operators listen to custom resources; in the case of the Topic Operator, it is KafkaTopic.
We create a Kafka topic. The file looks like this:
cat mimacom-topic.yml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: spring-boot
  labels:
    strimzi.io/cluster: logs
spec:
  partitions: 1
  replicas: 1
Since it is a single-node cluster, we use 1 partition and 1 replica. We create the KafkaTopic:
oc apply -f mimacom-topic.yml
kafkatopic.kafka.strimzi.io/spring-boot created
List the topic (the OpenShift/Strimzi way):
oc get kafkatopic
NAME PARTITIONS REPLICATION FACTOR
spring-boot 1 1
Kafka topics are managed differently on OpenShift/Kubernetes than in a native Kafka installation, where you would use kafka-topics.sh to create a topic, or, with auto-creation enabled, Kafka would create the topic itself. Such topics are not visible to Strimzi as KafkaTopic resources, so using the Entity Operator is vital for managing Kafka topics on OpenShift!
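The KafkaTopic resource can also carry per-topic configuration using the regular Kafka topic settings. A sketch, assuming we wanted a short retention of one hour for testing:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: spring-boot
  labels:
    strimzi.io/cluster: logs
spec:
  partitions: 1
  replicas: 1
  config:
    retention.ms: 3600000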
Networking
Strimzi creates these routes and thereby exposes the cluster for outside access.
oc get routes
NAME HOST/PORT PATH SERVICES PORT TERMINATION WILDCARD
logs-kafka-0 logs-kafka-0-streaming.192.168.42.24.nip.io logs-kafka-0 9094 passthrough None
logs-kafka-bootstrap logs-kafka-bootstrap-streaming.192.168.42.24.nip.io logs-kafka-external-bootstrap 9094 passthrough None
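These routes only appear if the Kafka resource declares an external listener of type route. If you started from a plain example and see no routes, add the listener to the spec and re-apply; a sketch for the v1beta1 listener schema used by this Strimzi version:
listeners:
  plain: {}
  tls: {}
  external:
    type: route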
Other Objects
If you want to see all resources that Strimzi created, list all objects.
oc get all
NAME READY STATUS RESTARTS AGE
pod/logs-entity-operator-c8ff8696f-8x6vk 3/3 Running 0 43m
pod/logs-kafka-0 2/2 Running 0 43m
pod/logs-zookeeper-0 1/1 Running 0 44m
pod/strimzi-cluster-operator-bf978cf5c-rc5l2 1/1 Running 0 2h
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/logs-kafka-0 ClusterIP 172.30.204.40 <none> 9094/TCP 43m
service/logs-kafka-bootstrap ClusterIP 172.30.99.158 <none> 9091/TCP,9092/TCP,9093/TCP 43m
service/logs-kafka-brokers ClusterIP None <none> 9091/TCP,9092/TCP,9093/TCP 43m
service/logs-kafka-external-bootstrap ClusterIP 172.30.39.229 <none> 9094/TCP 43m
service/logs-zookeeper-client ClusterIP 172.30.142.59 <none> 2181/TCP 44m
service/logs-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 44m
NAME DESIRED CURRENT UP-TO-DATE AVAILABLE AGE
deployment.apps/logs-entity-operator 1 1 1 1 43m
deployment.apps/strimzi-cluster-operator 1 1 1 1 2h
NAME DESIRED CURRENT READY AGE
replicaset.apps/logs-entity-operator-c8ff8696f 1 1 1 43m
replicaset.apps/strimzi-cluster-operator-bf978cf5c 1 1 1 2h
NAME DESIRED CURRENT AGE
statefulset.apps/logs-kafka 1 1 43m
statefulset.apps/logs-zookeeper 1 1 44m
NAME HOST/PORT PATH SERVICES PORT TERMINATION WILDCARD
route.route.openshift.io/logs-kafka-0 logs-kafka-0-streaming.192.168.42.24.nip.io logs-kafka-0 9094 passthrough None
route.route.openshift.io/logs-kafka-bootstrap logs-kafka-bootstrap-streaming.192.168.42.24.nip.io logs-kafka-external-bootstrap 9094 passthrough None
Using Kafka on OpenShift
We use some test data for time tracking. This data is randomly generated.
[{
  "id": 1,
  "first_name": "Redd",
  "last_name": "Ulyat",
  "email": "rulyat0@ft.com",
  "gender": "Male",
  "start": "18:06",
  "end": "17:50",
  "customer": "Feedspan"
}, {
  "id": 2,
  "first_name": "Suzann",
  "last_name": "Andryushchenko",
  "email": "sandryushchenko1@cyberchimps.com",
  "gender": "Female",
  "start": "12:14",
  "end": "20:53",
  "customer": "Skyndu"
}, {
  "id": 3,
  "first_name": "Sunny",
  "last_name": "Statter",
  "email": "sstatter2@purevolume.com",
  "gender": "Male",
  "start": "9:46",
  "end": "14:29",
  "customer": "Pixope"
}, {
  "id": 4,
  "first_name": "Glori",
  "last_name": "Cumberpatch",
  "email": "gcumberpatch3@dedecms.com",
  "gender": "Female",
  "start": "18:13",
  "end": "11:29",
  "customer": "Agimba"
}, {
  "id": 5,
  "first_name": "Ashlee",
  "last_name": "Fynan",
  "email": "afynan4@uiuc.edu",
  "gender": "Female",
  "start": "10:01",
  "end": "1:53",
  "customer": "Gigabox"
}, {
  "id": 6,
  "first_name": "Darin",
  "last_name": "Gulc",
  "email": "dgulc5@ihg.com",
  "gender": "Male",
  "start": "9:43",
  "end": "5:10",
  "customer": "Rhyloo"
}, {
  "id": 7,
  "first_name": "Brodie",
  "last_name": "Casari",
  "email": "bcasari6@mashable.com",
  "gender": "Male",
  "start": "5:22",
  "end": "12:00",
  "customer": "Eamia"
}, {
  "id": 8,
  "first_name": "Morty",
  "last_name": "Currington",
  "email": "mcurrington7@usa.gov",
  "gender": "Male",
  "start": "21:22",
  "end": "17:01",
  "customer": "Quatz"
}, {
  "id": 9,
  "first_name": "Jacobo",
  "last_name": "Lanmeid",
  "email": "jlanmeid8@illinois.edu",
  "gender": "Male",
  "start": "18:10",
  "end": "18:39",
  "customer": "Buzzster"
}, {
  "id": 10,
  "first_name": "Ilario",
  "last_name": "Embra",
  "email": "iembra9@ca.gov",
  "gender": "Male",
  "start": "13:51",
  "end": "18:54",
  "customer": "Oyondu"
}]
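The console producer in the next section reads its input line by line, so we paste the data in minified form. If you saved the records to a file, jq can minify them; a sketch, assuming the hypothetical file name time-tracking.json:
jq -c . time-tracking.json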
Producing Data
We start a pod with a container for producing data. The container is cleaned up after termination. Do this in your Linux terminal:
oc -n streaming run kafka-producer -ti --image=strimzi/kafka:0.18.0-kafka-2.5.0 \
--rm=true --restart=Never -- bin/kafka-console-producer.sh \
--broker-list logs-kafka-bootstrap:9092 --topic time-tracking
Paste the minified test data into it. CTRL + C closes the producer.
If you don't see a command prompt, try pressing enter.
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
>[{"id":1,"first_name":"Redd","last_name":"Ulyat","email":"rulyat0@ft.com","gender":"Male","start":"18:06","end":"17:50","customer":"Feedspan"},{"id":2,"first_name":"Suzann","last_name":"Andryushchenko","email":"sandryushchenko1@cyberchimps.com","gender":"Female","start":"12:14","end":"20:53","customer":"Skyndu"},{"id":3,"first_name":"Sunny","last_name":"Statter","email":"sstatter2@purevolume.com","gender":"Male","start":"9:46","end":"14:29","customer":"Pixope"},{"id":4,"first_name":"Glori","last_name":"Cumberpatch","email":"gcumberpatch3@dedecms.com","gender":"Female","start":"18:13","end":"11:29","customer":"Agimba"},{"id":5,"first_name":"Ashlee","last_name":"Fynan","email":"afynan4@uiuc.edu","gender":"Female","start":"10:01","end":"1:53","customer":"Gigabox"},{"id":6,"first_name":"Darin","last_name":"Gulc","email":"dgulc5@ihg.com","gender":"Male","start":"9:43","end":"5:10","customer":"Rhyloo"},{"id":7,"first_name":"Brodie","last_name":"Casari","email":"bcasari6@mashable.com","gender":"Male","start":"5:22","end":"12:00","customer":"Eamia"},{"id":8,"first_name":"Morty","last_name":"Currington","email":"mcurrington7@usa.gov","gender":"Male","start":"21:22","end":"17:01","customer":"Quatz"},{"id":9,"first_name":"Jacobo","last_name":"Lanmeid","email":"jlanmeid8@illinois.edu","gender":"Male","start":"18:10","end":"18:39","customer":"Buzzster"},{"id":10,"first_name":"Ilario","last_name":"Embra","email":"iembra9@ca.gov","gender":"Male","start":"13:51","end":"18:54","customer":"Oyondu"}]
[2020-05-27 14:40:37,082] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 11 : {time-tracking=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2020-05-27 14:40:37,195] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 12 : {time-tracking=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>^Cpod "kafka-producer" deleted
pod streaming/kafka-producer terminated (Error)
The container could communicate with the Kafka cluster within the namespace over the service name logs-kafka-bootstrap on port 9092.
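Clients in other namespaces can reach the same service through its fully qualified DNS name; assuming a hypothetical client namespace called apps, the bootstrap address would be:
logs-kafka-bootstrap.streaming.svc:9092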
Consuming Data
Consuming data follows the same principle. To get the data from the start, pass the option --from-beginning.
oc -n streaming run kafka-consumer -ti --image=strimzi/kafka:0.18.0-kafka-2.5.0 \
--rm=true --restart=Never -- bin/kafka-console-consumer.sh \
--bootstrap-server logs-kafka-bootstrap:9092 --topic time-tracking --from-beginning
You will receive this output. CTRL + C closes the application.
If you don't see a command prompt, try pressing enter.
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[{"id":1,"first_name":"Redd","last_name":"Ulyat","email":"rulyat0@ft.com","gender":"Male","start":"18:06","end":"17:50","customer":"Feedspan"},{"id":2,"first_name":"Suzann","last_name":"Andryushchenko","email":"sandryushchenko1@cyberchimps.com","gender":"Female","start":"12:14","end":"20:53","customer":"Skyndu"},{"id":3,"first_name":"Sunny","last_name":"Statter","email":"sstatter2@purevolume.com","gender":"Male","start":"9:46","end":"14:29","customer":"Pixope"},{"id":4,"first_name":"Glori","last_name":"Cumberpatch","email":"gcumberpatch3@dedecms.com","gender":"Female","start":"18:13","end":"11:29","customer":"Agimba"},{"id":5,"first_name":"Ashlee","last_name":"Fynan","email":"afynan4@uiuc.edu","gender":"Female","start":"10:01","end":"1:53","customer":"Gigabox"},{"id":6,"first_name":"Darin","last_name":"Gulc","email":"dgulc5@ihg.com","gender":"Male","start":"9:43","end":"5:10","customer":"Rhyloo"},{"id":7,"first_name":"Brodie","last_name":"Casari","email":"bcasari6@mashable.com","gender":"Male","start":"5:22","end":"12:00","customer":"Eamia"},{"id":8,"first_name":"Morty","last_name":"Currington","email":"mcurrington7@usa.gov","gender":"Male","start":"21:22","end":"17:01","customer":"Quatz"},{"id":9,"first_name":"Jacobo","last_name":"Lanmeid","email":"jlanmeid8@illinois.edu","gender":"Male","start":"18:10","end":"18:39","customer":"Buzzster"},{"id":10,"first_name":"Ilario","last_name":"Embra","email":"iembra9@ca.gov","gender":"Male","start":"13:51","end":"18:54","customer":"Oyondu"}]
^CProcessed a total of 2 messages
pod "kafka-consumer" deleted
pod streaming/kafka-consumer terminated (Error)
External Access of Kafka
To access the cluster from a local Kafka installation, we have to obtain a certificate, since the route is TLS encrypted. This is another strength of Strimzi: it generates the certificates for us, without any manual effort on our side.
For demonstration purposes, we use an older client version, 2.4.1, as opposed to the deployed broker version 2.5.0.
Show all secrets and filter for the certificate authorities (ca):
oc get secrets | grep ca-cert
logs-clients-ca-cert Opaque 3 1h
logs-cluster-ca-cert Opaque 3 1h
We are only interested in the cluster CA. To view it, use this command; the sensitive output below is redacted.
oc get secret logs-cluster-ca-cert -o yaml
apiVersion: v1
data:
  ca.crt: LS0tLS1...
  ca.p12: MIIEVgI...
  ca.password: xxxx
kind: Secret
metadata:
  annotations:
    strimzi.io/ca-cert-generation: "0"
  creationTimestamp: 2020-05-27T12:55:29Z
  labels:
    app.kubernetes.io/instance: logs
    app.kubernetes.io/managed-by: strimzi-cluster-operator
    app.kubernetes.io/name: strimzi
    app.kubernetes.io/part-of: strimzi-logs
    strimzi.io/cluster: logs
    strimzi.io/kind: Kafka
    strimzi.io/name: strimzi
  name: logs-cluster-ca-cert
  namespace: streaming
  ownerReferences:
  - apiVersion: kafka.strimzi.io/v1beta1
    blockOwnerDeletion: false
    controller: false
    kind: Kafka
    name: logs
    uid: 5783c877-a019-11ea-9be2-525400a45a70
  resourceVersion: "147848"
  selfLink: /api/v1/namespaces/streaming/secrets/logs-cluster-ca-cert
  uid: 57fa7398-a019-11ea-9be2-525400a45a70
type: Opaque
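The data values are base64-encoded. To decode a single field, for example the truststore password, you can combine a jsonpath query with base64; a sketch:
oc get secret logs-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d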
Export it to a newly created Java truststore.
oc extract secret/logs-cluster-ca-cert --keys=ca.crt --to=- > ca.crt
keytool -import -trustcacerts -alias root -file ca.crt -keystore truststore.jks -storepass password -noprompt
# ca.crt
Certificate was added to keystore
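Alternatively, the secret already ships a ready-made PKCS#12 truststore (ca.p12) together with its password (ca.password). A sketch, assuming your client supports the PKCS12 truststore type:
oc extract secret/logs-cluster-ca-cert --keys=ca.p12 --to=.
oc extract secret/logs-cluster-ca-cert --keys=ca.password --to=.
Point ssl.truststore.location at the extracted ca.p12, set ssl.truststore.type=PKCS12, and use the content of ca.password as ssl.truststore.password.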
Consuming Data
Go to your local Kafka installation and pass the mandatory information. We use the bootstrap address logs-kafka-bootstrap-streaming.192.168.42.24.nip.io to access the Kafka cluster.
bin/kafka-console-consumer.sh \
--bootstrap-server logs-kafka-bootstrap-streaming.192.168.42.24.nip.io:443 \
--consumer-property security.protocol=SSL \
--consumer-property ssl.truststore.password=password \
--consumer-property ssl.truststore.location=/home/tan/truststore.jks \
--topic time-tracking --from-beginning
The client output:
[{"id":1,"first_name":"Redd","last_name":"Ulyat","email":"rulyat0@ft.com","gender":"Male","start":"18:06","end":"17:50","customer":"Feedspan"},{"id":2,"first_name":"Suzann","last_name":"Andryushchenko","email":"sandryushchenko1@cyberchimps.com","gender":"Female","start":"12:14","end":"20:53","customer":"Skyndu"},{"id":3,"first_name":"Sunny","last_name":"Statter","email":"sstatter2@purevolume.com","gender":"Male","start":"9:46","end":"14:29","customer":"Pixope"},{"id":4,"first_name":"Glori","last_name":"Cumberpatch","email":"gcumberpatch3@dedecms.com","gender":"Female","start":"18:13","end":"11:29","customer":"Agimba"},{"id":5,"first_name":"Ashlee","last_name":"Fynan","email":"afynan4@uiuc.edu","gender":"Female","start":"10:01","end":"1:53","customer":"Gigabox"},{"id":6,"first_name":"Darin","last_name":"Gulc","email":"dgulc5@ihg.com","gender":"Male","start":"9:43","end":"5:10","customer":"Rhyloo"},{"id":7,"first_name":"Brodie","last_name":"Casari","email":"bcasari6@mashable.com","gender":"Male","start":"5:22","end":"12:00","customer":"Eamia"},{"id":8,"first_name":"Morty","last_name":"Currington","email":"mcurrington7@usa.gov","gender":"Male","start":"21:22","end":"17:01","customer":"Quatz"},{"id":9,"first_name":"Jacobo","last_name":"Lanmeid","email":"jlanmeid8@illinois.edu","gender":"Male","start":"18:10","end":"18:39","customer":"Buzzster"},{"id":10,"first_name":"Ilario","last_name":"Embra","email":"iembra9@ca.gov","gender":"Male","start":"13:51","end":"18:54","customer":"Oyondu"}]
^CProcessed a total of 2 messages
Producing data follows the same principle. You can pass all the properties on the command line or create a consumer.properties or producer.properties file for the respective use; we demonstrate the properties file in the next section.
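For completeness, a sketch of the matching external producer call with inline properties, assuming the same truststore as above:
bin/kafka-console-producer.sh \
  --broker-list logs-kafka-bootstrap-streaming.192.168.42.24.nip.io:443 \
  --producer-property security.protocol=SSL \
  --producer-property ssl.truststore.password=password \
  --producer-property ssl.truststore.location=/home/tan/truststore.jks \
  --topic time-tracking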
List Topics
As you have read, handling topics should go over the entity operator. The created example topic time-tracking
, it is not
listed in OpenShift. You can use kafka-topics.sh
to view the topics from outside. We need to create a properties file
for the client with the settings for encrypted communication.
Create the properties file:
~/test/kafka_2.11-2.4.1 echo security.protocol=SSL >> strimzi-minishift.properties
~/test/kafka_2.11-2.4.1 echo ssl.truststore.password=password >> strimzi-minishift.properties
~/test/kafka_2.11-2.4.1 echo ssl.truststore.location=/home/tan/truststore.jks >> strimzi-minishift.properties
List all topics:
bin/kafka-topics.sh --bootstrap-server logs-kafka-bootstrap-streaming.192.168.42.24.nip.io:443 \
--list --command-config ./strimzi-minishift.properties
The output:
~/test/kafka_2.11-2.4.1 bin/kafka-topics.sh --bootstrap-server logs-kafka-bootstrap-streaming.192.168.42.24.nip.io:443 --list --command-config ./strimzi-minishift.properties
__consumer_offsets
spring-boot
time-tracking
Summary
In this article, you have learned the basics of using Strimzi to deploy Apache Kafka on Kubernetes and OpenShift. External access from outside the cluster was a crucial requirement for us, and Strimzi comes with a robust solution. You can test this example hands-on yourself; all you need is to install minishift and Apache Kafka on your Linux environment.