External Access of Apache Kafka on OpenShift

June 2, 2020

Containers solve the problem of getting software to run reliably when it is moved from one computing environment to another. Docker in particular, as a container solution, started a new DevOps trend. We, as software engineers, leverage this technology to ship our software to any environment.

With container technology on the rise, Kubernetes (K8S) became the de facto industry standard for container orchestration. K8S is an open-source system for automating deployment, scaling, and management of containerized applications. Red Hat rebuilt OpenShift around an enterprise K8S distribution, the OpenShift Container Platform. OpenShift is a container platform that runs your software on top of K8S and adds further concepts of its own.

A lot of our software architectures rely on Apache Kafka. Apache Kafka is a distributed messaging system. How do we install Kafka on OpenShift? If it is on OpenShift, how do we access the data? The answer to these questions is Strimzi.

Strimzi is an open-source project that provides container images and operators for running Apache Kafka on Kubernetes and Red Hat OpenShift.

Motivation

Who is this article for?

This article explains how Strimzi makes Kafka accessible for clients running outside of Kubernetes. We will deploy a Kafka cluster with the Strimzi Operator on minishift (OKD/OpenShift). It is for all (DevOps) engineers who need a concise example or a summary of how Strimzi works.

What basic knowledge do I need to understand this article?

You do not need all of this knowledge up front, but it helps tremendously if you are familiar with the concepts of

What you will learn in this article:

Credits

This article is based upon the work of Accessing Apache Kafka in Strimzi: Part 1 and Accessing Apache Kafka in Strimzi: Part 3.

Differentiation

Apache Kafka is open source; the Confluent Platform is a professional community and enterprise offering for Apache Kafka. Strimzi brings Apache Kafka to Kubernetes and OpenShift, so we won't deal with the Confluent Platform in this article. mimacom is also a Confluent partner and can provide assistance in this matter.

Benefits of Kafka on Kubernetes

Apache Kafka is an essential data infrastructure backbone. Kafka, as a distributed system, promises high availability and decoupling of system dependencies: producers (data suppliers) and consumers can process the data at their own speed. To deliver on that promise, Kafka is installed and deployed on several machines.

Kubernetes is a system for deploying applications that can save money because it takes fewer IT personnel to manage. It also makes your apps a lot more portable, so you can move them more easily between different clouds and internal environments. The key is automation; the price to pay is the added complexity and the further training of your DevOps engineers.

Running Kafka on Kubernetes lets Kubernetes schedule Kafka onto the available resources, with the aim that the Kafka cluster does not face disruption or downtime.

Do I need Kafka on K8S? It depends on your scenario and environment. IMHO if you have no experience with Apache Kafka at all, learn the product and basics first, before operating it on K8S.

The Environment

To understand our example, we provide the environment details for this article.

We use a Linux system for demonstration.

Minishift

Minishift is a tool that helps you run OpenShift locally by running a single-node OpenShift cluster inside a VM. For further information about installation, refer to the product page. It is a robust solution to test OpenShift concepts locally before going to the production cluster.

We start minishift with this command on a Linux terminal:

minishift start

After the start, we need to add the oc client to the PATH. Simply run this command:

eval $(minishift oc-env)

Query the OpenShift/OKD version to test if the oc client is working.

oc version
oc v3.11.0+0cbc58b
kubernetes v1.11.0+d4cacc0
features: Basic-Auth GSSAPI Kerberos SPNEGO

Server https://192.168.42.24:8443
kubernetes v1.11.0+d4cacc0

Our infrastructure is now running. OpenShift is an enterprise Kubernetes offering by Red Hat. OKD is the community distribution of Kubernetes that powers Red Hat OpenShift.

Installing the Strimzi Operator

This section covers how we install the Strimzi Operator on minishift.

Log in to minishift as admin.

oc login -u system:admin
Logged into "https://192.168.42.24:8443" as "system:admin" using existing credentials.

You have access to the following projects and can switch between them with 'oc project <projectname>':

  * default
    kube-dns
    kube-proxy
    kube-public
    kube-system
    openshift
    openshift-apiserver
    openshift-controller-manager
    openshift-core-operators
    openshift-infra
    openshift-node
    openshift-service-cert-signer
    openshift-web-console

Using project "default".

Create a new project with the name streaming.

oc new-project streaming
Now using project "streaming" on server "https://192.168.42.24:8443".

You can add applications to this project with the 'new-app' command. For example, try:

    oc new-app centos/ruby-25-centos7~https://github.com/sclorg/ruby-ex.git

to build a new example application in Ruby.

Install the Strimzi Operator. The -n namespace option is not needed here, because the new project is already active, but we declare it for completeness.

oc apply -f 'https://strimzi.io/install/latest?namespace=streaming' -n streaming

The minishift output:

customresourcedefinition.apiextensions.k8s.io/kafkas.kafka.strimzi.io created
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-entity-operator-delegation created
clusterrolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator created
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-topic-operator-delegation created
customresourcedefinition.apiextensions.k8s.io/kafkausers.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkarebalances.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkamirrormaker2s.kafka.strimzi.io created
clusterrole.rbac.authorization.k8s.io/strimzi-entity-operator created
clusterrole.rbac.authorization.k8s.io/strimzi-cluster-operator-global created
clusterrolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-kafka-broker-delegation created
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator created
clusterrole.rbac.authorization.k8s.io/strimzi-cluster-operator-namespaced created
clusterrole.rbac.authorization.k8s.io/strimzi-topic-operator created
serviceaccount/strimzi-cluster-operator created
clusterrole.rbac.authorization.k8s.io/strimzi-kafka-broker created
customresourcedefinition.apiextensions.k8s.io/kafkatopics.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkabridges.kafka.strimzi.io created
deployment.apps/strimzi-cluster-operator created
customresourcedefinition.apiextensions.k8s.io/kafkaconnectors.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkaconnects2is.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkaconnects.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkamirrormakers.kafka.strimzi.io created

If you cannot access the Strimzi page, configure a proxy or check out the GitHub repository.

The Operator Concept

Operators are software extensions to Kubernetes that make use of custom resources to manage applications and their components. As you can see in the installation output above, Strimzi registers several custom resource definitions.

The Operator pattern aims to capture the key aim of a human operator who is managing a service or set of services. Human operators who look after specific applications and services have deep knowledge of how the system ought to behave, how to deploy it, and how to react if there are problems.

An Operator is essentially a custom controller. The Strimzi Operator has in-depth knowledge about Apache Kafka clusters.

An Operator watches for these custom resource types and is notified about their presence or modification. With the custom resource kafkas.kafka.strimzi.io you describe the Kafka cluster and the Strimzi operator deploys the Kafka cluster for you in an automated way.

The Kafka Resource

When the operator receives this notification, it starts running a loop to ensure that all the objects required for the application service are actually available and configured as the user described in the object's specification.
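The loop described above can be pictured with a small sketch. This is not Strimzi's implementation; it only illustrates the general reconcile idea. The function names desired_objects and reconcile are hypothetical, and a plain dictionary stands in for the state of the cluster.

```python
# Illustrative sketch of a reconcile loop; not Strimzi's actual code.
# All names are hypothetical and the "cluster" is just a dictionary.

def desired_objects(spec):
    """Derive the objects that a (heavily simplified) Kafka spec asks for."""
    name = spec["name"]
    return {
        f"{name}-zookeeper": {"kind": "StatefulSet", "replicas": spec["zookeeper_replicas"]},
        f"{name}-kafka": {"kind": "StatefulSet", "replicas": spec["kafka_replicas"]},
    }


def reconcile(spec, cluster):
    """Bring `cluster` in line with `spec` and return the actions taken."""
    actions = []
    for obj_name, obj in desired_objects(spec).items():
        if obj_name not in cluster:
            cluster[obj_name] = dict(obj)
            actions.append(f"create {obj_name}")
        elif cluster[obj_name] != obj:
            cluster[obj_name] = dict(obj)
            actions.append(f"update {obj_name}")
    return actions


spec = {"name": "logs", "kafka_replicas": 1, "zookeeper_replicas": 1}
cluster = {}
print(reconcile(spec, cluster))  # first pass: both objects are created
print(reconcile(spec, cluster))  # second pass: nothing left to do
del cluster["logs-zookeeper"]    # somebody deletes an object by hand
print(reconcile(spec, cluster))  # next pass: the missing object is created again
```

The last call shows why deleting a single object by hand has no lasting effect: as long as the specification still asks for it, the next pass of the loop restores it.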

We take from Strimzi the example for a Kafka cluster specification and create a custom one for our testing scenario.

curl -s https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml \
 | sed 's/name: .*/name: logs/' \
 | sed 's/size: .*/size: 100Mi/' \
 | cat 

The above command takes the example file, replaces the cluster name with logs, and requests only 100 MiB of storage instead of gigabytes for our testing scenario. You will see the following output on your terminal:

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: logs
spec:
  kafka:
    version: 2.5.0
    replicas: 1
    listeners:
      plain: {}
      tls: {}
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.5"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 100Mi
        deleteClaim: false
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 100Mi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}

This is a sufficient description of our Kafka cluster. It is purely declarative: it states which cluster we want, not how to build it.
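If you prefer to script the customization, the two sed substitutions can be mirrored in Python. The sketch below works on a shortened inline YAML fragment instead of the downloaded example file; the fragment, its values, and the function name customize are assumptions made for illustration.

```python
import re


def customize(manifest: str, cluster_name: str = "logs", size: str = "100Mi") -> str:
    """Mirror the two sed substitutions: rewrite every `name:` and `size:` value."""
    # `.` does not match a newline, so `.*` stops at the end of the line, as in sed.
    manifest = re.sub(r"name: .*", lambda _: f"name: {cluster_name}", manifest)
    manifest = re.sub(r"size: .*", lambda _: f"size: {size}", manifest)
    return manifest


# Shortened stand-in for the example file (assumed values, not the real file).
sample = """\
metadata:
  name: my-cluster
spec:
  kafka:
    storage:
      size: 100Gi
"""

print(customize(sample))
```

Like the sed version, this rewrites every name: and size: line it finds. That is fine for this small example file, but too blunt for manifests that contain other fields with the same keys.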

Now that we have the description of our desired cluster, we deploy it in the created namespace streaming in minishift.

curl -s https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml \
 | sed 's/name: .*/name: logs/' \
 | sed 's/size: .*/size: 100Mi/' \
 | oc -n streaming apply -f -

minishift will yield:

kafka.kafka.strimzi.io/logs created

Strimzi Operator in Action

The Strimzi operator creates all necessary objects to fulfil the description. The operator itself runs in a pod. Its logs show what Strimzi does for you.

oc logs -f strimzi-cluster-operator-bf978cf5c-rc5l2
+ shift
+ export MALLOC_ARENA_MAX=2
+ MALLOC_ARENA_MAX=2
+ JAVA_OPTS=' -Dvertx.cacheDirBase=/tmp -Djava.security.egd=file:/dev/./urandom'
++ get_gc_opts
++ '[' '' == true ']'
++ echo ''
+ JAVA_OPTS=' -Dvertx.cacheDirBase=/tmp -Djava.security.egd=file:/dev/./urandom '
+ exec /usr/bin/tini -w -e 143 -- java -Dvertx.cacheDirBase=/tmp -Djava.security.egd=file:/dev/./urandom -classpath lib/io.strimzi.cluster-operator-0.18.0.jar:lib/io.prometheus.simpleclient_common-0.7.0.jar:lib/io.strimzi.kafka-oauth-client-0.5.0.jar:lib/io.netty.netty-handler-4.1.45.Final.jar:lib/io.netty.netty-codec-http-4.1.45.Final.jar:lib/org.quartz-scheduler.quartz-2.2.1.jar:lib/org.bouncycastle.bcprov-jdk15on-1.60.jar:lib/com.squareup.okio.okio-1.15.0.jar:lib/org.keycloak.keycloak-core-10.0.0.jar:lib/io.netty.netty-buffer-4.1.45.Final.jar:lib/org.yaml.snakeyaml-1.24.jar:lib/io.fabric8.openshift-client-4.6.4.jar:lib/io.netty.netty-common-4.1.45.Final.jar:lib/org.apache.logging.log4j.log4j-api-2.13.0.jar:lib/org.xerial.snappy.snappy-java-1.1.7.3.jar:lib/org.hdrhistogram.HdrHistogram-2.1.11.jar:lib/io.prometheus.simpleclient-0.7.0.jar:lib/com.sun.activation.jakarta.activation-1.2.1.jar:lib/org.apache.yetus.audience-annotations-0.5.0.jar:lib/com.fasterxml.jackson.dataformat.jackson-dataformat-yaml-2.10.2.jar:lib/io.micrometer.micrometer-core-1.3.1.jar:lib/io.netty.netty-codec-4.1.45.Final.jar:lib/org.keycloak.keycloak-common-10.0.0.jar:lib/jakarta.activation.jakarta.activation-api-1.2.1.jar:lib/io.vertx.vertx-core-3.8.5.jar:lib/io.strimzi.certificate-manager-0.18.0.jar:lib/io.strimzi.kafka-oauth-common-0.5.0.jar:lib/io.strimzi.kafka-oauth-server-0.5.0.jar:lib/io.netty.netty-codec-dns-4.1.45.Final.jar:lib/io.fabric8.kubernetes-model-4.6.4.jar:lib/io.netty.netty-codec-socks-4.1.45.Final.jar:lib/com.github.mifmif.generex-1.0.2.jar:lib/io.netty.netty-resolver-4.1.45.Final.jar:lib/com.github.luben.zstd-jni-1.4.4-7.jar:lib/io.netty.netty-handler-proxy-4.1.45.Final.jar:lib/com.squareup.okhttp3.logging-interceptor-3.12.6.jar:lib/io.strimzi.operator-common-0.18.0.jar:lib/org.bouncycastle.bcpkix-jdk15on-1.62.jar:lib/org.lz4.lz4-java-1.7.1.jar:lib/io.netty.netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:lib/io.netty.netty-transport-native-unix-common-4.1.45.Final
.jar:lib/dk.brics.automaton.automaton-1.11-8.jar:lib/io.vertx.vertx-micrometer-metrics-3.8.5.jar:lib/org.apache.kafka.kafka-clients-2.5.0.jar:lib/com.fasterxml.jackson.core.jackson-core-2.10.2.jar:lib/org.apache.zookeeper.zookeeper-jute-3.5.7.jar:lib/io.netty.netty-transport-4.1.45.Final.jar:lib/io.netty.netty-transport-native-epoll-4.1.45.Final.jar:lib/jakarta.xml.bind.jakarta.xml.bind-api-2.3.2.jar:lib/org.apache.logging.log4j.log4j-slf4j-impl-2.13.0.jar:lib/com.fasterxml.jackson.core.jackson-annotations-2.10.2.jar:lib/io.fabric8.zjsonpatch-0.3.0.jar:lib/org.apache.zookeeper.zookeeper-3.5.7.jar:lib/io.strimzi.api-0.18.0.jar:lib/io.fabric8.kubernetes-client-4.6.4.jar:lib/com.fasterxml.jackson.module.jackson-module-jaxb-annotations-2.10.2.jar:lib/com.squareup.okhttp3.okhttp-3.12.6.jar:lib/io.netty.netty-codec-http2-4.1.45.Final.jar:lib/io.strimzi.config-model-0.18.0.jar:lib/org.apache.logging.log4j.log4j-core-2.13.0.jar:lib/io.fabric8.kubernetes-model-common-4.6.4.jar:lib/com.fasterxml.jackson.core.jackson-databind-2.10.2.jar:lib/io.strimzi.crd-annotations-0.18.0.jar:lib/io.netty.netty-resolver-dns-4.1.45.Final.jar:lib/org.slf4j.slf4j-api-1.7.25.jar:lib/org.latencyutils.LatencyUtils-2.0.3.jar:lib/io.micrometer.micrometer-registry-prometheus-1.3.1.jar io.strimzi.operator.cluster.Main
2020-05-27 11:09:16 INFO  Main:60 - ClusterOperator 0.18.0 is starting
2020-05-27 11:09:20 INFO  Main:85 - Environment facts gathered: ClusterOperatorConfig(KubernetesVersion=1.11,OpenShiftRoutes=true,OpenShiftBuilds=true,OpenShiftImageStreams=true,OpenShiftDeploymentConfigs=true)
2020-05-27 11:09:20 INFO  Util:237 - Using config:
   PATH: /usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
   STRIMZI_DEFAULT_KAFKA_BRIDGE_IMAGE: strimzi/kafka-bridge:0.16.0
   TINI_SHA256: 12d20136605531b09a2c2dac02ccee85e1b874eb322ef6baf7561cd93f93c855
   STRIMZI_KAFKA_MIRROR_MAKER_IMAGES: 2.4.0=strimzi/kafka:0.18.0-kafka-2.4.0
2.4.1=strimzi/kafka:0.18.0-kafka-2.4.1
2.5.0=strimzi/kafka:0.18.0-kafka-2.5.0
   KUBERNETES_PORT: tcp://172.30.0.1:443
   STRIMZI_DEFAULT_TOPIC_OPERATOR_IMAGE: strimzi/operator:0.18.0
   STRIMZI_FULL_RECONCILIATION_INTERVAL_MS: 120000
   KUBERNETES_SERVICE_HOST: 172.30.0.1
   JAVA_CLASSPATH: lib/io.strimzi.cluster-operator-0.18.0.jar:lib/io.prometheus.simpleclient_common-0.7.0.jar:lib/io.strimzi.kafka-oauth-client-0.5.0.jar:lib/io.netty.netty-handler-4.1.45.Final.jar:lib/io.netty.netty-codec-http-4.1.45.Final.jar:lib/org.quartz-scheduler.quartz-2.2.1.jar:lib/org.bouncycastle.bcprov-jdk15on-1.60.jar:lib/com.squareup.okio.okio-1.15.0.jar:lib/org.keycloak.keycloak-core-10.0.0.jar:lib/io.netty.netty-buffer-4.1.45.Final.jar:lib/org.yaml.snakeyaml-1.24.jar:lib/io.fabric8.openshift-client-4.6.4.jar:lib/io.netty.netty-common-4.1.45.Final.jar:lib/org.apache.logging.log4j.log4j-api-2.13.0.jar:lib/org.xerial.snappy.snappy-java-1.1.7.3.jar:lib/org.hdrhistogram.HdrHistogram-2.1.11.jar:lib/io.prometheus.simpleclient-0.7.0.jar:lib/com.sun.activation.jakarta.activation-1.2.1.jar:lib/org.apache.yetus.audience-annotations-0.5.0.jar:lib/com.fasterxml.jackson.dataformat.jackson-dataformat-yaml-2.10.2.jar:lib/io.micrometer.micrometer-core-1.3.1.jar:lib/io.netty.netty-codec-4.1.45.Final.jar:lib/org.keycloak.keycloak-common-10.0.0.jar:lib/jakarta.activation.jakarta.activation-api-1.2.1.jar:lib/io.vertx.vertx-core-3.8.5.jar:lib/io.strimzi.certificate-manager-0.18.0.jar:lib/io.strimzi.kafka-oauth-common-0.5.0.jar:lib/io.strimzi.kafka-oauth-server-0.5.0.jar:lib/io.netty.netty-codec-dns-4.1.45.Final.jar:lib/io.fabric8.kubernetes-model-4.6.4.jar:lib/io.netty.netty-codec-socks-4.1.45.Final.jar:lib/com.github.mifmif.generex-1.0.2.jar:lib/io.netty.netty-resolver-4.1.45.Final.jar:lib/com.github.luben.zstd-jni-1.4.4-7.jar:lib/io.netty.netty-handler-proxy-4.1.45.Final.jar:lib/com.squareup.okhttp3.logging-interceptor-3.12.6.jar:lib/io.strimzi.operator-common-0.18.0.jar:lib/org.bouncycastle.bcpkix-jdk15on-1.62.jar:lib/org.lz4.lz4-java-1.7.1.jar:lib/io.netty.netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:lib/io.netty.netty-transport-native-unix-common-4.1.45.Final.jar:lib/dk.brics.automaton.automaton-1.11-8.jar:lib/io.vertx.vertx-micrometer-metrics-3.8.5.jar:
lib/org.apache.kafka.kafka-clients-2.5.0.jar:lib/com.fasterxml.jackson.core.jackson-core-2.10.2.jar:lib/org.apache.zookeeper.zookeeper-jute-3.5.7.jar:lib/io.netty.netty-transport-4.1.45.Final.jar:lib/io.netty.netty-transport-native-epoll-4.1.45.Final.jar:lib/jakarta.xml.bind.jakarta.xml.bind-api-2.3.2.jar:lib/org.apache.logging.log4j.log4j-slf4j-impl-2.13.0.jar:lib/com.fasterxml.jackson.core.jackson-annotations-2.10.2.jar:lib/io.fabric8.zjsonpatch-0.3.0.jar:lib/org.apache.zookeeper.zookeeper-3.5.7.jar:lib/io.strimzi.api-0.18.0.jar:lib/io.fabric8.kubernetes-client-4.6.4.jar:lib/com.fasterxml.jackson.module.jackson-module-jaxb-annotations-2.10.2.jar:lib/com.squareup.okhttp3.okhttp-3.12.6.jar:lib/io.netty.netty-codec-http2-4.1.45.Final.jar:lib/io.strimzi.config-model-0.18.0.jar:lib/org.apache.logging.log4j.log4j-core-2.13.0.jar:lib/io.fabric8.kubernetes-model-common-4.6.4.jar:lib/com.fasterxml.jackson.core.jackson-databind-2.10.2.jar:lib/io.strimzi.crd-annotations-0.18.0.jar:lib/io.netty.netty-resolver-dns-4.1.45.Final.jar:lib/org.slf4j.slf4j-api-1.7.25.jar:lib/org.latencyutils.LatencyUtils-2.0.3.jar:lib/io.micrometer.micrometer-registry-prometheus-1.3.1.jar
   STRIMZI_KAFKA_CONNECT_S2I_IMAGES: 2.4.0=strimzi/kafka:0.18.0-kafka-2.4.0
2.4.1=strimzi/kafka:0.18.0-kafka-2.4.1
2.5.0=strimzi/kafka:0.18.0-kafka-2.5.0
   STRIMZI_DEFAULT_TLS_SIDECAR_KAFKA_IMAGE: strimzi/kafka:0.18.0-kafka-2.5.0
   TINI_VERSION: v0.18.0
   STRIMZI_HOME: /opt/strimzi
   STRIMZI_DEFAULT_TLS_SIDECAR_CRUISE_CONTROL_IMAGE: strimzi/kafka:0.18.0-kafka-2.5.0
   PWD: /opt/strimzi
   STRIMZI_DEFAULT_CRUISE_CONTROL_IMAGE: strimzi/kafka:0.18.0-kafka-2.5.0
   KUBERNETES_PORT_443_TCP: tcp://172.30.0.1:443
   STRIMZI_OPERATION_TIMEOUT_MS: 300000
   JAVA_MAIN: io.strimzi.operator.cluster.Main
   KUBERNETES_PORT_443_TCP_ADDR: 172.30.0.1
   STRIMZI_VERSION: 0.18.0
   STRIMZI_DEFAULT_JMXTRANS_IMAGE: strimzi/jmxtrans:0.18.0
   STRIMZI_NAMESPACE: streaming
   STRIMZI_KAFKA_MIRROR_MAKER_2_IMAGES: 2.4.0=strimzi/kafka:0.18.0-kafka-2.4.0
2.4.1=strimzi/kafka:0.18.0-kafka-2.4.1
2.5.0=strimzi/kafka:0.18.0-kafka-2.5.0
   KUBERNETES_PORT_443_TCP_PROTO: tcp
   STRIMZI_DEFAULT_USER_OPERATOR_IMAGE: strimzi/operator:0.18.0
   KUBERNETES_SERVICE_PORT: 443
   STRIMZI_DEFAULT_KAFKA_EXPORTER_IMAGE: strimzi/kafka:0.18.0-kafka-2.5.0
   STRIMZI_DEFAULT_KAFKA_INIT_IMAGE: strimzi/operator:0.18.0
   STRIMZI_LOG_LEVEL: INFO
   HOSTNAME: strimzi-cluster-operator-bf978cf5c-rc5l2
   STRIMZI_KAFKA_CONNECT_IMAGES: 2.4.0=strimzi/kafka:0.18.0-kafka-2.4.0
2.4.1=strimzi/kafka:0.18.0-kafka-2.4.1
2.5.0=strimzi/kafka:0.18.0-kafka-2.5.0
   STRIMZI_KAFKA_IMAGES: 2.4.0=strimzi/kafka:0.18.0-kafka-2.4.0
2.4.1=strimzi/kafka:0.18.0-kafka-2.4.1
2.5.0=strimzi/kafka:0.18.0-kafka-2.5.0
   KUBERNETES_PORT_443_TCP_PORT: 443
   STRIMZI_DEFAULT_TLS_SIDECAR_ENTITY_OPERATOR_IMAGE: strimzi/kafka:0.18.0-kafka-2.5.0
   KUBERNETES_SERVICE_PORT_HTTPS: 443
   SHLVL: 0
   HOME: /
   MALLOC_ARENA_MAX: 2
2020-05-27 11:09:20 INFO  ClusterOperator:87 - Creating ClusterOperator for namespace streaming
2020-05-27 11:09:20 INFO  ClusterOperator:105 - Starting ClusterOperator for namespace streaming
2020-05-27 11:09:21 INFO  ClusterOperator:119 - Opened watch for Kafka operator
2020-05-27 11:09:21 INFO  ClusterOperator:119 - Opened watch for KafkaMirrorMaker operator
2020-05-27 11:09:21 INFO  ClusterOperator:119 - Opened watch for KafkaConnect operator
2020-05-27 11:09:21 INFO  ClusterOperator:119 - Opened watch for KafkaBridge operator
2020-05-27 11:09:21 INFO  ClusterOperator:119 - Opened watch for KafkaMirrorMaker2 operator
2020-05-27 11:09:21 INFO  ClusterOperator:119 - Opened watch for KafkaConnectS2I operator
2020-05-27 11:09:21 INFO  ClusterOperator:130 - Setting up periodic reconciliation for namespace streaming
2020-05-27 11:09:22 INFO  ClusterOperator:192 - ClusterOperator is now ready (health server listening on 8080)
2020-05-27 11:09:22 INFO  Main:159 - Cluster Operator verticle started in namespace streaming
2020-05-27 11:11:21 INFO  ClusterOperator:132 - Triggering periodic reconciliation for namespace streaming...
2020-05-27 11:11:21 WARN  VersionUsageUtils:60 - The client is using resource type 'kafkas' with unstable version 'v1beta1'
2020-05-27 11:11:22 WARN  VersionUsageUtils:60 - The client is using resource type 'kafkamirrormakers' with unstable version 'v1beta1'
2020-05-27 11:11:22 WARN  VersionUsageUtils:60 - The client is using resource type 'kafkaconnects' with unstable version 'v1beta1'
2020-05-27 11:11:22 WARN  VersionUsageUtils:60 - The client is using resource type 'kafkamirrormaker2s' with unstable version 'v1alpha1'
2020-05-27 11:11:22 WARN  VersionUsageUtils:60 - The client is using resource type 'kafkabridges' with unstable version 'v1alpha1'
2020-05-27 11:11:22 WARN  VersionUsageUtils:60 - The client is using resource type 'kafkarebalances' with unstable version 'v1alpha1'
2020-05-27 11:11:22 WARN  VersionUsageUtils:60 - The client is using resource type 'kafkaconnects2is' with unstable version 'v1beta1'
2020-05-27 11:12:27 INFO  OperatorWatcher:40 - Reconciliation #0(watch) Kafka(streaming/logs): Kafka logs in namespace streaming was ADDED
2020-05-27 11:12:27 INFO  AbstractOperator:173 - Reconciliation #0(watch) Kafka(streaming/logs): Kafka logs should be created or updated
2020-05-27 11:12:28 INFO  OperatorWatcher:40 - Reconciliation #1(watch) Kafka(streaming/logs): Kafka logs in namespace streaming was MODIFIED
2020-05-27 11:12:31 WARN  VersionUsageUtils:60 - The client is using resource type 'poddisruptionbudgets' with unstable version 'v1beta1'
2020-05-27 11:12:38 WARN  AbstractOperator:247 - Reconciliation #1(watch) Kafka(streaming/logs): Failed to acquire lock lock::streaming::Kafka::logs within 10000ms.
2020-05-27 11:13:21 INFO  ClusterOperator:132 - Triggering periodic reconciliation for namespace streaming...
2020-05-27 11:13:31 WARN  AbstractOperator:247 - Reconciliation #2(timer) Kafka(streaming/logs): Failed to acquire lock lock::streaming::Kafka::logs within 10000ms.
2020-05-27 11:14:20 INFO  OperatorWatcher:40 - Reconciliation #3(watch) Kafka(streaming/logs): Kafka logs in namespace streaming was MODIFIED
2020-05-27 11:14:20 INFO  AbstractOperator:318 - Reconciliation #0(watch) Kafka(streaming/logs): reconciled
2020-05-27 11:14:20 INFO  AbstractOperator:173 - Reconciliation #3(watch) Kafka(streaming/logs): Kafka logs should be created or updated
2020-05-27 11:14:23 INFO  AbstractOperator:318 - Reconciliation #3(watch) Kafka(streaming/logs): reconciled
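The reconciliation lines are the interesting part of this log. As a minimal sketch, the following Python snippet extracts them; the regular expression is derived only from the lines shown above, so treat the format as an assumption that may not hold for other Strimzi versions. The names Event and parse are hypothetical.

```python
import re
from typing import NamedTuple, Optional

# Pattern derived from the log lines above; other versions may log differently.
LINE = re.compile(
    r"^(?P<time>\S+ \S+) (?P<level>\w+)\s+\S+ - "
    r"Reconciliation #(?P<number>\d+)\((?P<trigger>\w+)\) "
    r"(?P<kind>\w+)\((?P<namespace>[^/]+)/(?P<name>[^)]+)\): (?P<message>.*)$"
)


class Event(NamedTuple):
    time: str
    level: str
    number: int
    trigger: str   # "watch" or "timer" in the log above
    resource: str
    message: str


def parse(line: str) -> Optional[Event]:
    """Return an Event for a reconciliation line, or None for any other line."""
    m = LINE.match(line.strip())
    if m is None:
        return None
    resource = f'{m["kind"]}({m["namespace"]}/{m["name"]})'
    return Event(m["time"], m["level"], int(m["number"]), m["trigger"], resource, m["message"])


log = """\
2020-05-27 11:09:16 INFO  Main:60 - ClusterOperator 0.18.0 is starting
2020-05-27 11:12:27 INFO  OperatorWatcher:40 - Reconciliation #0(watch) Kafka(streaming/logs): Kafka logs in namespace streaming was ADDED
2020-05-27 11:14:23 INFO  AbstractOperator:318 - Reconciliation #3(watch) Kafka(streaming/logs): reconciled
"""

for line in log.splitlines():
    event = parse(line)
    if event is not None:
        print(event.number, event.trigger, event.resource, event.message)
```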

After the operator does its work, you have the cluster at your disposal. To check the Kafka resources in minishift:

oc get kafka
NAME         DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS
logs         1                        1

If you remove the Kafka resource, Strimzi deletes all dependent objects for you. This matters because if you delete a single object, such as the ZooKeeper stateful set, Strimzi reconciles and recreates it automatically. The only way to remove all created resources is to delete the Kafka resource. Note that the persistent volume claims are kept, because the specification sets deleteClaim: false.

Do not do this yet, if you want to follow the example!

oc delete kafka logs
kafka.kafka.strimzi.io "logs" deleted

Stateful Sets

Strimzi creates stateful sets for our Kafka cluster. What is a StatefulSet?

StatefulSet is the workload API object used to manage stateful applications. It controls the deployment and scaling of a set of Pods and provides guarantees about the ordering and uniqueness of these Pods. Each Kafka broker and ZooKeeper node must be unique. This is mandatory for clusters that aim for high availability.

The name of the Kafka cluster is the prefix for the following stateful sets.

oc get statefulset
NAME             DESIRED   CURRENT   AGE
logs-kafka       1         1         15m
logs-zookeeper   1         1         16m
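The naming scheme can be summarized in a few lines of Python. This is only a sketch of the names that appear in the listings of this article (Strimzi 0.18); the function strimzi_names is hypothetical, and other Strimzi versions may name things differently. Pods of a stateful set are named after the set plus an ordinal index, which is standard Kubernetes behaviour.

```python
def strimzi_names(cluster: str, kafka_replicas: int = 1, zookeeper_replicas: int = 1) -> dict:
    """Names as they appear in this article's listings for a given cluster name."""
    return {
        "statefulsets": [f"{cluster}-kafka", f"{cluster}-zookeeper"],
        # StatefulSet pods get a stable name: <statefulset>-<ordinal>
        "kafka_pods": [f"{cluster}-kafka-{i}" for i in range(kafka_replicas)],
        "zookeeper_pods": [f"{cluster}-zookeeper-{i}" for i in range(zookeeper_replicas)],
        "bootstrap_service": f"{cluster}-kafka-bootstrap",
    }


for key, value in strimzi_names("logs").items():
    print(key, value)
```

With three Kafka replicas the pods would be logs-kafka-0, logs-kafka-1 and logs-kafka-2, and each keeps its name across restarts.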

Deployments

The Strimzi operator also deploys further operators itself.

oc get deployment
NAME                       DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE
logs-entity-operator       1         1         1            1           15m
strimzi-cluster-operator   1         1         1            1           56m

In this case, it is the Entity Operator.

Operators

The Entity Operator comprises the Topic Operator and User Operator.

These operators listen to custom resources. In the case of the Topic Operator, it is KafkaTopic. We create a Kafka topic.

cat mimacom-topic.yml:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: spring-boot
  labels:
    strimzi.io/cluster: logs
spec:
  partitions: 1
  replicas: 1

Since it is a single node, we use 1 partition and 1 replica. We create the KafkaTopic.

oc apply -f mimacom-topic.yml
kafkatopic.kafka.strimzi.io/spring-boot created
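If you need many topics, writing each file by hand gets tedious. As a minimal sketch, the same resource can be generated in Python; the helper kafka_topic is hypothetical, and its field values are copied from the YAML above.

```python
import json


def kafka_topic(name: str, cluster: str, partitions: int = 1, replicas: int = 1) -> dict:
    """Build a KafkaTopic resource with the same fields as mimacom-topic.yml."""
    return {
        "apiVersion": "kafka.strimzi.io/v1beta1",
        "kind": "KafkaTopic",
        "metadata": {
            "name": name,
            # This label tells the Topic Operator which Kafka cluster the topic belongs to.
            "labels": {"strimzi.io/cluster": cluster},
        },
        "spec": {"partitions": partitions, "replicas": replicas},
    }


print(json.dumps(kafka_topic("spring-boot", "logs"), indent=2))
```

The printed JSON can be saved to a file and passed to oc apply -f, which accepts JSON as well as YAML.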

List the topics (the OpenShift/Strimzi way):

oc get kafkatopic                  
NAME          PARTITIONS   REPLICATION FACTOR
spring-boot   1            1

On OpenShift and Kubernetes, Kafka topics are managed differently than in a native installation. In a native Kafka installation, you use kafka-topics.sh to create a topic, or, if auto-create is enabled, Kafka creates the topic itself. These topics will not be visible to Strimzi, so using the Entity Operator is vital for managing Kafka topics on OpenShift!

Networking

For a Kafka resource with an external listener of type route, Strimzi creates the following routes and exposes the cluster for access from outside.

oc get routes
NAME                   HOST/PORT                                             PATH      SERVICES                        PORT      TERMINATION   WILDCARD
logs-kafka-0           logs-kafka-0-streaming.192.168.42.24.nip.io                     logs-kafka-0                    9094      passthrough   None
logs-kafka-bootstrap   logs-kafka-bootstrap-streaming.192.168.42.24.nip.io             logs-kafka-external-bootstrap   9094      passthrough   None
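An external client needs the host of the bootstrap route. The sketch below reads it from the text output of oc get routes. The helper names are hypothetical, and the default port 443 is an assumption based on how the OpenShift router normally serves TLS passthrough routes; the 9094 in the PORT column is the target port of the service inside the cluster.

```python
def route_hosts(oc_output: str) -> dict:
    """Map route name -> host from the text output of `oc get routes`."""
    hosts = {}
    for line in oc_output.strip().splitlines()[1:]:  # skip the header row
        fields = line.split()
        if len(fields) >= 2:
            hosts[fields[0]] = fields[1]  # NAME and HOST/PORT are the first two columns
    return hosts


def bootstrap_address(hosts: dict, cluster: str, port: int = 443) -> str:
    """Address an external client would use as bootstrap server (port is an assumption)."""
    return f"{hosts[cluster + '-kafka-bootstrap']}:{port}"


output = """\
NAME                   HOST/PORT                                             PATH      SERVICES                        PORT      TERMINATION   WILDCARD
logs-kafka-0           logs-kafka-0-streaming.192.168.42.24.nip.io                     logs-kafka-0                    9094      passthrough   None
logs-kafka-bootstrap   logs-kafka-bootstrap-streaming.192.168.42.24.nip.io             logs-kafka-external-bootstrap   9094      passthrough   None
"""

print(bootstrap_address(route_hosts(output), "logs"))
# logs-kafka-bootstrap-streaming.192.168.42.24.nip.io:443
```

Because the routes use passthrough termination, the TLS connection ends at the broker, so a client connecting through a route has to speak TLS.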

Other Objects

If you want to see all resources that Strimzi created, list all objects.

oc get all
NAME                                           READY     STATUS    RESTARTS   AGE
pod/logs-entity-operator-c8ff8696f-8x6vk       3/3       Running   0          43m
pod/logs-kafka-0                               2/2       Running   0          43m
pod/logs-zookeeper-0                           1/1       Running   0          44m
pod/strimzi-cluster-operator-bf978cf5c-rc5l2   1/1       Running   0          2h

NAME                                    TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
service/logs-kafka-0                    ClusterIP   172.30.204.40   <none>        9094/TCP                     43m
service/logs-kafka-bootstrap            ClusterIP   172.30.99.158   <none>        9091/TCP,9092/TCP,9093/TCP   43m
service/logs-kafka-brokers              ClusterIP   None            <none>        9091/TCP,9092/TCP,9093/TCP   43m
service/logs-kafka-external-bootstrap   ClusterIP   172.30.39.229   <none>        9094/TCP                     43m
service/logs-zookeeper-client           ClusterIP   172.30.142.59   <none>        2181/TCP                     44m
service/logs-zookeeper-nodes            ClusterIP   None            <none>        2181/TCP,2888/TCP,3888/TCP   44m

NAME                                       DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/logs-entity-operator       1         1         1            1           43m
deployment.apps/strimzi-cluster-operator   1         1         1            1           2h

NAME                                                 DESIRED   CURRENT   READY     AGE
replicaset.apps/logs-entity-operator-c8ff8696f       1         1         1         43m
replicaset.apps/strimzi-cluster-operator-bf978cf5c   1         1         1         2h

NAME                              DESIRED   CURRENT   AGE
statefulset.apps/logs-kafka       1         1         43m
statefulset.apps/logs-zookeeper   1         1         44m

NAME                                            HOST/PORT                                             PATH      SERVICES                        PORT      TERMINATION   WILDCARD
route.route.openshift.io/logs-kafka-0           logs-kafka-0-streaming.192.168.42.24.nip.io                     logs-kafka-0                    9094      passthrough   None
route.route.openshift.io/logs-kafka-bootstrap   logs-kafka-bootstrap-streaming.192.168.42.24.nip.io             logs-kafka-external-bootstrap   9094      passthrough   None

Using Kafka on OpenShift

We use some test data for time tracking. This data is randomly generated.

[{
  "id": 1,
  "first_name": "Redd",
  "last_name": "Ulyat",
  "email": "rulyat0@ft.com",
  "gender": "Male",
  "start": "18:06",
  "end": "17:50",
  "customer": "Feedspan"
}, {
  "id": 2,
  "first_name": "Suzann",
  "last_name": "Andryushchenko",
  "email": "sandryushchenko1@cyberchimps.com",
  "gender": "Female",
  "start": "12:14",
  "end": "20:53",
  "customer": "Skyndu"
}, {
  "id": 3,
  "first_name": "Sunny",
  "last_name": "Statter",
  "email": "sstatter2@purevolume.com",
  "gender": "Male",
  "start": "9:46",
  "end": "14:29",
  "customer": "Pixope"
}, {
  "id": 4,
  "first_name": "Glori",
  "last_name": "Cumberpatch",
  "email": "gcumberpatch3@dedecms.com",
  "gender": "Female",
  "start": "18:13",
  "end": "11:29",
  "customer": "Agimba"
}, {
  "id": 5,
  "first_name": "Ashlee",
  "last_name": "Fynan",
  "email": "afynan4@uiuc.edu",
  "gender": "Female",
  "start": "10:01",
  "end": "1:53",
  "customer": "Gigabox"
}, {
  "id": 6,
  "first_name": "Darin",
  "last_name": "Gulc",
  "email": "dgulc5@ihg.com",
  "gender": "Male",
  "start": "9:43",
  "end": "5:10",
  "customer": "Rhyloo"
}, {
  "id": 7,
  "first_name": "Brodie",
  "last_name": "Casari",
  "email": "bcasari6@mashable.com",
  "gender": "Male",
  "start": "5:22",
  "end": "12:00",
  "customer": "Eamia"
}, {
  "id": 8,
  "first_name": "Morty",
  "last_name": "Currington",
  "email": "mcurrington7@usa.gov",
  "gender": "Male",
  "start": "21:22",
  "end": "17:01",
  "customer": "Quatz"
}, {
  "id": 9,
  "first_name": "Jacobo",
  "last_name": "Lanmeid",
  "email": "jlanmeid8@illinois.edu",
  "gender": "Male",
  "start": "18:10",
  "end": "18:39",
  "customer": "Buzzster"
}, {
  "id": 10,
  "first_name": "Ilario",
  "last_name": "Embra",
  "email": "iembra9@ca.gov",
  "gender": "Male",
  "start": "13:51",
  "end": "18:54",
  "customer": "Oyondu"
}]
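The console producer sends every line it reads as a separate message, so pretty-printed JSON like the above would be split into many fragments. A minimal Python sketch for minifying the data to a single line follows; the function name minify is hypothetical, and the input is shortened to one record with three fields.

```python
import json


def minify(pretty_json: str) -> str:
    """Re-serialize JSON without any whitespace so it fits on a single line."""
    return json.dumps(json.loads(pretty_json), separators=(",", ":"))


pretty = """
[{
  "id": 1,
  "first_name": "Redd",
  "customer": "Feedspan"
}]
"""

print(minify(pretty))
# [{"id":1,"first_name":"Redd","customer":"Feedspan"}]
```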

Producing Data

We start a pod with a container for producing data. The container is cleaned up after termination. Run this in your Linux terminal.

oc -n streaming run kafka-producer -ti --image=strimzi/kafka:0.18.0-kafka-2.5.0 \
--rm=true --restart=Never -- bin/kafka-console-producer.sh \
--broker-list logs-kafka-bootstrap:9092 --topic time-tracking

Paste the minified test data into it. CTRL + C closes the producer.

If you don't see a command prompt, try pressing enter.
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
>[{"id":1,"first_name":"Redd","last_name":"Ulyat","email":"rulyat0@ft.com","gender":"Male","start":"18:06","end":"17:50","customer":"Feedspan"},{"id":2,"first_name":"Suzann","last_name":"Andryushchenko","email":"sandryushchenko1@cyberchimps.com","gender":"Female","start":"12:14","end":"20:53","customer":"Skyndu"},{"id":3,"first_name":"Sunny","last_name":"Statter","email":"sstatter2@purevolume.com","gender":"Male","start":"9:46","end":"14:29","customer":"Pixope"},{"id":4,"first_name":"Glori","last_name":"Cumberpatch","email":"gcumberpatch3@dedecms.com","gender":"Female","start":"18:13","end":"11:29","customer":"Agimba"},{"id":5,"first_name":"Ashlee","last_name":"Fynan","email":"afynan4@uiuc.edu","gender":"Female","start":"10:01","end":"1:53","customer":"Gigabox"},{"id":6,"first_name":"Darin","last_name":"Gulc","email":"dgulc5@ihg.com","gender":"Male","start":"9:43","end":"5:10","customer":"Rhyloo"},{"id":7,"first_name":"Brodie","last_name":"Casari","email":"bcasari6@mashable.com","gender":"Male","start":"5:22","end":"12:00","customer":"Eamia"},{"id":8,"first_name":"Morty","last_name":"Currington","email":"mcurrington7@usa.gov","gender":"Male","start":"21:22","end":"17:01","customer":"Quatz"},{"id":9,"first_name":"Jacobo","last_name":"Lanmeid","email":"jlanmeid8@illinois.edu","gender":"Male","start":"18:10","end":"18:39","customer":"Buzzster"},{"id":10,"first_name":"Ilario","last_name":"Embra","email":"iembra9@ca.gov","gender":"Male","start":"13:51","end":"18:54","customer":"Oyondu"}]
[2020-05-27 14:40:37,082] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 11 : {time-tracking=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2020-05-27 14:40:37,195] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 12 : {time-tracking=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>^Cpod "kafka-producer" deleted
pod streaming/kafka-producer terminated (Error)

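The container communicated with the Kafka cluster inside the namespace through the service name logs-kafka-bootstrap on port 9092. The LEADER_NOT_AVAILABLE warnings are typical when a topic is created automatically on the first write; the producer retries once a leader has been elected.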
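The console producer reads its input line by line and sends each line as a separate record, which is why the test data is pasted in minified form. A minimal sketch of collapsing a pretty-printed JSON file into a single line follows; the file names test-data.json and test-data.min.json and the shortened sample content are assumptions made for illustration only.

```shell
# Hypothetical sample file standing in for the pretty-printed test data.
cat > test-data.json <<'EOF'
[
  {"id": 1, "customer": "Feedspan"},
  {"id": 2, "customer": "Skyndu"}
]
EOF

# Join all lines into one, so that the console producer sends one record.
# Indentation spaces remain, but the result is still valid JSON.
tr -d '\n' < test-data.json > test-data.min.json

# Terminate the single line with exactly one newline.
echo >> test-data.min.json

# Print the number of lines in the result.
wc -l < test-data.min.json
```

The content of test-data.min.json can then be pasted into the producer prompt as one message.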

Consuming Data

Consuming data follows the same principle. To get the data from the start, pass the option --from-beginning.

oc -n streaming run kafka-consumer -ti --image=strimzi/kafka:0.18.0-kafka-2.5.0 \
--rm=true --restart=Never -- bin/kafka-console-consumer.sh \
--bootstrap-server logs-kafka-bootstrap:9092 --topic time-tracking --from-beginning

The consumer prints the following output. CTRL + C closes the consumer.

If you don't see a command prompt, try pressing enter.
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[{"id":1,"first_name":"Redd","last_name":"Ulyat","email":"rulyat0@ft.com","gender":"Male","start":"18:06","end":"17:50","customer":"Feedspan"},{"id":2,"first_name":"Suzann","last_name":"Andryushchenko","email":"sandryushchenko1@cyberchimps.com","gender":"Female","start":"12:14","end":"20:53","customer":"Skyndu"},{"id":3,"first_name":"Sunny","last_name":"Statter","email":"sstatter2@purevolume.com","gender":"Male","start":"9:46","end":"14:29","customer":"Pixope"},{"id":4,"first_name":"Glori","last_name":"Cumberpatch","email":"gcumberpatch3@dedecms.com","gender":"Female","start":"18:13","end":"11:29","customer":"Agimba"},{"id":5,"first_name":"Ashlee","last_name":"Fynan","email":"afynan4@uiuc.edu","gender":"Female","start":"10:01","end":"1:53","customer":"Gigabox"},{"id":6,"first_name":"Darin","last_name":"Gulc","email":"dgulc5@ihg.com","gender":"Male","start":"9:43","end":"5:10","customer":"Rhyloo"},{"id":7,"first_name":"Brodie","last_name":"Casari","email":"bcasari6@mashable.com","gender":"Male","start":"5:22","end":"12:00","customer":"Eamia"},{"id":8,"first_name":"Morty","last_name":"Currington","email":"mcurrington7@usa.gov","gender":"Male","start":"21:22","end":"17:01","customer":"Quatz"},{"id":9,"first_name":"Jacobo","last_name":"Lanmeid","email":"jlanmeid8@illinois.edu","gender":"Male","start":"18:10","end":"18:39","customer":"Buzzster"},{"id":10,"first_name":"Ilario","last_name":"Embra","email":"iembra9@ca.gov","gender":"Male","start":"13:51","end":"18:54","customer":"Oyondu"}]

^CProcessed a total of 2 messages
pod "kafka-consumer" deleted
pod streaming/kafka-consumer terminated (Error)

External Access of Kafka

To access the cluster from a local Kafka installation, we need the cluster's CA certificate, because the route is TLS-encrypted. This is another strength of Strimzi: it generates the certificates for us, so we do not have to create them ourselves.

For demonstration purposes, we use an older client version, 2.4.1, in contrast to the deployed version 2.5.0.

List all secrets and filter for the certificate authorities (CA).

oc get secrets | grep ca-cert
logs-clients-ca-cert                       Opaque                                3         1h
logs-cluster-ca-cert                       Opaque                                3         1h

We are only interested in the cluster CA. Use this command to view it. The sensitive output is redacted.

oc get secret logs-cluster-ca-cert -o yaml
apiVersion: v1
data:
  ca.crt: LS0tLS1...
  ca.p12: MIIEVgI...
  ca.password: xxxx
kind: Secret
metadata:
  annotations:
    strimzi.io/ca-cert-generation: "0"
  creationTimestamp: 2020-05-27T12:55:29Z
  labels:
    app.kubernetes.io/instance: logs
    app.kubernetes.io/managed-by: strimzi-cluster-operator
    app.kubernetes.io/name: strimzi
    app.kubernetes.io/part-of: strimzi-logs
    strimzi.io/cluster: logs
    strimzi.io/kind: Kafka
    strimzi.io/name: strimzi
  name: logs-cluster-ca-cert
  namespace: streaming
  ownerReferences:
  - apiVersion: kafka.strimzi.io/v1beta1
    blockOwnerDeletion: false
    controller: false
    kind: Kafka
    name: logs
    uid: 5783c877-a019-11ea-9be2-525400a45a70
  resourceVersion: "147848"
  selfLink: /api/v1/namespaces/streaming/secrets/logs-cluster-ca-cert
  uid: 57fa7398-a019-11ea-9be2-525400a45a70
type: Opaque
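The values under data are base64-encoded, which is why ca.crt starts with LS0tLS1: this is how the dashes of a PEM header look after encoding. The sketch below decodes the base64 form of the standard PEM header line. The encoded string is an illustration only and is not the actual certificate data of this cluster.

```shell
# Base64 encoding of the standard PEM header line (illustration only,
# not the real content of the logs-cluster-ca-cert secret).
encoded='LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0t'

# Decode it the same way a value under "data:" can be decoded.
decoded=$(printf '%s' "$encoded" | base64 -d)

# Prints: -----BEGIN CERTIFICATE-----
printf '%s\n' "$decoded"
```

The oc extract command used in the next step performs this decoding for us and writes the plain PEM certificate.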

Extract the certificate and import it into a newly created Java truststore.

oc extract secret/logs-cluster-ca-cert --keys=ca.crt --to=- > ca.crt
keytool -import -trustcacerts -alias root -file ca.crt -keystore truststore.jks -storepass password -noprompt
# ca.crt
Certificate was added to keystore

Consuming Data

Go to your local Kafka installation and pass the mandatory information. We use the bootstrap address logs-kafka-bootstrap-streaming.192.168.42.24.nip.io to access the Kafka cluster.

bin/kafka-console-consumer.sh \
--bootstrap-server logs-kafka-bootstrap-streaming.192.168.42.24.nip.io:443 \
--consumer-property security.protocol=SSL \
--consumer-property ssl.truststore.password=password \
--consumer-property ssl.truststore.location=/home/tan/truststore.jks \
--topic time-tracking --from-beginning

The client output:

[{"id":1,"first_name":"Redd","last_name":"Ulyat","email":"rulyat0@ft.com","gender":"Male","start":"18:06","end":"17:50","customer":"Feedspan"},{"id":2,"first_name":"Suzann","last_name":"Andryushchenko","email":"sandryushchenko1@cyberchimps.com","gender":"Female","start":"12:14","end":"20:53","customer":"Skyndu"},{"id":3,"first_name":"Sunny","last_name":"Statter","email":"sstatter2@purevolume.com","gender":"Male","start":"9:46","end":"14:29","customer":"Pixope"},{"id":4,"first_name":"Glori","last_name":"Cumberpatch","email":"gcumberpatch3@dedecms.com","gender":"Female","start":"18:13","end":"11:29","customer":"Agimba"},{"id":5,"first_name":"Ashlee","last_name":"Fynan","email":"afynan4@uiuc.edu","gender":"Female","start":"10:01","end":"1:53","customer":"Gigabox"},{"id":6,"first_name":"Darin","last_name":"Gulc","email":"dgulc5@ihg.com","gender":"Male","start":"9:43","end":"5:10","customer":"Rhyloo"},{"id":7,"first_name":"Brodie","last_name":"Casari","email":"bcasari6@mashable.com","gender":"Male","start":"5:22","end":"12:00","customer":"Eamia"},{"id":8,"first_name":"Morty","last_name":"Currington","email":"mcurrington7@usa.gov","gender":"Male","start":"21:22","end":"17:01","customer":"Quatz"},{"id":9,"first_name":"Jacobo","last_name":"Lanmeid","email":"jlanmeid8@illinois.edu","gender":"Male","start":"18:10","end":"18:39","customer":"Buzzster"},{"id":10,"first_name":"Ilario","last_name":"Embra","email":"iembra9@ca.gov","gender":"Male","start":"13:51","end":"18:54","customer":"Oyondu"}]

^CProcessed a total of 2 messages

Producing data follows the same principle. You can pass all properties on the command line, or put them into a consumer.properties or producer.properties file for the respective client. The next section demonstrates the properties-file approach.
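As a sketch of the producer side, the following writes the same TLS settings used by the consumer into a properties file and hands it to the console producer. The file name producer.properties is an assumption; the truststore path, password, and bootstrap address are the ones from this article. The 2.4.1 console producer takes the bootstrap address through --broker-list.

```shell
# Hypothetical file name; the values are the truststore settings used above.
cat > producer.properties <<'EOF'
security.protocol=SSL
ssl.truststore.location=/home/tan/truststore.jks
ssl.truststore.password=password
EOF

# Run this from the root of the local Kafka 2.4.1 installation.
if [ -x bin/kafka-console-producer.sh ]; then
  bin/kafka-console-producer.sh \
    --broker-list logs-kafka-bootstrap-streaming.192.168.42.24.nip.io:443 \
    --producer.config producer.properties \
    --topic time-tracking
else
  echo "bin/kafka-console-producer.sh not found; run this from the Kafka directory" >&2
fi
```

Each line typed at the producer prompt is then sent to the time-tracking topic over the encrypted route.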

List Topics

As you have read, topics should be managed through the entity operator. The example topic time-tracking that we created is not listed in OpenShift, however. You can use kafka-topics.sh to view the topics from outside. To do so, we need a properties file for the client with the settings for encrypted communication.

Create the properties file

echo security.protocol=SSL >> strimzi-minishift.properties
echo ssl.truststore.password=password >> strimzi-minishift.properties
echo ssl.truststore.location=/home/tan/truststore.jks >> strimzi-minishift.properties

List all topics

bin/kafka-topics.sh --bootstrap-server logs-kafka-bootstrap-streaming.192.168.42.24.nip.io:443 \
--list --command-config ./strimzi-minishift.properties

The output:

__consumer_offsets
spring-boot
time-tracking

Summary

In this article, you have learned the basics of using Strimzi to deploy Apache Kafka on Kubernetes and OpenShift. Access from outside the cluster was a crucial requirement for us, and Strimzi provides robust solutions for it. You can try this example hands-on yourself: all you need is minishift and Apache Kafka installed on your Linux environment.

About the author: Vinh Nguyên

Loves to code, hike and mostly drink black coffee. Favors Apache Kafka, Elasticsearch, Java Development and 80's music.
