Integrating ElasticSearch with Hadoop using Hive

Elasticsearch supplies a library that can connect Hadoop ecosystem products to Elasticsearch and vice versa. First, we need to get the Elasticsearch-Hadoop JAR library, as shown here. To save some time and effort, I already built the JAR file and you can download it directly from here.

This JAR is not specific to Hive; it handles general-purpose Hadoop-Elasticsearch communication and has many other use cases.

Then we have to turn off speculative execution at both the MapReduce and Hive levels. Set

mapred.reduce.tasks.speculative.execution=false

in mapred-site.xml and

hive.mapred.reduce.tasks.speculative.execution=false

in hive-site.xml.
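In XML form, these snippets look roughly like the following. This is a minimal sketch; it also disables map-side speculation, since the connector documentation recommends turning off both (on YARN/MR2 the equivalent property names are mapreduce.map.speculative and mapreduce.reduce.speculative):

<!-- mapred-site.xml: stop MapReduce from launching duplicate task attempts -->
<property>
  <name>mapred.map.tasks.speculative.execution</name>
  <value>false</value>
</property>
<property>
  <name>mapred.reduce.tasks.speculative.execution</name>
  <value>false</value>
</property>

<!-- hive-site.xml: keep Hive from re-enabling speculative reducers -->
<property>
  <name>hive.mapred.reduce.tasks.speculative.execution</name>
  <value>false</value>
</property>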

If you are using Spark, also check that Spark speculative execution (the spark.speculation setting) is set to false.
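For example, in spark-defaults.conf (spark.speculation is Spark's standard switch for this, and it already defaults to false):

# spark-defaults.conf: do not launch duplicate copies of slow tasks
spark.speculation false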

All this applies to Apache Hadoop users. If you are using CDH with Cloudera Manager, then speculative execution is off by default (Cloudera claims they did not see any performance benefit from speculative execution).

Speculative execution's purpose is to boost overall performance, but when writing data to an external destination like Elasticsearch it can result in corrupted data (competing task attempts write the same data over and over), so we have to disable it.

First, we have to copy the JAR file to the server where Hive is running (if it runs on more than one server, then copy it to all of them) and set its permissions so that the hive user can read it.
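A minimal sketch of that step, assuming the /common directory that the rest of this post uses:

# copy the connector JAR somewhere the hive user can read it
cp elasticsearch-hadoop-2.3.1.jar /common/
chown hive:hive /common/elasticsearch-hadoop-2.3.1.jar
chmod 644 /common/elasticsearch-hadoop-2.3.1.jar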

We need to add the path where you copied the JAR file so that Hive can use it. If you are using Apache Hadoop, then set this in hive-site.xml (the property is hive.aux.jars.path; the path below assumes the /common directory from the previous step):

<property>
  <name>hive.aux.jars.path</name>
  <value>file:///common/elasticsearch-hadoop-2.3.1.jar</value>
  <description>elasticsearch-hadoop-2.3.1</description>
</property>
If you are using the Cloudera platform (CDH), then the best way to change the parameter across all nodes is via Cloudera Manager. Go to Cluster -> Hive -> Configuration. Under the Advanced category you will find "Hive Auxiliary JARs Directory"; set it to the path where you copied the JAR file.

We should then redeploy the cluster configuration and restart Hive.

Another way is to add the Elastic jar file at runtime:

hive> ADD JAR file:///common/elasticsearch-hadoop-2.3.1.jar;

converting to local file:///common/elasticsearch-hadoop-2.3.1.jar

Added [/tmp/00ec0103-ec54-4a1c-b1cb-f267e471bde6_resources/elasticsearch-hadoop-2.3.1.jar] to class path

Added resources: [file:///common/elasticsearch-hadoop-2.3.1.jar]
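Note that ADD JAR only lasts for the current session. To make it permanent for the Hive CLI, the same line can go into a .hiverc file (a standard Hive CLI feature, read on startup):

ADD JAR file:///common/elasticsearch-hadoop-2.3.1.jar;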

Now we can create a Hive table that will actually write its data to an Elasticsearch index:

CREATE EXTERNAL TABLE sampledata1(line int, value bigint, message string)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'sampledata1/line',
'es.index.auto.create' = 'true',
'es.nodes' = 'es1.lan');

The es.resource parameter specifies the index and type to create in Elasticsearch.

The es.nodes parameter specifies the location of the Elasticsearch cluster to connect to (you do not have to specify all nodes).

There are many other parameters that control mappings, ports, and more. Those parameters have default values, so they are not mandatory and we did not use them here. However, in a real-world system you will probably have to tweak them to get the results you want. You can read more about them here.
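As an illustration only (this table is hypothetical and not part of the walkthrough), a definition could pin down a few of those optional settings; es.port and es.mapping.names are real connector parameters:

-- explicitly set the REST port and rename a Hive column on the Elasticsearch side
CREATE EXTERNAL TABLE sampledata_tuned(line int, value bigint, message string)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'sampledata1/line',
'es.nodes' = 'es1.lan',
'es.port' = '9200',
'es.mapping.names' = 'message:message_text');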

We can look at the table structure and see that the content is serialized:

hive> describe sampledata1;


line                    int                     from deserializer

value                   bigint                  from deserializer

message                 string                  from deserializer

Time taken: 0.995 seconds, Fetched: 3 row(s)

Now, loading data into our newly created table from an existing table creates a MapReduce job that does the loading:

hive> insert into sampledata1 select * from sampledata2;

Query ID = hive_20160606143030_3f9139ff-4399-420e-99e8-35218546cd70

Total jobs = 1

Launching Job 1 out of 1

Number of reduce tasks is set to 0 since there's no reduce operator

Starting Job = job_1465204814994_0002, Tracking URL = http://cloudera1.lan:8088/proxy/application_1465204814994_0002/

Kill Command = /opt/cloudera/parcels/CDH-5.6.0-1.cdh5.6.0.p0.45/lib/hadoop/bin/hadoop job  -kill job_1465204814994_0002

Hadoop job information for Stage-0: number of mappers: 2; number of reducers: 0

2016-06-06 14:31:19,404 Stage-0 map = 0%,  reduce = 0%

2016-06-06 14:32:19,742 Stage-0 map = 0%,  reduce = 0%, Cumulative CPU 13.92 sec

2016-06-06 14:33:20,896 Stage-0 map = 0%,  reduce = 0%, Cumulative CPU 20.5 sec

2016-06-06 14:34:21,279 Stage-0 map = 0%,  reduce = 0%, Cumulative CPU 25.27 sec

Looking from the Elasticsearch end, we can see that the index sampledata1 was created and is getting populated (this is how it looks in ElasticSearch HQ).
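If you prefer the command line over HQ, the stock _cat and _count endpoints show the same information (using the same es1.lan node as the rest of this post):

curl -XGET 'es1.lan:9200/_cat/indices/sampledata1?v'
curl -XGET 'es1.lan:9200/sampledata1/_count?pretty'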

This is very basic, since we let Elasticsearch create the index for us and set the mapping automatically. In a real-world scenario we might want more fine-grained control over the process.

When the indexing process is complete, we can query the data from the Elasticsearch end without problems (that's because the data really is in Elasticsearch). As expected, the result returns in a fraction of a second:

[root@es1 ~]# curl -XGET 'ES1.lan:9200/sampledata1/line/_search?pretty=true' -d '{
> "query": {
> "match": {
> "message": "790022"
> }
> }
> }'
{
  "took" : 56,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 7.609807,
    "hits" : [ {
      "_index" : "sampledata1",
      "_type" : "line",
      "_id" : "AVUlf-27fH0rDA7m61Iv",
      "_score" : 7.609807,
      "_source" : {
        "line" : 790022,
        "value" : 25844,
        "message" : "Line number 790022"
      }
    } ]
  }
}
Querying the same table from the Hive end is trickier. I ran a query on the same document:

select line,value,message from sampledata1 where message='Line number 790022';

I had to wait 97 seconds only to get this error:

Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row [Error getting row data with exception java.lang.ClassCastException: cannot be cast to

This looks like an auto-mapping problem.

Let's see the mapping in Elasticsearch:

curl -XGET 'http://es1.lan:9200/sampledata1/_mapping/line'
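The interesting part of the response looked along these lines (abridged; this is a reconstruction based on what it reported, not the verbatim output):

{
  "sampledata1" : {
    "mappings" : {
      "line" : {
        "properties" : {
          "line" : { "type" : "long" },
          "message" : { "type" : "string" },
          "value" : { "type" : "long" }
        }
      }
    }
  }
}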

We see that for some reason, Elasticsearch mapped both line and value (which are int and bigint in Hive) to type long.

Looking here shows that the Hive bigint type is mapped to the Elasticsearch long type, which makes sense. But the same document says that Hive int is mapped to Elasticsearch int, whereas our index mapped it to long. Then, when querying, Hive gets back a long from Elasticsearch where it expects an int. So we have to explicitly tell Elasticsearch to store the line column as an integer. Drop the table sampledata1 and recreate it using this statement:

CREATE EXTERNAL TABLE sampledata1(line int, value bigint, message string)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'sampledata1/line',
'es.index.auto.create' = 'true',
'es.nodes' = 'es1.lan');

This did not help either, and Elasticsearch kept mapping the line field as long instead of integer (maybe a bug?).

Eventually, I had to manually create the index in Elasticsearch with the correct mapping:

curl -XPOST es1.lan:9200/sampledata1 -d '{
  "mappings" : {
    "line" : {
      "properties" : {
        "line" : { "type" : "integer" },
        "value" : { "type" : "long" },
        "message" : { "type" : "string" }
      }
    }
  }
}'
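To double-check that the mapping stuck before reloading any data, the same _mapping endpoint from earlier can be queried again:

curl -XGET 'http://es1.lan:9200/sampledata1/_mapping/line?pretty'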
Then I re-created the Hive table, this time with 'es.index.auto.create' = 'false'.
I populated the table, and then I was able to query it from the Hive side as well:

hive> select * from sampledata1 where line=790022;
Query ID = hive_20160607162020_a3b524c8-cb37-4dda-bc25-3691600ba9ba
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1465284573117_0022, Tracking URL = http://cloudera1.lan:8088/proxy/application_1465284573117_0022/
Kill Command = /opt/cloudera/parcels/CDH-5.6.0-1.cdh5.6.0.p0.45/lib/hadoop/bin/hadoop job -kill job_1465284573117_0022
Hadoop job information for Stage-1: number of mappers: 5; number of reducers: 0
2016-06-07 16:20:21,657 Stage-1 map = 0%, reduce = 0%
2016-06-07 16:20:23,930 Stage-1 map = 20%, reduce = 0%, Cumulative CPU 2.25 sec
2016-06-07 16:20:25,112 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 2.88 sec
MapReduce Total cumulative CPU time: 2 seconds 880 msec
Ended Job = job_1465284573117_0022
MapReduce Jobs Launched:
Stage-Stage-1: Map: 5 Cumulative CPU: 2.88 sec HDFS Read: 1087015 HDFS Write: 1314596 SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 880 msec
790022 25844 Line number 790022