Apache Kafka with Node.js
Over the last few months Apache Kafka has gained a lot of traction in the industry, and more and more companies are exploring how to use Kafka effectively in their production environments.
If you are not familiar with Apache Kafka, I recommend reading this excellent introduction (http://kafka.apache.org/documentation.html#introduction). To give you a quick overview here are the core principles:
- Kafka is run as a cluster on one or more servers.
- The Kafka cluster stores streams of records in categories called topics.
- Each record consists of a key, a value, and a timestamp.
- A producer publishes messages to one or many Kafka topics. Information about where to publish the message is contained within the message itself. It’s therefore possible to publish to different topics using the same producer.
- A consumer subscribes to one or many Kafka topics and reads the messages published to these topics. Multiple consumers can work in tandem as a consumer group, which parallelizes consumption across the partitions of a topic.
There are many tutorials on how to use Kafka within a Java environment. For this blog post, however, we will take a different route and explore how to develop a simple Kafka producer and consumer combo in Node.js, while still leveraging the Avro data serialization system (https://avro.apache.org/).
Setup
First we need to install a version of Kafka on our local system. To do so, we download one of the binary packages from http://kafka.apache.org/downloads and extract it
$ tar -xvf kafka_2.12-0.10.2.0.tgz
$ cd kafka_2.12-0.10.2.0
After we have successfully extracted the package, we are ready to start up a ZooKeeper (https://zookeeper.apache.org/doc/r3.1.2/zookeeperOver.html) instance
$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2017-02-22 12:01:12,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[...]
Now we can start our Kafka server
$ bin/kafka-server-start.sh config/server.properties
[...]
[2017-03-10 10:12:40,160] INFO starting (kafka.server.KafkaServer)
[2017-03-10 10:12:40,164] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2017-03-10 10:12:40,178] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[...]
We need to create a new topic for our tests
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic node-test
node-test
Let’s quickly verify that our topic was created correctly
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic node-test
Topic:node-test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: node-test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
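Before we write any Node.js code, we can optionally run a quick smoke test with the console tools that ship with Kafka. Start each command in its own terminal; every line typed into the producer should show up in the consumer.
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic node-test
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic node-test --from-beginning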
Creating a producer
For accessing Kafka we will be using the kafka-node client (https://github.com/SOHU-Co/kafka-node) which offers an API for
- Producer
- Consumer
- Offset management
- Consumer groups
For handling our serialization we’ll be using avsc, a pure JavaScript implementation of the Avro specification (https://github.com/mtth/avsc).
Let’s start with installing our dependencies
$ npm install kafka-node
$ npm install avsc
After that we’re ready to start implementing our producer. To keep our code as accessible as possible, we’re not leveraging modern JavaScript features (e.g. ES6 or TypeScript) and stick to plain JavaScript.
Avro
First we’ll define our Avro schema MyAwesomeType. It’s a simple record type consisting of three fields
- enumField: enum
- id: string
- timestamp: double
var avroSchema = {
    name: 'MyAwesomeType',
    type: 'record',
    fields: [{
        name: 'enumField',
        type: {
            name: 'EnumField',
            type: 'enum',
            symbols: ['sym1', 'sym2', 'sym3']
        }
    }, {
        name: 'id',
        type: 'string'
    }, {
        name: 'timestamp',
        type: 'double'
    }]
};
Next we want to create a new Avro type from our schema
var avro = require('avsc');
var type = avro.parse(avroSchema);
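Before wiring the type into Kafka, we can sanity-check it with a quick local round trip. This is a minimal sketch using avsc’s isValid, toBuffer and fromBuffer; the test values are made up for illustration.
// Quick local round trip to verify the schema behaves as expected
var testRecord = {
    enumField: 'sym1',
    id: 'test-id',
    timestamp: Date.now()
};
console.log(type.isValid(testRecord)); // true
var encoded = type.toBuffer(testRecord);  // Avro binary encoding as a Buffer
var decoded = type.fromBuffer(encoded);   // decode back into an object
console.log(decoded.id);                  // 'test-id'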
This concludes the Avro part of the implementation.
Producer
Let’s start by importing the client and creating some aliases for the kafka-node modules
var kafka = require('kafka-node');
var HighLevelProducer = kafka.HighLevelProducer;
var KeyedMessage = kafka.KeyedMessage;
var Client = kafka.Client;
Next we create a client
var client = new Client('localhost:2181', 'my-client-id', {
    sessionTimeout: 300,
    spinDelay: 100,
    retries: 2
});
// For this demo we just log client errors to the console.
client.on('error', function(error) {
    console.error(error);
});
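As a side note: we connect through ZooKeeper here, which matches the kafka-node version used at the time of writing. More recent kafka-node versions also offer a KafkaClient that connects to the broker directly; if your version includes it, the client could be created roughly like this instead (a sketch, assuming the broker listens on localhost:9092).
// Sketch: broker-direct client available in newer kafka-node versions
var KafkaClient = kafka.KafkaClient;
var directClient = new KafkaClient({
    kafkaHost: 'localhost:9092',
    clientId: 'my-client-id'
});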
Now we can create the producer and send a message
var producer = new HighLevelProducer(client);
producer.on('ready', function() {
    // Create the message and encode it into an Avro buffer
    var messageBuffer = type.toBuffer({
        enumField: 'sym1',
        id: '3e0c63c4-956a-4378-8a6d-2de636d191de',
        timestamp: Date.now()
    });
    // Create a new payload
    var payload = [{
        topic: 'node-test',
        messages: messageBuffer,
        attributes: 1 /* Use GZip compression for the payload */
    }];
    // Send the payload to Kafka and log the result/error
    producer.send(payload, function(error, result) {
        console.info('Sent payload to Kafka: ', payload);
        if (error) {
            console.error(error);
        } else {
            console.log('result: ', result);
        }
    });
});
// For this demo we just log producer errors to the console.
producer.on('error', function(error) {
    console.error(error);
});
Let’s run it
$ node producer.js
Sent payload to Kafka: [ { topic: 'node-test',
messages: <Buffer 00 48 33 65 30 63 36 33 63 34 2d 39 35 36 61 2d 34 33 37 38 2d 38 61 36 64 2d 32 64 65 36 33 36 64 31 39 31 64 65 00 60 13 1f 7c ab 75 42>,
attributes: 1,
partition: 0 } ]
result: { 'node-test': { '0': 1 } }
We successfully sent a message to partition 0 of the topic node-test. The message has an offset of 1.
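One alias we created earlier but did not use is KeyedMessage. If you want Kafka to route messages by key (all messages with the same key end up on the same partition), the payload could look roughly like this. This is a sketch building on the producer above; the key 'my-key' and the id are purely illustrative values.
// Sketch: sending the Avro-encoded buffer as a keyed message
var keyedMessage = new KeyedMessage('my-key', type.toBuffer({
    enumField: 'sym2',
    id: 'some-other-id',
    timestamp: Date.now()
}));
producer.send([{
    topic: 'node-test',
    messages: [keyedMessage]
}], function(error, result) {
    if (error) {
        console.error(error);
    } else {
        console.log('result: ', result);
    }
});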
Creating a consumer
We will be using the same libraries as for the producer.
Avro
We use the same definition as for the producer. Note that the field order has to match exactly: Avro’s binary encoding writes the fields in the order they are declared in the schema.
var typeDescription = {
    name: 'MyAwesomeType',
    type: 'record',
    fields: [{
        name: 'enumField',
        type: {
            name: 'EnumField',
            type: 'enum',
            symbols: ['sym1', 'sym2', 'sym3']
        }
    }, {
        name: 'id',
        type: 'string'
    }, {
        name: 'timestamp',
        type: 'double'
    }]
};
var avro = require('avsc');
var type = avro.parse(typeDescription);
Consumer
Let’s again start by importing the client and creating some useful aliases
var kafka = require('kafka-node');
var HighLevelConsumer = kafka.HighLevelConsumer;
var Client = kafka.Client;
Next we create a new client
var client = new Client('localhost:2181');
var topics = [{
    topic: 'node-test'
}];
Now we can create the consumer and start consuming messages
var options = {
    autoCommit: true,
    fetchMaxWaitMs: 1000,
    fetchMaxBytes: 1024 * 1024,
    encoding: 'buffer'
};
var consumer = new HighLevelConsumer(client, topics, options);
consumer.on('message', function(message) {
    // message.value is already a Buffer because we requested encoding: 'buffer' above
    var decodedMessage = type.fromBuffer(message.value);
    console.log(decodedMessage);
});
consumer.on('error', function(err) {
    console.log('error', err);
});
process.on('SIGINT', function() {
    consumer.close(true, function() {
        process.exit();
    });
});
Let’s run it
$ node consumer.js
MyAwesomeType {
enumField: 'sym1',
id: '3e0c63c4-956a-4378-8a6d-2de636d191de',
timestamp: 1489141625142 }
If we leave the consumer running and produce a new message, it will automatically receive and display it
$ node consumer.js
MyAwesomeType {
enumField: 'sym1',
id: '3e0c63c4-956a-4378-8a6d-2de636d191de',
timestamp: 1489141625142 }
MyAwesomeType {
enumField: 'sym2',
id: '1ad3f23e-cf03-42b5-932f-aad5fab8e07f',
timestamp: 1489144178087 }
Summary
We have shown that it’s quite simple to interact with Apache Kafka from Node.js/JavaScript. We have of course only scratched the surface of kafka-node. There’s a lot more that can be done, e.g. implementing consumer groups, managing offsets yourself, or creating custom partitioners. See https://github.com/SOHU-Co/kafka-node for more details.
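To give an idea of the consumer group part, a group-based consumer could look roughly like this using kafka-node’s ConsumerGroup and the Avro type from above. This is only a sketch; the option names and defaults depend on your kafka-node version, so check the README linked above.
var kafka = require('kafka-node');
// Sketch: all consumers sharing a groupId split the partitions of the topic among themselves
var consumerGroup = new kafka.ConsumerGroup({
    host: 'localhost:2181',     // ZooKeeper connection string
    groupId: 'node-test-group',
    sessionTimeout: 15000,
    fromOffset: 'earliest',
    encoding: 'buffer'
}, ['node-test']);
consumerGroup.on('message', function(message) {
    console.log(type.fromBuffer(message.value));
});
consumerGroup.on('error', function(error) {
    console.error(error);
});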
If you’re implementing a producer/consumer for your next project, maybe take a look at the libraries used here – they seem mature enough to be used in production.