Make Hive query faster with FetchTask

Hive has many task types, and FetchTask is one of the most efficient. It reads the result directly from the files instead of starting a MapReduce job for the incoming query. For simple queries like select * with a limit, it is very fast (single-digit seconds). In this case Hive can return the results by performing an HDFS operation, more or less a hadoop fs -get.


hive> select * from t limit 1;

Time taken: 2.466 seconds


What if we modify the SQL to select only one column?

hive> select id from t ;                 
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there is no reduce operator
Starting Job = job_1402248601715_0004, Tracking URL = http://cdh1:8088/proxy/application_1402248601715_0004/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1402248601715_0004
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2014-06-09 11:12:54,817 Stage-1 map = 0%,  reduce = 0%
2014-06-09 11:13:15,790 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.96 sec
2014-06-09 11:13:16,982 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.96 sec
MapReduce Total cumulative CPU time: 2 seconds 960 msec
Ended Job = job_1402248601715_0004
MapReduce Jobs Launched: 
Job 0: Map: 1   Cumulative CPU: 2.96 sec   HDFS Read: 257 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 960 msec
Time taken: 51.496 seconds

From the log, we can see that an MR job is started, with 1 mapper and 0 reducers. Is there any way we can avoid the MapReduce job above?

Yes, we need to set hive.fetch.task.conversion. Its description reads:

    Some select queries can be converted to single FETCH task
    minimizing latency. Currently the query should be single
    sourced not having any subquery and should not have
    any aggregations or distincts (which incurs RS),
    lateral views and joins.
    1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
    2. more    : SELECT, FILTER, LIMIT only (+TABLESAMPLE, virtual columns)

The default is minimal, which means that executing select * with a LIMIT will leverage FetchTask. If we set it to more, then selecting specific columns with a limit will also use FetchTask. There are some other requirements: a single data source (meaning one table or one partition), no subquery, no aggregation or distinct; it does not apply to views or JOINs. This means that, under the more mode, even the query below would still use FetchTask:

SELECT col1 AS `alias1`, col2 FROM table WHERE partitionkey='somePartitionValue'

Let's test it with the more mode:


hive> set hive.fetch.task.conversion=more;
hive> select id from t limit 1;           
Time taken: 0.242 seconds
hive> select id from t ;                  
Time taken: 0.496 seconds

Finally, if we search the Hive source code for 'hive.fetch.task.conversion', we find the following code in the SimpleFetchOptimizer class:

// returns non-null FetchTask instance when succeeded
private FetchTask optimize(ParseContext pctx, String alias, TableScanOperator source)
    throws HiveException {
  String mode = HiveConf.getVar(
      pctx.getConf(), HiveConf.ConfVars.HIVEFETCHTASKCONVERSION);

  boolean aggressive = "more".equals(mode);
  FetchData fetch = checkTree(aggressive, pctx, alias, source);
  if (fetch != null) {
    int limit = pctx.getQB().getParseInfo().getOuterQueryLimit();
    FetchWork fetchWork = fetch.convertToWork();
    FetchTask fetchTask = (FetchTask) TaskFactory.get(fetchWork, pctx.getConf());
    fetchWork.setSink(fetch.completed(pctx, fetchWork));
    return fetchTask;
  }
  return null;
}

From the source code, we can see that Hive performs this optimization and builds a FetchTask when hive.fetch.task.conversion is set to more.


NOTE: From Hive 0.14 onward, the default value of hive.fetch.task.conversion is more.

Another note: there is a related parameter, hive.fetch.task.conversion.threshold, whose default is -1 in Hive 0.10-0.13 and 1 GB (1073741824 bytes) from 0.14 onward. If the table size is greater than this value, Hive will use MR rather than a FetchTask to handle the query. DOC.
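To persist the threshold instead of setting it per session, the same property can go into hive-site.xml (the value below is the 0.14+ default, 1073741824 bytes = 1 GB):

```xml
<property>
  <name>hive.fetch.task.conversion.threshold</name>
  <value>1073741824</value>
</property>
```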

hive> set hive.fetch.task.conversion.threshold=100000000;
hive> select * from passwords limit 1;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201501081639_0046, Tracking URL =
Kill Command = /opt/mapr/hadoop/hadoop-0.20.2/bin/../bin/hadoop job  -kill job_201501081639_0046
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2015-01-15 12:19:06,474 Stage-1 map = 0%,  reduce = 0%
2015-01-15 12:19:11,496 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 0.85 sec
MapReduce Total cumulative CPU time: 850 msec
Ended Job = job_201501081639_0046
MapReduce Jobs Launched:
Job 0: Map: 1   Cumulative CPU: 0.85 sec   MAPRFS Read: 0 MAPRFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 850 msec
root x 0 0 root /root /bin/bash
Time taken: 6.698 seconds, Fetched: 1 row(s)

Otherwise, it will only use a fetch task:

hive> set hive.fetch.task.conversion.threshold=600000000;
hive> select * from passwords limit 1;
root x 0 0 root /root /bin/bash
Time taken: 0.325 seconds, Fetched: 1 row(s)

Note that this parameter is calculated (or estimated) based on the table size, not the result-set size.

Also, some discussion on map-only jobs.



1. Threshold for compressed data

We have different sources of data: some are txt, some are bz2. One thing we found recently: the fetch threshold is based on the table/partition size, so for a bz2 file, which has roughly a 1:18 compression ratio for our data, the fetch task can run for a long time and may time out (on our ELB).


Another thing we noticed in our prod environment with compressed S3 data: when 2 or more threads try to access the same bz2 file via a fetch task, Hive throws an 'ArrayIndexOutOfBoundsException' caused by the underlying Hadoop operation. This is a bug that will hopefully be fixed in the Hadoop 2.5 release.


Add Username password auth to Hive

In my previous post, we achieved end-to-end SSL encryption from client to ELB to the EMR master. Our next requirement is to add username/password authentication. There are different ways to do this in Hive: 1. LDAP, 2. PAM, 3. CUSTOM mode. After some evaluation we finally chose the CUSTOM mode.

We first need to create a class that implements the org.apache.hive.service.auth.PasswdAuthenticationProvider interface. Notice that we store the MD5-hashed password so that we do not expose the credential in the code.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Hashtable;
import javax.security.sasl.AuthenticationException;
import javax.xml.bind.DatatypeConverter;

import org.apache.hive.service.auth.PasswdAuthenticationProvider;

public class RcHiveAuthentication implements PasswdAuthenticationProvider {
    Hashtable<String, String> store = null;

    public RcHiveAuthentication() {
        store = new Hashtable<>();
        store.put("rcapp", "7451d491e7052ac047caf36a5272a1bf");
    }

    @Override
    public void Authenticate(String user, String password)
            throws AuthenticationException {
        String storedPasswd = store.get(user);
        // since MD5 is just some HEX, we ignore case.
        if (storedPasswd == null || !storedPasswd.equalsIgnoreCase(getMd5(password))) {
            // if we do not have the user or the password does not match, auth fails.
            throw new AuthenticationException("SampleAuthenticator: Error validating user");
        }
    }

    /**
     * @param input input string to be hashed
     * @return the MD5 of the input; if any error, return empty String.
     */
    public static String getMd5(String input) {
        String resultMd5 = "";
        try {
            resultMd5 = DatatypeConverter.printHexBinary(
                MessageDigest.getInstance("MD5").digest(input.getBytes(StandardCharsets.UTF_8)));
        } catch (NoSuchAlgorithmException e) {
            // fall through and return the empty string
        }
        return resultMd5;
    }

    // package access for unit test
    Hashtable<String, String> getStore() {
        return store;
    }
}

Make the above class into a jar and put it into the $HIVE_HOME/lib directory.
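To generate the hash that goes into the store (for example when adding a new user), the MD5 hex can be computed offline. Here is a standalone sketch; the class name Md5Gen and the input "password" are made-up examples, and String.format is used for the hex encoding to avoid depending on javax.xml.bind:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class Md5Gen {
    // same hashing as the provider above, with portable hex encoding
    public static String getMd5(String input) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) sb.append(String.format("%02x", b));
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            return ""; // MD5 is always available in practice
        }
    }

    public static void main(String[] args) {
        // "password" is just a sample input, not the real credential
        System.out.println(getMd5("password")); // prints 5f4dcc3b5aa765d61d8327deb882cf99
    }
}
```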

Then we need to add some config to hive-site.xml so that Hive knows to use our custom provider for authentication.
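For HiveServer2, this means setting hive.server2.authentication to CUSTOM and pointing hive.server2.custom.authentication.class at our provider (the class value below assumes the sample class above lives in the default package; use the fully-qualified name otherwise):

```xml
<property>
  <name>hive.server2.authentication</name>
  <value>CUSTOM</value>
</property>
<property>
  <name>hive.server2.custom.authentication.class</name>
  <value>RcHiveAuthentication</value>
</property>
```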


At this stage, if you try to connect to the server, you will probably get the error below:

Error while compiling statement: FAILED: RuntimeException Cannot create staging directory 'hdfs://namenode:

This is because the user we set in the custom auth provider does not have write access in HDFS. To get around this, we need to set Hive impersonation to false.
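The property that controls impersonation in HiveServer2 is hive.server2.enable.doAs:

```xml
<property>
  <name>hive.server2.enable.doAs</name>
  <value>false</value>
</property>
```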


After adding this, we should now be able to access Hive with our custom-defined username/password.

EMR hive JDBC over SSL with ELB

Recently we needed to set up a Hive cluster consuming S3 objects so that we could run queries from our Java server (Tomcat) via JDBC.

Several challenges:

  1. Our Java server is on-prem (it will move to AWS in 2017), so we have to secure the channel to the EMR cluster.
    Solution: use SSL across the board.
  2. Every time the EMR cluster restarts, its IP changes, but for the JDBC connection we would like a constant DNS name.
    Solution: use an ELB to obtain a DNS name.
  3. The EMR master needs to be attached to the ELB every time it is created.
    Solution: in the bootstrap script, we poll the state of the EMR cluster creation, and once it finishes, grab its IP and attach it to the ELB.

For challenge 1

We need to install the certificate chain as well as the private key into the ELB so that it can accept SSL connections from the client side. In the client JDBC connection string, we need to append ';ssl=true' to instruct the client to initiate the connection over SSL. Another thing is to import the CA into the JRE's lib/security cacerts, so that during the SSL handshake the CA of the ELB's certificate is in the Java client's truststore. For our test, we use a self-CA (finrarcselfca.crt is our self-CA certificate):

sudo keytool -keystore cacerts -importcert -alias finrarcselfca -file finrarcselfca.crt

As for the connection between the ELB and the EMR master, we can just use a random self-signed keystore, since the ELB does not need to verify the EMR cluster. First, generate a self-signed keystore using:

keytool -genkey -keyalg RSA -alias selfsigned -keystore self.keystore -storepass changeit -validity 360 -keysize 2048

and then add the below config to hive-site.xml.
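A sketch of the HiveServer2 SSL properties, matching the keystore generated above (the keystore path is an example; adjust it to wherever self.keystore actually lives):

```xml
<property>
  <name>hive.server2.use.SSL</name>
  <value>true</value>
</property>
<property>
  <name>hive.server2.keystore.path</name>
  <value>/home/hadoop/self.keystore</value>
</property>
<property>
  <name>hive.server2.keystore.password</name>
  <value>changeit</value>
</property>
```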


Note: the password specified in hive-site.xml should match the one used when the keystore was created.

For challenge 2

In the ELB listener, we can forward SSL TCP port 443 to SSL TCP port 10000. This way, when our Java client initiates the JDBC-over-SSL connection, the ELB can unwrap the message and then start another SSL connection with the EMR master via port 10000.

For challenge 3

Previously we had the following shell script to wait for EMR startup and attach the master to the ELB. The bash way is quick and dirty; we now leverage Node.js with aws-js-sdk to maintain our cluster, which is more robust and easier to maintain and understand.

if [ $exitStatus -eq 0 ]; then
   clusterIdLine=`echo $result| sed 's/{\|"\|:\|}\|ClusterId//g'`
   clusterId=`echo $clusterIdLine| awk '{$1=$1;print}'`
   Mail "Adhoc Query Cluster $clusterId started..."
else
   Mail "Error while creating Adhoc Query Cluster $result"
fi

sleep 30

while :
do
  clusterState=`aws emr list-clusters --active |grep '"Id":\|"State":' | sed 'N;s/\n/ /' |grep "$clusterId" |sed 's/"\|,\|Name\|Id//g'|cut -d':' -f2|awk '{$1=$1;print}'`
  echo $clusterId
  echo $clusterState
  if [ "$clusterState" = 'WAITING' ]; then
    break
  fi
  echo 'Waiting for cluster to be created'
  sleep 30
done

masterInstanceID=`aws emr list-instances --cluster-id $clusterId --instance-group-types MASTER |grep '"Ec2InstanceId":' | sed 'N;s/\n/ /' |sed 's/"\|,\|Ec2InstanceId//g'|cut -d':' -f2|awk '{$1=$1;print}'`

echo $masterInstanceID

result=`aws elb register-instances-with-load-balancer --load-balancer-name $elbName --instances $masterInstanceID`

echo $result

Some references

How to Create a Self Signed Certificate using Java Keytool

import private key to keystore

Cloudera hive config

Hive query select as alias and format date

select as alias

In Hive, in order to use the select COL as "Alias" syntax, we cannot use single or double quotes. We have to use the backtick symbol: `

Example: SELECT user_name AS `system user name`, ...

date format

To format a date, we need to convert it to a bigint with the unix_timestamp function and then convert it back using the from_unixtime function.

example :

FROM_UNIXTIME(unix_timestamp(TRD_DT), 'MM/dd/yyyy') AS `Trade Date`

Compared to all this, Presto is quite straightforward: an alias just needs to be wrapped in double quotes, and dates are formatted with the DATE_FORMAT(date, format) function.


hadoop hive mapreduce presto


Author: Xiaoyu Ma


A traditional file system is single-machine and cannot span different machines. HDFS (Hadoop Distributed File System) is designed so that huge volumes of data can span hundreds or thousands of machines, while what you see is still one file system rather than many. For example, when you ask for the data at /hdfs/tmp/file1, you refer to a single file path, but the actual data is stored on many different machines. As a user, you do not need to know any of this, just as on a single machine you do not care which track or sector a file lives on. HDFS manages the data for you.

Once you can store the data, you start thinking about how to process it. Although HDFS can manage data across machines for you as a whole, the data is simply too big. A single machine reading terabytes or petabytes of data might take days or even weeks. For many companies, single-machine processing is unacceptable; for example, Weibo must compute its 24-hour trending posts within 24 hours. If I want to use many machines to process the data, I then face the problems of how to distribute the work, how to restart the corresponding task when a machine dies, how machines communicate and exchange data to complete complex computations, and so on. This is what MapReduce / Tez / Spark provide. MapReduce is the first-generation compute engine; Tez and Spark are the second generation. MapReduce adopts a very simplified computation model, with only two phases, Map and Reduce (chained by a Shuffle in between), and with this model a large share of big-data problems can already be handled.
Consider word count: the mappers emit pairs like (hello, 12100 times), (world, 15214 times), and so on (I am lumping Map and Combine together here for simplicity); hundreds of machines each produce such sets, and then hundreds of machines start the Reduce phase. Reducer machine A receives all the counts for words starting with A, machine B those starting with B (in practice the split is not actually by initial letter but by a hash function, to avoid data skew: words starting with X are certainly far fewer than others, and you do not want the workloads of the machines to differ wildly). These reducers then aggregate again: (hello, 12100) + (hello, 12311) + (hello, 345881) = (hello, 370292). Each reducer does this, and you get the word-frequency result for the whole file.
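The hash-partitioned word count described above can be sketched as a toy, in-process version (this is not real MapReduce; the class and method names are made up for illustration):

```java
import java.util.*;

public class WordCount {
    // toy map/shuffle/reduce: partition words across "reducers" by hash, then sum
    static Map<String, Integer> count(String[] docs, int numReducers) {
        List<Map<String, Integer>> reducers = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) reducers.add(new HashMap<>());
        for (String doc : docs) {                       // "map" phase: emit (word, 1) pairs
            for (String word : doc.split("\\s+")) {
                // partition by hash of the key, not by initial letter, to avoid skew
                int r = Math.abs(word.hashCode()) % numReducers;
                reducers.get(r).merge(word, 1, Integer::sum); // "reduce": sum counts per word
            }
        }
        Map<String, Integer> total = new TreeMap<>();   // final merge of all reducer outputs
        for (Map<String, Integer> m : reducers) total.putAll(m);
        return total;
    }

    public static void main(String[] args) {
        String[] docs = {"hello world", "hello hadoop", "world of hadoop"};
        System.out.println(count(docs, 2)); // prints {hadoop=2, hello=2, of=1, world=2}
    }
}
```

Each word lands on exactly one "reducer", so the per-reducer maps can be merged without conflicts, which is the same property that lets real MapReduce reducers work independently.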




Frankly, these systems never reached the popularity people hoped for, because by then two newcomers had been created: Hive on Tez / Spark, and SparkSQL. Their design philosophy is: MapReduce is slow, but if I run SQL on a new-generation general-purpose compute engine such as Tez or Spark, I can run it faster, and users do not need to maintain two systems. It is like having a small kitchen, being lazy, and having limited demands on fine dining: you can buy a rice cooker that can steam, boil, and stew, saving you a lot of utensils.



Another somewhat independent module is the KV store, such as Cassandra, HBase, MongoDB and many, many others (more than you can imagine). A KV store means: I have a pile of key-value pairs, and I can very quickly fetch the data bound to a given key. For example, with an ID number I can retrieve the identity data. MapReduce could also do this, but it would likely scan the whole dataset; a KV store is specialized for this operation, with all storage and retrieval optimized for it. Looking up an ID number in petabytes of data might take only a fraction of a second. This greatly speeds up some specialized operations at big-data companies. For example, if my website has a page that looks up an order by order number, and the site's total order volume cannot fit in a single-machine database, I would consider using a KV store. The trade-off of a KV store is that it basically cannot handle complex computation: most cannot JOIN, many cannot aggregate, and there is no strong consistency guarantee (data is distributed across machines, so each read may return different results, and operations requiring strong consistency, like a bank transfer, cannot be handled). But it is fast. Extremely fast.
Each KV store design makes different trade-offs: some are faster, some have larger capacity, some support more complex operations. There is one for every need.




A nice reference on Hive data types
A detailed video on Hive internals