Good old MapReduce has been criticized in recent years for its performance. There is a growing demand for fast, near real-time responses to queries. MapReduce (and Hive, which originally relied on it as an execution engine) is basically a batch-oriented system that was not designed to give immediate answers. Running a simple query on a Hive table that contains only one row can take about a minute, just because of the overhead imposed by the MapReduce mechanism.
Newer technologies such as Spark and Tez use in-memory processing to achieve much faster response times.
Running Hive with Spark as its execution engine lets us keep our old tables and queries and get much faster answers.
Cloudera says that as of May 2016, running Hive on Spark in CDH5 still has issues and is not recommended for production systems.
Before we start, I want to run a simple Hive query under the old MapReduce configuration, so that we have a performance baseline.
I have three Hive tables, each containing 15 million rows, left over from this post. I will run this query:
Select avg(value) from sampledata3;
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
2016-06-14 17:02:33,027 Stage-1 map = 0%, reduce = 0%
2016-06-14 17:02:56,889 Stage-1 map = 35%, reduce = 0%, Cumulative CPU 15.94 sec
2016-06-14 17:02:59,514 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 19.89 sec
2016-06-14 17:03:16,940 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 21.83 sec
MapReduce Total cumulative CPU time: 21 seconds 830 msec
Ended Job = job_1465911520544_0001
MapReduce Jobs Launched:
Stage-Stage-1: Map: 2 Reduce: 1 Cumulative CPU: 21.83 sec HDFS Read: 512895997 HDFS Write: 14 SUCCESS
Total MapReduce CPU Time Spent: 21 seconds 830 msec
OK
16379.1680124
Time taken: 89.79 seconds, Fetched: 1 row(s)
This query took almost 90 seconds running as a MapReduce job.
Now we will start configuring Hive to run on Spark.
First, we have to make sure that the Spark gateway role is deployed on every host running a HiveServer2 instance. If we go to Clusters -> Spark -> Instances, we can see that it is already in place (at least on my system it was deployed along with the Spark parcel):
It is OK for them to be in the N/A state; they cannot be started (you will get an error if you try to start them).
Cloudera documentation suggests setting the execution engine at the Hive client or Beeline prompt:
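The setting in question is the standard Hive session parameter, which you enter at the hive or beeline prompt:

```sql
-- Switch the execution engine for the current session only
set hive.execution.engine=spark;
```

This takes effect immediately for subsequent queries in the same session.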
This works fine, but it only lasts until you close your Hive session, and I want the change to persist.
Cloudera documentation does not suggest any way to make the change persistent. As of version 5.7, Cloudera Manager does not expose the hive.execution.engine parameter in the Hive configuration, so there is no way to change it except manually.
Apache documentation here suggests several options. I tried to adapt those options to Cloudera. I added a property in the Advanced Configuration Snippet that is supposed to be appended to hive-site.xml, but Hive kept using MapReduce as its engine. I added SPARK_HOME to the Advanced Configuration Snippet (environment), but it had no effect either. Every time I wanted to run a Spark job I had to set hive.execution.engine=spark manually, or the job ran as a regular MapReduce job.
Consulting the Cloudera forums did not help, so as a last resort I manually edited hive-site.xml (outside Cloudera Manager) on the node where I ran the query, and restarted Hive. After that, all jobs ran on Spark by default, as I wanted. In this respect, CDH5 Hive behaves just like Apache Hive; it seems Cloudera has not yet updated Cloudera Manager to support this feature. I guess they will include it in a later release.
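For reference, the property I added to hive-site.xml is the standard Apache Hive form of the setting (the exact file path varies with your CDH deployment):

```xml
<property>
  <name>hive.execution.engine</name>
  <value>spark</value>
</property>
```

Keep in mind that Cloudera Manager may overwrite manual edits like this the next time it redeploys the client configuration.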
Cloudera also recommends adding these two parameters to improve performance:
First, go to the Hive service configuration, under the HiveServer2 section, and set those parameters:
Now edit the HiveServer2 Advanced Configuration Snippet, which actually appends parameters to hive-site.xml (though it did not work for hive.execution.engine), and insert this:
<property>
  <name>hive.stats.fetch.column.stats</name>
  <value>true</value>
</property>
<property>
  <name>hive.optimize.index.filter</name>
  <value>true</value>
</property>
After that, save the changes and redeploy the client configuration (this requires a restart unless you have the commercial version).
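To verify that a persistent setting actually took effect, you can print the current value of any parameter from a fresh Hive session; `set` with a parameter name and no value echoes its current setting:

```sql
-- Prints the current value, e.g. hive.execution.engine=spark
set hive.execution.engine;
```

If this still shows mr, the configuration change did not reach the hive-site.xml that your client is reading.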
Now let’s try to run the same query we ran earlier, this time on spark:
hive> select avg(value) from sampledata3;
Query ID = hive_20160615135050_0fd8eb5f-6f79-4124-9593-d9bcd7a24a76
Total jobs = 1
Launching Job 1 out of 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Spark Job = f0731a85-5b6f-4e6f-bee1-bd022bb0879c
Query Hive on Spark job stages: 2 3
Status: Running (Hive on Spark job)
Job Progress Format
CurrentTime StageId_StageAttemptId: SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]
2016-06-15 13:50:31,150 Stage-2_0: 0(+2)/2 Stage-3_0: 0/1
2016-06-15 13:50:34,197 Stage-2_0: 0(+2)/2 Stage-3_0: 0/1
2016-06-15 13:50:37,270 Stage-2_0: 0(+2)/2 Stage-3_0: 0/1
2016-06-15 13:50:39,302 Stage-2_0: 1(+1)/2 Stage-3_0: 0/1
2016-06-15 13:50:42,344 Stage-2_0: 1(+1)/2 Stage-3_0: 0/1
2016-06-15 13:50:43,355 Stage-2_0: 2/2 Finished Stage-3_0: 1/1 Finished
Status: Finished successfully in 14.23 seconds
OK
16379.1680124
Time taken: 15.133 seconds, Fetched: 1 row(s)
Notice that this time the job ran on Spark, not MapReduce. It finished in about 15 seconds, six times faster than MapReduce!
This is nice, but far from Spark's claim of being 10-100 times faster; maybe the overhead imposed by the Hive layer is to blame…