Make Hive queries faster with FetchTask

Hive runs queries through many task types, and FetchTask is one of the most efficient. Instead of starting a MapReduce job for the incoming query, it goes directly to the table's files and returns the result. For simple queries like select * with a limit, it is very fast (single-digit seconds), because Hive can answer the query with little more than an HDFS read, roughly equivalent to hadoop fs -get.
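Conceptually, a FetchTask is a direct read of the table's files with an early stop at the LIMIT. The sketch below is plain Python, not Hive code; the Ctrl-A delimiter stands in for Hive's default text-format field separator, and the data is made up for illustration.

```python
import io

def fetch_limit(file_obj, limit, delimiter="\x01"):
    """Read at most `limit` rows straight from the table's storage,
    the way a fetch-style read avoids submitting any job."""
    rows = []
    for line in file_obj:
        rows.append(line.rstrip("\n").split(delimiter))
        if len(rows) >= limit:
            break  # stop early: this is why LIMIT queries return in seconds
    return rows

# Simulated table file with Ctrl-A-separated columns (hypothetical data)
data = io.StringIO("1\x01alice\n2\x01bob\n3\x01carol\n")
print(fetch_limit(data, 1))  # only the first row is ever read
```

With a LIMIT of 1, the loop touches a single line of the file no matter how large the table is, which is the whole point of skipping MapReduce.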

Example:

hive> select * from t limit 1;

OK
Time taken: 2.466 seconds

 

What if we modify the SQL to fetch only one column?

hive> select id from t ;                 
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there is no reduce operator
Starting Job = job_1402248601715_0004, Tracking URL = http://cdh1:8088/proxy/application_1402248601715_0004/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1402248601715_0004
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2014-06-09 11:12:54,817 Stage-1 map = 0%,  reduce = 0%
2014-06-09 11:13:15,790 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.96 sec
2014-06-09 11:13:16,982 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.96 sec
MapReduce Total cumulative CPU time: 2 seconds 960 msec
Ended Job = job_1402248601715_0004
MapReduce Jobs Launched: 
Job 0: Map: 1   Cumulative CPU: 2.96 sec   HDFS Read: 257 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 960 msec
OK
Time taken: 51.496 seconds

From the log, we can see that an MR job is started, with 1 mapper and 0 reducers. Is there any way we can avoid the MapReduce job above?

Yes, by setting hive.fetch.task.conversion:

<property>
  <name>hive.fetch.task.conversion</name>
  <value>minimal</value>
  <description>
    Some select queries can be converted to single FETCH task 
    minimizing latency.Currently the query should be single 
    sourced not having any subquery and should not have
    any aggregations or distincts (which incurrs RS), 
    lateral views and joins.
    1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
    2. more    : SELECT, FILTER, LIMIT only (+TABLESAMPLE, virtual columns)
  </description>
</property>

The default is minimal, which means Hive uses a FetchTask only for select * with a LIMIT (plus filters on partition columns). If we set it to more, selecting specific columns, with or without a limit, also uses a FetchTask. Some other requirements apply in both modes: a single data source (one table or one partition), no subqueries, no aggregations or DISTINCT (which would incur a ReduceSink), and no lateral views or joins. This means that if we execute the query below, it would still use a FetchTask:

SELECT col1 AS `alias1`, col2 FROM table WHERE partitionkey='somePartitionValue'
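The conversion rules above can be summarized as a small decision function. This is a simplified sketch of the documented behavior, not Hive's actual implementation; the query representation is a made-up dict, and the real logic lives in SimpleFetchOptimizer, shown further down.

```python
def can_use_fetch_task(mode, query):
    """Sketch of the hive.fetch.task.conversion rules.

    `query` is a hypothetical dict with boolean keys:
      select_star, filter_on_partition_only, single_source,
      has_subquery, has_aggregation, has_join
    """
    # Disqualifiers shared by both modes
    if query["has_subquery"] or query["has_aggregation"] or query["has_join"]:
        return False
    if not query["single_source"]:
        return False
    if mode == "minimal":
        # minimal: SELECT *, filters on partition columns, LIMIT only
        return query["select_star"] and query["filter_on_partition_only"]
    if mode == "more":
        # more: arbitrary column projections and filters also qualify
        return True
    return False

star_query = dict(select_star=True, filter_on_partition_only=True,
                  single_source=True, has_subquery=False,
                  has_aggregation=False, has_join=False)
column_query = dict(star_query, select_star=False)

print(can_use_fetch_task("minimal", column_query))  # False: needs MR
print(can_use_fetch_task("more", column_query))     # True: FetchTask
```

This mirrors what the transcripts show: select id from t launches a job under minimal but not under more.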

Let's test it in more mode:

hive> set hive.fetch.task.conversion=more;
hive> select id from t limit 1;           
OK
Time taken: 0.242 seconds
hive> select id from t ;                  
OK
Time taken: 0.496 seconds

Finally, if we search the Hive source code for hive.fetch.task.conversion, we find the following method in the SimpleFetchOptimizer class:

// returns non-null FetchTask instance when succeeded
  @SuppressWarnings("unchecked")
  private FetchTask optimize(ParseContext pctx, String alias, TableScanOperator source)
      throws HiveException {
    String mode = HiveConf.getVar(
        pctx.getConf(), HiveConf.ConfVars.HIVEFETCHTASKCONVERSION);

    boolean aggressive = "more".equals(mode);
    FetchData fetch = checkTree(aggressive, pctx, alias, source);
    if (fetch != null) {
      int limit = pctx.getQB().getParseInfo().getOuterQueryLimit();
      FetchWork fetchWork = fetch.convertToWork();
      FetchTask fetchTask = (FetchTask) TaskFactory.get(fetchWork, pctx.getConf());
      fetchWork.setSink(fetch.completed(pctx, fetchWork));
      fetchWork.setSource(source);
      fetchWork.setLimit(limit);
      return fetchTask;
    }
    return null;
  }

From the source code, we can see how hive.fetch.task.conversion drives the optimization: setting it to more enables the aggressive check (aggressive = "more".equals(mode)), and when checkTree succeeds the query plan is converted into a FetchTask with the outer query's LIMIT applied.

NOTE: From Hive 0.14 onward, the default value of hive.fetch.task.conversion is more.

Another note: there is a related parameter, hive.fetch.task.conversion.threshold, which defaults to -1 in Hive 0.10-0.13 and to 1 GB (1073741824 bytes) from Hive 0.14 onward. If the table size is greater than this value, Hive uses MR rather than a FetchTask to handle the query.
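The threshold check reduces to a size comparison, sketched below. The 250 MB table size is a hypothetical figure chosen to sit between the two threshold values used in the transcripts; the semantics of -1 (no size check) are an assumption based on the pre-0.14 default.

```python
def uses_fetch_task(table_size_bytes, threshold=1073741824):
    """Sketch of the hive.fetch.task.conversion.threshold rule:
    tables larger than the threshold fall back to MapReduce.
    A negative threshold is assumed to disable the size check."""
    if threshold < 0:
        return True
    return table_size_bytes <= threshold

table_size = 250_000_000  # hypothetical ~250 MB table

print(uses_fetch_task(table_size, threshold=100_000_000))  # False: MR job
print(uses_fetch_task(table_size, threshold=600_000_000))  # True: FetchTask
```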

hive> set hive.fetch.task.conversion.threshold=100000000;
hive> select * from passwords limit 1;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201501081639_0046, Tracking URL = http://n1a.mycluster2.com:50030/jobdetails.jsp?jobid=job_201501081639_0046
Kill Command = /opt/mapr/hadoop/hadoop-0.20.2/bin/../bin/hadoop job  -kill job_201501081639_0046
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2015-01-15 12:19:06,474 Stage-1 map = 0%,  reduce = 0%
2015-01-15 12:19:11,496 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 0.85 sec
MapReduce Total cumulative CPU time: 850 msec
Ended Job = job_201501081639_0046
MapReduce Jobs Launched:
Job 0: Map: 1   Cumulative CPU: 0.85 sec   MAPRFS Read: 0 MAPRFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 850 msec
OK
root x 0 0 root /root /bin/bash
Time taken: 6.698 seconds, Fetched: 1 row(s)

If the table size is below the threshold, Hive uses only a FetchTask:

hive> set hive.fetch.task.conversion.threshold=600000000;
hive> select * from passwords limit 1;
OK
root x 0 0 root /root /bin/bash
Time taken: 0.325 seconds, Fetched: 1 row(s)

Note that this parameter is compared against the (estimated) table size, not the size of the result set.


Caveats

1. Threshold for compressed data

We have different sources of data: some are plain text, some are bz2. One thing we found recently is that the fetch threshold is based on the table/partition size on disk. With bz2 files, which compress at roughly 1:18 for our data, a FetchTask can execute for a long time and may hit a timeout (on our ELB, in our case).
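To see how compression skews the check, here is a rough sketch. The 1:18 ratio is the approximate figure from our data, and the 100 MB file size is hypothetical; the point is that the threshold sees the on-disk size while the fetch streams the decompressed bytes.

```python
def effective_scan_bytes(on_disk_bytes, compression_ratio=18):
    """Approximate bytes actually streamed when fetching a compressed file.
    The 1:18 ratio is an observed figure for our bz2 data, not a constant."""
    return on_disk_bytes * compression_ratio

on_disk = 100 * 1024 * 1024   # hypothetical 100 MB bz2 file
threshold = 1073741824        # default 1 GB threshold (Hive >= 0.14)

print(on_disk <= threshold)            # True: FetchTask is chosen
print(effective_scan_bytes(on_disk))   # ~1.8 GB actually decompressed
```

So a file that comfortably passes the 1 GB threshold on disk can still force the fetch to chew through almost twice that much data.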

2. Concurrency

Another thing we noticed in production with compressed S3 data: when two or more threads access the same bz2 file via a FetchTask, Hive throws an ArrayIndexOutOfBoundsException caused by the underlying Hadoop read path. This is a bug that is hopefully fixed in the Hadoop 2.5 release:

https://issues.apache.org/jira/browse/HADOOP-10614
