Apache Kafka with Node.js

April 7, 2017

Over the last few months Apache Kafka gained a lot of traction in the industry and more and more companies explore how to effectively use Kafka in their production environments.

If you are not familiar with Apache Kafka, I recommend reading this excellent introduction (http://kafka.apache.org/documentation.html#introduction). To give you a quick overview here are the core principles:

There are many tutorials on how to use Kafka within a Java environment. For this blog however we will take a different route and try to explore how we can develop a simple Kafka producer and consumer combo within Node.js, while still leveraging the Avro data serialization system (https://avro.apache.org/).

Setup

First we need to install a version of Kafka on our local system. To do so we download one of the binary packages from http://kafka.apache.org/downloads and extract it

$ tar -xvf kafka_2.12-0.10.2.0.tgz
$ cd kafka_2.12-0.10.2.0 

After we have successfully extracted the package we are ready to startup a ZooKeeper (https://zookeeper.apache.org/doc/r3.1.2/zookeeperOver.html) instance

$ bin/zookeeper-server-start.sh config/zookeeper.properties
 [2017-02-22 12:01:12,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
 [...]

Now we can start our Kafka server

$ bin/kafkas-server-start.sh config/server.properties
 [...]
 [2017-03-10 10:12:40,160] INFO starting (kafka.server.KafkaServer)
 [2017-03-10 10:12:40,164] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
 [2017-03-10 10:12:40,178] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
 [...]

We need to create a new topic for our tests

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic node-test
 node-test

Let’s quickly verify that our topic was created correctly

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic node-test
 Topic:node-test PartitionCount:1 ReplicationFactor:1 Configs:
  Topic: node-test Partition: 0 Leader: 0 Replicas: 0 Isr: 0

Creating a producer

For accessing Kafka we will be using the kafka-node client (https://github.com/SOHU-Co/kafka-node) which offers an API for

For handling our serialization we’ll be using a plain implementation of the Avro specification in Javascript (https://github.com/mtth/avsc).

Let’s start with installing our dependencies

$ npm install kafka-node
$ npm install avsc

After that we’re ready to start implementing our producer.  To keep our code as accessible as possible we’re not leveraging modern Javascript concepts (e.g. ES6 or Typescript) and stick to plain Javascript.

Avro

First we’ll define our Avro schema MyAwesomeType. It’s a simple type consisting of three fields

var avroSchema = {
   name: 'MyAwesomeType',
   type: 'record',
   fields: [
     {
       name: 'id',
       type: 'string'
     }, {
       name: 'timestamp',
       type: 'double'
     }, {
       name: 'enumField',
       type: {
         name: 'EnumField',
         type: 'enum',
         symbols: ['sym1', 'sym2', 'sym3']
       }
     }]
 };

Next we want to create a new Avro type from our schema

var avro = require('avsc');
var type = avro.parse(avroSchema);

This concludes the Avro part of the implementation.

Producer

Let’s start by importing the client and creating some aliases for the kafka-node modules

var kafka = require('kafka-node');
var HighLevelProducer = kafka.HighLevelProducer;
var KeyedMessage = kafka.KeyedMessage;
var Client = kafka.Client;

Next we create a client

var client = new Client('localhost:2181', 'my-client-id', {
  sessionTimeout: 300,
  spinDelay: 100,
  retries: 2
});

// For this demo we just log client errors to the console.
client.on('error', function(error) {
  console.error(error);
});

Now we can create the producer and send a message

var producer = new HighLevelProducer(client);

producer.on('ready', function() {
  // Create message and encode to Avro buffer
  var messageBuffer = type.toBuffer({
    enumField: 'sym1',
    id: '3e0c63c4-956a-4378-8a6d-2de636d191de',
    timestamp: Date.now()
  });

  // Create a new payload
  var payload = [{
    topic: 'node-test',
    messages: messageBuffer,
    attributes: 1 /* Use GZip compression for the payload */
  }];

  //Send payload to Kafka and log result/error
  producer.send(payload, function(error, result) {
    console.info('Sent payload to Kafka: ', payload);
    if (error) {
      console.error(error);
    } else {
      var formattedResult = result[0];
      console.log('result: ', result)
    }
  });
});

// For this demo we just log producer errors to the console.
producer.on('error', function(error) {
  console.error(error);
});

Let’s run it

$ node producer.js
Sent payload to Kafka: [ { topic: 'node-test',
 messages: <Buffer 00 48 33 65 30 63 36 33 63 34 2d 39 35 36 61 2d 34 33 37 38 2d 38 61 36 64 2d 32 64 65 36 33 36 64 31 39 31 64 65 00 60 13 1f 7c ab 75 42>,
 attributes: 1,
 partition: 0 } ]
result: { 'node-test': { '0': 1 } }

We successfully sent a message to partition 0 of the topic node-test. The message has an offset of 1.

Creating a consumer

We will be using the same libraries as for the producer.

Avro

We use the same definition as for the producer

var typeDescription = {
  name: 'MyAwesomeType',
  type: 'record',
  fields: [{
    name: 'enumField',
    type: {
      name: 'EnumField',
      type: 'enum',
      symbols: ['sym1', 'sym2', 'sym3']
    }
  }, {
    name: 'id',
    type: 'string'
  }, {
    name: 'timestamp',
    type: 'double'
  }]
};

var avro = require('avsc');
var type = avro.parse(typeDescription);

Consumer

Let’s again start by importing the client and creating some useful aliases

var kafka = require('kafka-node');
var HighLevelConsumer = kafka.HighLevelConsumer;
var Client = kafka.Client;

Next we create a new client

var client = new Client('localhost:2181');
var topics = [{
  topic: 'node-test'
}];

Now we can create the consumer and start consuming messages

var options = {
  autoCommit: true,
  fetchMaxWaitMs: 1000,
  fetchMaxBytes: 1024 * 1024,
  encoding: 'buffer'
};
var consumer = new HighLevelConsumer(client, topics, options);

consumer.on('message', function(message) {
  var buf = new Buffer(message.value, 'binary'); // Read string into a buffer.
  var decodedMessage = type.fromBuffer(buf.slice(0)); // Skip prefix.
  console.log(decodedMessage);
});

consumer.on('error', function(err) {
  console.log('error', err);
});

process.on('SIGINT', function() {
  consumer.close(true, function() {
    process.exit();
  });
});

Let’s run it

$ node consumer.js
MyAwesomeType {
 enumField: 'sym1',
 id: '3e0c63c4-956a-4378-8a6d-2de636d191de',
 timestamp: 1489141625142 }

If we leave the consumer running and produce a new message it will automatically receive and display it

$ node consumer.js
MyAwesomeType {
 enumField: 'sym1',
 id: '3e0c63c4-956a-4378-8a6d-2de636d191de',
 timestamp: 1489141625142 }
MyAwesomeType {
 enumField: 'sym2',
 id: '1ad3f23e-cf03-42b5-932f-aad5fab8e07f',
 timestamp: 1489144178087 } 

Summary

We have shown that it’s quite simple to interact with Apache Kafka using Node.js/Javascript. We have of course only scratched the surface of kafka-node. There’s a lot more that can be done, e.g. implementing consumer groups, custom offset management or creating custom partitioners. See https://github.com/SOHU-Co/kafka-node for more details.

If you’re implementing a producer/consumer for your next project, maybe take a look at the used libraries – they seem to be mature enough to be used in production.

About the author: Daniel Willig
Comments
Join us