Building a data pipeline from HDFS to ElasticSearch using Kafka and Logstash

Logstash has no input plugin for HDFS, so it cannot load data directly from HDFS into Elasticsearch. In a previous post I showed one way to bypass this limitation by using Hive. This time we will see another way, using Kafka.

I tried this process on two versions of Kafka: Apache Kafka 0.10, which is the latest at the time of writing, and the Kafka 0.9 that comes with Cloudera CDH 5.6. The two versions differ in their APIs and therefore in the jar files needed to run the process, so depending on which version of Kafka you run you will need a different set of jars and different parameters. I will show this in more detail later.

As a sample data file, I downloaded Leo Tolstoy’s book War and Peace from Project Gutenberg in plain-text format. This text file is more than 65,000 lines long.
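
The producer will read the book straight from HDFS, so the text file has to be uploaded there first (a plain hdfs dfs -put does the job). For completeness, here is a minimal sketch of doing the same from Java with the Hadoop FileSystem API that the producer below also uses; the namenode address and both paths are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadToHdfs {
 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  // placeholder namenode address, replace with your active namenode
  conf.set("fs.defaultFS", "hdfs://namenode:8020");
  FileSystem fs = FileSystem.get(conf);
  // copy the local download of the book into HDFS (both paths are examples)
  fs.copyFromLocalFile(new Path("/tmp/warandpeace.txt"), new Path("/data/warandpeace.txt"));
  fs.close();
 }
}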

First, we need to create a Kafka topic. Since my cluster only has 6 nodes, I will create the topic with 6 partitions. Producers and consumers of this topic do not need this number in their code; unless it is set explicitly, they simply work with however many partitions the topic was created with.

kafka-topics.sh --zookeeper cloudera1:2181 --create --topic elastic --partitions 6 --replication-factor 3
Created topic "elastic".

I do not want the messages to be left in the queue after the program exits, so I will configure the topic to be purged every 10 seconds (a retention.ms of 10000):

kafka-configs.sh --zookeeper cloudera1:2181 --entity-type topics --alter --add-config retention.ms=10000 --entity-name elastic

Now we have to write a program that on one hand can read the file directly from HDFS, and on the other hand can serve as a Kafka producer and hand the lines to the Kafka topic. This is the small producer program I wrote in Java:

package producers;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;

// Kafka specific imports
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// HDFS specific imports
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.Configuration;

/**
 * Reads a text file line by line from HDFS and publishes every line to a Kafka topic.
 * @author guys
 */
public class SimpleFileProducer {

 Producer<String, String> producer;
 String topic, filePath, bootstrapServers, HDFShost;
 Path path;
 BufferedReader br;
 Configuration conf;
 Config myConf;

 public SimpleFileProducer(String configPath) throws IOException {
  // Read initial configuration
  myConf = new Config(configPath);
  // the config key names "topic" and "bootstrap.servers" are assumed here; "inputfile" and "hdfs" are used below
  topic = myConf.get("topic");
  bootstrapServers = myConf.get("bootstrap.servers");
  // first setting the kafka producer stuff
  Properties props = new Properties();
  props.put("bootstrap.servers", bootstrapServers);
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  producer = new KafkaProducer<>(props);
  // Now taking care of the HDFS access
  path = new Path(myConf.get("inputfile"));
  conf = new Configuration();
  conf.set("fs.defaultFS", myConf.get("hdfs"));
  FileSystem fs = FileSystem.get(path.toUri(), conf);
  br = new BufferedReader(new InputStreamReader(fs.open(path)));
 }

 private void publishMessage(String msg) {
  // every line is sent both as the message key and as the message value
  ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, msg, msg);
  producer.send(data);
 }

 public void start() throws IOException, InterruptedException {
  String line = "1";
  int counter = 0;
  while (line != null) {
   line = br.readLine();
   if (line != null) {
    publishMessage(line);
    counter++;
   }
  }
  // close() also flushes anything still buffered in the producer
  producer.close();
  System.out.println("Published " + counter + " lines.");
 }

 public static void main(String args[]) {
  try {
   SimpleFileProducer sfp = new SimpleFileProducer(args[0]);
   sfp.start();
  } catch (IOException ex) {
   Logger.getLogger(SimpleFileProducer.class.getName()).log(Level.SEVERE, null, ex);
  } catch (InterruptedException ex) {
   Logger.getLogger(SimpleFileProducer.class.getName()).log(Level.SEVERE, null, ex);
  }
 }
}


package producers;

import java.io.*;
import java.util.HashMap;

/**
 * Reads the configuration parameters from the config file and serves them to the main program.
 * @author guys
 */
public class Config {

 HashMap<String, String> conf;

 public Config(String filePath) throws FileNotFoundException, IOException {
  conf = new HashMap<>();
  File file = new File(filePath);
  BufferedReader br = new BufferedReader(new FileReader(file));
  String[] vals;
  String line = br.readLine();
  while (line != null) {
   // skip comments; every other line is expected to be a key=value pair
   if (!line.startsWith("#") && line.contains("=")) {
    vals = line.split("=", 2);
    conf.put(vals[0], vals[1]);
   }
   line = br.readLine();
  }
  br.close();
 }

 public String get(String key) {
  return conf.get(key.toLowerCase());
 }
}

There are two classes, one is the main class and the other just reads the config file and serves the parameters to the main class.

The program takes a single argument: the path to a configuration file, which looks like this (the values below are only placeholders; the key names are the ones the Config class is asked for):

# this should point to some Kafka brokers
bootstrap.servers=cloudera2:9092
# the Kafka topic to publish to
topic=elastic
# The address of an active namenode
hdfs=hdfs://namenode:8020
# the input file to read from HDFS
inputfile=/path/to/warandpeace.txt
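
Using the Config class is then just a map lookup. For example, with the config path used for the run later in this post:

package producers;

public class ConfigDemo {
 public static void main(String[] args) throws Exception {
  // prints the namenode address read from the config file
  Config c = new Config("/var/lib/kafka/kafka.conf");
  System.out.println(c.get("hdfs"));
 }
}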

Unfortunately, this simple program has many external dependencies, both for HDFS and Kafka. To compile and run it successfully you will need all the following jars in your classpath.

The first group of jars is needed to access HDFS: the Hadoop client jars, which you can find in your HADOOP_HOME directory under share/hadoop. The ones I used are compatible with my Hadoop 2.7.1; if you use a different Hadoop version you may need different jar versions, so it is best to take them directly from your own Hadoop home directory.

And these are needed for Kafka (you can download them directly from my site):

kafka-clients for Kafka 0.10 (use this one if you are on Kafka 0.10)

kafka-clients for Kafka 0.9 (use this one if you are on Kafka 0.9)

In order to see the throughput of the pipeline, I did not implement any wait or sleep in the program loop. It just pushes all those 65,000 lines down the pipe as fast as it can (if you ever needed to slow it down, a small tweak is sketched below). We should be done now with the producer side.
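
The throttling tweak would go inside the read loop of start(); for example, to cap the rate at roughly a thousand lines per second:

   if (line != null) {
    publishMessage(line);
    counter++;
    Thread.sleep(1); // pause one millisecond between messages
   }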

Now for the consumer side. If you need help installing Logstash, you can review my older post Using Logstash to load csv file into Elasticsearch.

We will use the logstash-input-kafka plugin. The plugin comes in different versions that support different Kafka versions. There are links to the documentation of both versions a few lines further down in this post.

Add the Logstash binaries directory to your PATH, make sure the plugin is installed, and then upgrade it:

export PATH=$PATH:/opt/logstash/bin

logstash-plugin install logstash-input-kafka
Validating logstash-input-kafka
Installing logstash-input-kafka
Installation successful

logstash-plugin update logstash-input-kafka

You are updating logstash-input-kafka to a new version 5.0.4, which may not be compatible with 2.0.8. are you sure you want to proceed (Y/N)?
Updating logstash-input-kafka
Updated logstash-input-kafka 2.0.8 to 3.0.3

Now we have to configure Logstash to work with Kafka. This can be a bit tricky since the API has changed from Kafka 0.9 onward.
For Kafka 0.10, the Logstash config file looks like this:

input {
  kafka {
    consumer_threads => 1
    topics => "elastic"
    bootstrap_servers => ""
  }
}

output {
  elasticsearch {
    action => "index"
    hosts => ""
    index => "warandpeace"
    document_type => "record"
    workers => 1
  }
}

This is also documented here.
On my Cloudera cluster, which runs Kafka 0.9, the above configuration doesn’t work and you need the older parameter format as documented here:

input {
  kafka {
    consumer_threads => 5
    topic_id => "elastic"
    zk_connect => "cloudera1:2181"
  }
}

output {
  elasticsearch {
    action => "index"
    hosts => "es1:9200"
    index => "warandpeace"
    document_type => "line"
    workers => 3
  }
}

Note that in the old version, Logstash connects to the ZooKeeper hosts, while in the newer version it connects directly to the Kafka brokers.
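
Before pointing Logstash at the topic, it can be useful to confirm that messages actually arrive. Here is a bare-bones standalone consumer sketch using the new (0.9 and later) consumer API; the broker address and group id are just examples:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleTopicConsumer {
 public static void main(String[] args) {
  Properties props = new Properties();
  props.put("bootstrap.servers", "cloudera2:9092");  // example broker
  props.put("group.id", "elastic-check");            // example consumer group
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("auto.offset.reset", "earliest");        // start from the beginning of the topic
  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  consumer.subscribe(Arrays.asList("elastic"));
  // poll a few times and print whatever shows up, with the partition it came from
  for (int i = 0; i < 10; i++) {
   ConsumerRecords<String, String> records = consumer.poll(1000);
   for (ConsumerRecord<String, String> record : records)
    System.out.println(record.partition() + ": " + record.value());
  }
  consumer.close();
 }
}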

After starting Logstash as a Kafka consumer, I can run my Java program:

java -jar SimpleFileProducer.jar /var/lib/kafka/kafka.conf
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
compression.type = none
metric.reporters = []
metadata.max.age.ms = 300000
metadata.fetch.timeout.ms = 60000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [cloudera2:9092]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
max.block.ms = 60000
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
ssl.protocol = TLS
request.timeout.ms = 30000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
acks = 1
batch.size = 16384
ssl.keystore.location = null
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
retries = 0
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
send.buffer.bytes = 131072
linger.ms = 0

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version :
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a
log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Published 65325 lines.

The program processed 65,325 lines in about 2 seconds.

Now let’s see them in Elasticsearch:

First, here is the warandpeace index as shown in ElasticHQ. Note the 65,325 documents it contains.

We can also query Elasticsearch for a random line from the book:

# curl -XGET http://es1:9200/warandpeace/_search?pretty=true -d '{
> "query": {
> "match" : {
> "message" : "him, and again laughed his frigid laugh."
> }
> }
> }'
{
  "took" : 83,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 25625,
    "max_score" : 6.0998297,
    "hits" : [ {
      "_index" : "warandpeace",
      "_type" : "line",
      "_id" : "AVblasJGjZGQI70Jsnvm",
      "_score" : 6.0998297,
      "_source" : {
        "message" : "him, and again laughed his frigid laugh.",
        "tags" : [ "_jsonparsefailure" ],
        "@version" : "1",
        "@timestamp" : "2016-09-01T11:03:13.789Z"
      }
    }, {
      "_index" : "warandpeace",
      "_type" : "line",
      "_id" : "AVblatIbjZGQI70Jsn-a",
      "_score" : 4.0443416,
      "_source" : {
        "message" : "The prince again laughed his frigid laugh.",
        "tags" : [ "_jsonparsefailure" ],
        "@version" : "1",
        "@timestamp" : "2016-09-01T11:03:18.189Z"

There are many more results with lower relevance scores, too many to list here.

This is quite basic. There are, of course, many settings that we did not touch, like compression, multi-threaded producers and consumers and much more (a small compression example is shown below), but we now have a working pipeline between HDFS and Elasticsearch!
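
As a taste of what such tuning looks like, compression is a one-line change to the producer properties built in SimpleFileProducer's constructor (the ProducerConfig dump above shows the default, compression.type = none):

// next to the serializer settings in the constructor
props.put("compression.type", "snappy"); // or "gzip"; whether it helps depends on the data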
