Serverless EMR Cluster monitoring with Lambda


One issue we typically have is that our EMR cluster stops consuming Hive queries because the metastore is overloaded with loading/refreshing. This is partially caused by our use of the shared metastore, which hosts many teams' schemas/tables inside our organization. When this happens in prod, we have to ask RIM for help to terminate our persistent cluster and create a new one, because we do not have the prod pem file. This is a pain for our team (preparing a lot of emergency release material, getting on a bridge line, then waiting for all types of approval). Our clients also lose the time it takes us to get through everything mentioned above (usually hours).

To solve this problem we created Nagios alerts that ping our cluster every 10 minutes, and we have OPS watch them for us. This is quite helpful since we know the state of the cluster all the time. However, when the Hive server is down, we still have to go through the emergency release process. Even though we created a Jenkins pipeline to create/terminate/add-step, RIM does not allow us or OPS to use it for the recovery.


We have different ways to resolve it ourselves:

  1. have a dedicated server (EC2) do the monitoring and take action
  2. deploy the monitoring code on the launcher box and do the monitoring/recovery there
  3. have the code in Lambda

Dedicated EC2

Method 1 is the traditional way. It requires a lot of setup with Puppet/provisioning to get everything automated, which does not seem worth it.

Launcher box

Method 2 takes less work because we typically have a launcher box in each env, and we could put our existing JS code into a Node.js server managed by PM2. The challenges are: (1) the launcher box is not a standard app server instance, so it is not reliable; (2) the Node.js Hive connector does not currently have good support for SSL and the custom authentication that we use in HiveServer2.

Moreover, methods 1 and 2 share one weak point: we could end up needing yet another monitoring service to make sure the monitor itself is up and doing its job.


All the above analysis brings us to method 3, where we use serverless monitoring with Lambda. This gives us:

(1) Lower operational and development costs

(2) Smaller cost to scale

(3) Works with agile development and allows developers to focus on code and deliver faster

(4) Fits with microservices, which can be implemented as functions

It is not a silver bullet, of course. One drawback is that with Lambda we could not reuse our Node.js script: it is written in ES6 and we have only tested and run it on Node 6.x, while AWS Lambda's Node environment is 4.x. So we had to rewrite our cluster recovery code in Java, which fortunately is not difficult thanks to the nice design of aws-java-sdk.


The implementation is quite straightforward. 


At a high level, we simulate our app path and open the connection from our ELB to our cluster.
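As a rough illustration of the "open the connection" part, here is a minimal sketch of a reachability probe. It is not the code this post refers to: the class name EndpointProbe and the choice of a plain TCP connect are assumptions made for the example, and a real check would more likely run a small query through Hive JDBC so that SSL and authentication are exercised too.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

/** Hypothetical probe: reports whether a TCP endpoint accepts connections. */
final class EndpointProbe {

    /** Returns true if host:port accepts a TCP connection within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolvable: treat all of them as "down".
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // Demo against a throwaway local listener instead of a real ELB.
        try (ServerSocket listener = new ServerSocket(0)) {
            System.out.println(isReachable("127.0.0.1", listener.getLocalPort(), 1000));
        }
    }
}
```

A Lambda handler could call a probe like this on a schedule and only start the recovery path after the check fails.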



The basic flow is:


The code is :


For the Hive password, the original thought was to pass it in via Spring Boot args. Since we are running in Lambda, our run args would be plain text in the console, which is not ideal. We could also use Lambda's KMS key to encrypt and then decrypt in the app, but it looks like that is not allowed at the org level. After discussing with the infrastructure team, our Hive password will be stored in credstash (DynamoDB).


Large file from Hive to RDBMS (Oracle)

Recently we had a requirement to load a sizable file (4+ GB) from S3 into Oracle. The file itself is Hive-compatible, so instead of downloading the file and generating SQL for it, we decided to read the content through Hive JDBC and persist it via JPA/Hibernate.


On the Hive side, one important thing is to make sure the fetch (batch) size is set on the JDBC ResultSet.

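hiveJdbcTemplate.query(sqlToExecute, rs -> {
            rs.setFetchSize(5000); // 5000 worked best for us; tune it for your data
            while (rs.next()) {
                // your handling
            }
            return null; // returning a value makes this lambda a ResultSetExtractor
});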


On the relational database side:

  1. Make sure indexes are turned off during the load. Otherwise each insertion will also trigger an insert into the B-tree index.
  2. Make sure to leverage the Hibernate batch size, hibernate.jdbc.batch_size. I set it to 50 since my table has over 200 columns. For example, if you save() 100 records and hibernate.jdbc.batch_size is set to 50, then during flushing, instead of issuing the following SQL 100 times:

    insert into TableA (id , fields) values (1, 'val1');
    insert into TableA (id , fields) values (2, 'val2');
    insert into TableA (id , fields) values (3, 'val3');
    ......
    insert into TableA (id , fields) values (100, 'val100');

    Hibernate will group them into batches of 50 and send only 2 batches to the DB. Conceptually it looks like this (the statements go through JDBC batching; whether they are literally rewritten into multi-row inserts depends on the JDBC driver):

    insert into TableA (id , fields) values (1, 'val1') , (2, 'val2') ,(3, 'val3') ,(4, 'val4') ,......,(50, 'val50')
    insert into TableA (id , fields) values (51, 'val51') , (52, 'val52') ,(53, 'val53') ,(54, 'val54'),...... ,(100, 'val100')

    Please note that Hibernate transparently disables insert batching at the JDBC level if the primary key of the target table uses GenerationType.IDENTITY.

  3. Make sure to flush()/clear() every so many records so that memory is not eaten up by the millions of objects built on the fly.
    flush() makes sure the queries are executed and the objects are saved (synced) to the DB.
    clear() clears the persistence context so all managed entities are detached. Entities that have not been flushed will not be persisted.
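The flush-every-N idea in item 3 does not depend on JPA, so here is a minimal sketch of it in plain Java. ChunkedWriter is a hypothetical helper invented for this example; in the real import the sink would persist the chunk and then call flush() and clear() on the EntityManager.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: buffers items and hands them off in fixed-size chunks. */
final class ChunkedWriter<T> {
    private final int chunkSize;
    private final Consumer<List<T>> sink; // e.g. persist + flush() + clear()
    private final List<T> buffer;
    private int total;

    ChunkedWriter(int chunkSize, Consumer<List<T>> sink) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        this.chunkSize = chunkSize;
        this.sink = sink;
        this.buffer = new ArrayList<>(chunkSize);
    }

    /** Buffers one item and drains the buffer once it reaches chunkSize. */
    void add(T item) {
        buffer.add(item);
        total++;
        if (buffer.size() == chunkSize) {
            drain();
        }
    }

    /** Call once at the end so the last partial chunk is not lost. */
    int finish() {
        if (!buffer.isEmpty()) {
            drain();
        }
        return total;
    }

    private void drain() {
        sink.accept(new ArrayList<>(buffer)); // copy, so the sink may keep the list
        buffer.clear();                       // keeps memory bounded by chunkSize
    }
}
```

With a chunk size of 50 and 120 items, the sink sees chunks of 50, 50 and 20, so at most 50 objects are held at any time.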

My main code is something like:

    public int doImport(int limit) {
        String sql = "SELECT * FROM erd.ERD_PRDCT_FIXED_INCM_MNCPL_HS_prc_txt";
        if (limit >= 0) {
            sql = sql + " LIMIT " + limit;
        }
        HiveBeanPropertyRowMapper<SrcErdFixedIncmMuniEntity> mapper = new HiveBeanPropertyRowMapper<>(SrcErdFixedIncmMuniEntity.class);
        int batchSize = 5000;
        int[] inc = {0}; // single-element array so the lambda can update the counter
        Instant start = Instant.now();
        List<SrcErdFixedIncmMuniEntity> listToPersist = new ArrayList<>(batchSize);
        hiveJdbcTemplate.query(sql, (rs) -> {
            while (rs.next()) {
                listToPersist.add(mapper.mapRow(rs, -1));
                inc[0]++;
                if (inc[0] % batchSize == 0) {
                    persistAndClear(inc, listToPersist);
                }
            }
            // leftovers (last n items)
            if (!listToPersist.isEmpty()) {
                persistAndClear(inc, listToPersist);
            }
            return null;
        });
        Instant end = Instant.now();
        System.out.println("Data Intake took: " + Duration.between(start, end));
        return inc[0];
    }

    private void persistAndClear(int[] inc, List<SrcErdFixedIncmMuniEntity> listToPersist) {
        listToPersist.forEach(em::persist);
        em.flush();            // push the pending inserts to the DB
        em.clear();            // detach the saved entities to free memory
        listToPersist.clear();
        // "logger" is assumed to be the class's logger field
        logger.info("Saved record milestone: " + inc[0]);
    }


Not bad: ~3.5 million records got loaded in about an hour.

Upgrade EMR 3 to 4 and Hive from 0.13 to 1.0.0

We have been using EMR 3.9.x and Hive 0.13 for a while and are pretty satisfied with them, even with some known bugs. Hive 0.13 is fine, but EMR 3 comes with an old version of Hadoop that has an annoying concurrency bug on reads: when multiple threads run hadoop fs -get on the same HDFS file, it throws an error. We can live with that by disabling the Hive fetch task and doing everything in MapReduce.

However, recently we ran into an issue: when the schema in the metastore is updated (even with a backward-compatible change like adding a column at the end), we get an exception on Hive queries:

Caused by: java.sql.SQLException: Error while processing statement: FAILED: Execution Error, return code 2 from

The thing is, teams on newer versions of Hive do not have this problem. This is very annoying since we are using a shared metastore and schema changes are not totally controlled by our team. So we decided to upgrade our Hive/EMR version. This way we also get all the security updates that come with the new EMR.

EMR 4/5 difference to 3.x

Path change

The first thing I hit is that the Hive paths have all changed.

In EMR 3.x, the Hive configuration and libraries are under /home/hadoop/hive/xxx.

In EMR 4/5, the conf is now under /etc/hive/conf, which is where hive-site.xml should go. The library jars are under /usr/lib/hive/lib, which is needed when custom Hive behavior such as authentication is implemented.

hive-server2 service

We used to use a pre-provided hive-init script to restart the hive-server2 service in EMR 3.x. However, in EMR 4/5, service management is handled by upstart rather than the traditional SysVinit scripts. So we need to use upstart's commands to invoke jobs, like:

sudo reload hive-server2

To list all the managed services: initctl list.

I am still not sure why Amazon made this change, since most Linux distributions now use the newer systemd as their init system. Even Ubuntu, where upstart was originally used, ships systemd in its 16.04 LTS release. Here is a good article comparing them; I do not agree with all of its points, but systemd really is the trend.

Here are 3 Chinese articles on: 1. sysvinit  2. upstart  3. systemd

Bootstrap action

Bootstrap actions are another major difference between 3 and 4+. We used to run a lot of shell scripts in EMR bootstrap actions. EMR 4+ has deprecated many of the predefined ones, including the hive-site.xml installation. This is important because previously hive-site.xml was loaded first and then hive/hive-server2 was started. Now, if I install it as a script-runner action, the hive-site.xml is for whatever reason overwritten by the system default. To overcome this life cycle issue, I have to defer the copy to a step that runs after bootstrap and then reload hive-server2 to apply the new settings. More detail is in the start shell.







Hive Hadoop mapper size

Below is an article from MapR

A Hive table consists of files in HDFS. If one table or one partition has too many small files, HiveQL performance may suffer.
Sometimes it may take a lot of time to prepare a MapReduce job before submitting it, since Hive needs to get the metadata from each file.
This article explains how to control the number of files of a Hive table after inserting data on MapR-FS; or, simply put, it explains how many files will be generated for the “target” table by the HiveQL below:


Above HiveQL may have below 2 major steps:

1. MapReduce(In this example, Map only) job to read the data from “source” table.

The number of Mappers determines the number of intermediate files, and the number of Mappers is determined by below 3 factors:

a. hive.input.format

Different input formats may start different number of Mappers in this step.
Default value in Hive 0.13 is CombineHiveInputFormat.
It will combine all files together and then try to split, so that it can improve the performance if the table has too many small files.
An older default value is HiveInputFormat, which will split each file separately. E.g., if you have 10 small files and each file only has 1 row, Hive may spawn 10 mappers to read the whole table.
This article is using the default CombineHiveInputFormat as example.

b. File split size

mapred.max.split.size and mapred.min.split.size control the “target” file split size.
(In latest Hadoop 2.x, those 2 parameters are deprecated. New ones are mapreduce.input.fileinputformat.split.maxsize and mapreduce.input.fileinputformat.split.minsize).
For example, if one Hive table has one 1GB file and the target split size is set to 100MB, 10 mappers MAY be spawned in this step. The reason for the “MAY” is factor c below.

c. MapR-FS chunk size

Files in MapR-FS are split into chunks (similar to Hadoop blocks) that are normally 256 MB by default. Any multiple of 65,536 bytes is a valid chunk size.
The actual split size is max(target split size, chunk size).
Take above 1GB file with 100MB “target” split size for example, if the chunk size is 200MB, then the actual split size is 200MB, 5 mappers spawned; if the chunk size is 50MB, then the actual split size is 100MB, 10 mappers spawned.

Lab time:

Imagine here we have prepared 3 hive tables with the same size — 644MB and only 1 file for each table.
The only difference is the chunk size of the 3 hive tables.
source  — chunk size=8GB.
source2 — chunk size=256MB(default in mfs).
source3 — chunk size=64k(Minimum).

# hadoop mfs -ls /user/hive/warehouse/|grep -i source
drwxr-xr-x Z U   - root root          1 2014-12-04 11:22 8589934592 /user/hive/warehouse/source
drwxr-xr-x Z U   - root root          1 2014-12-04 11:31  268435456 /user/hive/warehouse/source2
drwxr-xr-x Z U   - root root          1 2014-12-04 12:24      65536 /user/hive/warehouse/source3

Then the question is how many mappers will be spawned for below INSERT, after setting target split size to 100MB?

set mapred.max.split.size=104857600;
set mapred.min.split.size=104857600;

1.  Table “source”
The whole table 644MB is in 1 chunk(8G each), so only 1 mapper.
2. Table “source2”
The whole table 644MB is in 3 chunks(256MB each), so 3 mappers.
3. Table “source3”
The whole table 644MB is in more than 10000 chunks(64KB each), and the target split size(100MB) is larger than the chunk size(64KB), so the actual split size is 100MB and 7 mappers are spawned.

Thinking accordingly, if the target split size is 10GB, only 1 mapper will be spawned in the 1st step for all 3 tables above; if the target split size is 1MB, the mapper counts are: source(1), source2(3), source3(645).
After figuring out the 1st step, let’s move to the 2nd step.
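The mapper counts in the lab can be reproduced with a few lines of arithmetic. This is only a simplified model of the rule stated above (actual split size = max(target split size, chunk size), then a ceiling division), not Hive's or MapR's actual split code. The class name MapperEstimate is made up, and the file size in bytes is taken from the sum of the output file sizes in the ls listings later in the article.

```java
/** Simplified model of the split arithmetic; real split planning has more inputs. */
final class MapperEstimate {

    /** Actual split size = max(target split size, chunk size). */
    static long actualSplitSize(long targetSplitBytes, long chunkBytes) {
        return Math.max(targetSplitBytes, chunkBytes);
    }

    /** Mappers = ceil(file size / actual split size), and at least 1. */
    static long mappers(long fileBytes, long targetSplitBytes, long chunkBytes) {
        long split = actualSplitSize(targetSplitBytes, chunkBytes);
        return Math.max(1, (fileBytes + split - 1) / split);
    }

    public static void main(String[] args) {
        long file = 675_555_588L;         // ~644MB
        long target = 100L * 1024 * 1024; // 100MB target split size
        System.out.println(mappers(file, target, 8L * 1024 * 1024 * 1024)); // source:  1
        System.out.println(mappers(file, target, 256L * 1024 * 1024));      // source2: 3
        System.out.println(mappers(file, target, 64L * 1024));              // source3: 7
    }
}
```

The same function gives 645 for source3 with a 1MB target split size, because the file is slightly larger than 644MB.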

2. Small file merge MapReduce job

After the 1st MapReduce job finishes, Hive will decide if it needs to start another MapReduce job to merge the intermediate files. If small file merge is disabled, the number of target table files is the same as the number of mappers from 1st MapReduce job. Below 4 parameters determine if and how Hive does small file merge.

  • hive.merge.mapfiles — Merge small files at the end of a map-only job.
  • hive.merge.mapredfiles — Merge small files at the end of a map-reduce job.
  • hive.merge.size.per.task — Size of merged files at the end of the job.
  • hive.merge.smallfiles.avgsize — When the average output file size of a job is less than this number, Hive will start an additional map-reduce job to merge the output files into bigger files. This is only done for map-only jobs if hive.merge.mapfiles is true, and for map-reduce jobs if hive.merge.mapredfiles is true.

By default hive.merge.smallfiles.avgsize=16000000 and hive.merge.size.per.task=256000000, so if the average file size is about 17MB, the merge job will not be triggered. Sometimes, if we really want only 1 file to be generated in the end, we need to increase hive.merge.smallfiles.avgsize until it is large enough to trigger the merge, and also increase hive.merge.size.per.task to get the needed number of files in the end.
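The trigger condition can be written down as a small check. This is a sketch of the rule as described here (merge when the relevant hive.merge.* switch is on and the average output file size is below hive.merge.smallfiles.avgsize), not Hive's own implementation. MergeDecision and its parameter names are invented for the example.

```java
/** Sketch of the small-file merge trigger as described in the text. */
final class MergeDecision {

    /**
     * @param mergeEnabled     hive.merge.mapfiles or hive.merge.mapredfiles, whichever applies
     * @param totalBytes       total size of the job's output files
     * @param fileCount        number of output files
     * @param avgSizeThreshold hive.merge.smallfiles.avgsize
     */
    static boolean shouldMerge(boolean mergeEnabled, long totalBytes, int fileCount,
                               long avgSizeThreshold) {
        if (!mergeEnabled || fileCount <= 0) {
            return false;
        }
        long average = totalBytes / fileCount;
        return average < avgSizeThreshold;
    }
}
```

For example, ten files averaging 17MB against the default threshold of 16000000 do not trigger a merge, while the same call with a threshold of 104857600 does.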

Quiz time:

In hive 0.13 on MapR-FS with all default configurations, how many files will be generated in the end for below HiveQL? Have a guess on the file size in the end?
Reminder: chunk size for “source3” is 64KB.

set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.smallfiles.avgsize=104857600;
set hive.merge.size.per.task=209715200;
set mapred.max.split.size=68157440;
set mapred.min.split.size=68157440;


set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.smallfiles.avgsize=283115520;
set hive.merge.size.per.task=209715200;
set mapred.max.split.size=68157440;
set mapred.min.split.size=68157440;

1. Target split size is 65MB and chunk size is only 64KB, so 1st job will spawn 10 mappers and each mapper will generate one 65MB intermediate file.
Merge job will be triggered because average file size from previous job is less than 100MB(hive.merge.smallfiles.avgsize).
For each task, to achieve file size 200MB(hive.merge.size.per.task), 4 x 65MB files will be merged into one 260MB file.
So in the end, 3 files will be generated for target table — 644MB = 260MB+260MB+124MB.

[root@n2a warehouse]# ls -altr target
total 659725
-rwxr-xr-x  1 root root 130296036 Dec  5 17:26 000002_0
-rwxr-xr-x  1 root root 272629772 Dec  5 17:26 000001_0
drwxr-xr-x  2 root root         3 Dec  5 17:26 .
-rwxr-xr-x  1 root root 272629780 Dec  5 17:26 000000_0
drwxr-xr-x 38 mapr mapr        37 Dec  5 17:26 ..

2. Target split size is 65MB and chunk size is only 64KB, so 1st job will spawn 10 mappers and each mapper will generate one 65MB intermediate file.
Merge job will be triggered because average file size from previous job is less than 270MB(hive.merge.smallfiles.avgsize).
For each task, to achieve file size 200MB(hive.merge.size.per.task), 4 x 65MB files *should* be merged into one 260MB file. However if so, the average file size is still less than 270MB(hive.merge.smallfiles.avgsize), so they are still considered as “small files”.
In this case, 5 x 65MB files are merged into one 325MB file.
So in the end, 2 files will be generated for the target table: 644MB = 325MB+319MB.

[root@n1a warehouse]# ls -altr target
total 659724
-rwxr-xr-x  1 root root 334768396 Dec  8 10:46 000001_0
drwxr-xr-x  2 root root         2 Dec  8 10:46 .
-rwxr-xr-x  1 root root 340787192 Dec  8 10:46 000000_0
drwxr-xr-x 38 mapr mapr        37 Dec  8 10:46 ..

Key takeaways:

1.  MapR-FS chunk size and target split size determine the number of mappers and the number of intermediate files.
2. Small file merge job controls the final number of files for target table.
3. Too many small files for one table may introduce job performance overhead.

Hive JDBC Architecture

Part 1: What is the actual contract that Hive provides us with?

  • Hive’s contract to users is defined in the HiveInterface class

That is – thrift is a communication channel that hive uses to expose its main service : which is the translation of SQL commands into hadoop / mapreduce commands.  The ultimate class invoked by the JDBC layer of hive is the HiveServer ~ or client ~ both of which implement the HiveInterface:

Hive JDBC always seemed like a hairy beast to me. It's actually not all that bad: the URL is mapped by the JDBC driver to either a thrift connection or else a “HiveServerHandler”, to dial up a HiveInterface.

The above figure illustrates how the HiveServerHandler implements the HiveInterface.  In fact, it's pretty easy to run Hive commands in pure Java without even using JDBC!

  • Just creating an instance of a HiveServerHandler gives you direct access to lower-level Hive operations (although in an application you probably shouldn't be instantiating this lower-level object).

Nevertheless, if you’re an IDE junkie, you can then play around in your IDE with “lower level” Hive functionality by instantiating a “HiveServerHandler”.

  • This is a nice way to see the API calls that hive provides as a service to clients.
Playing with the HiveServerHandler implementation of HiveServer to see what underlying methods get called from the normal Hive interface in your IDE.

So, if you are curious about going deeper into the hive api and like to hack, calling the methods from this class manually is an interesting way to familiarize yourself with hive.

Part 2: Going deeper into the way the HiveInterface is invoked by tracing the path from JDBC-to-MapReduce. 

Now, let's take another route for understanding Hive's architecture: let's trace the path from JDBC to Hadoop.

  • After all, we all know how JDBC works: the input is SQL, and the output is a ResultSet which is created by a “driver” which makes a database connection.
  • The Hive “Driver” has a runtime dependency on /bin/hadoop (in much the same way that in MySQL, the driver depends on a running MySQL instance).
  • The Hive “Driver” allows you to create “HiveStatement” objects, which, as we know, are the backbone of any JDBC app.

So let's start tracing the HiveDriver class, which is the JDBC driver for Hive.  If you don't know the basics of JDBC, I'd suggest you read up on it before proceeding:

1) When you connect to JDBC via a URL, you explicitly define the driver:

“org.apache.hive.jdbc.HiveDriver” for hive2.

Note that you used to use org.apache.hadoop.hive.jdbc.HiveDriver.

2) This driver, in turn, registers itself when the class is first loaded:

public class HiveDriver implements Driver {
  static {
    try {
      java.sql.DriverManager.registerDriver(new HiveDriver());
    } catch (SQLException e) {
      // TODO Auto-generated catch block
    }
  }
  // ... rest of the driver omitted
}

3) Looking closer, the Driver also declares its URL_PREFIX, which is used in the acceptsURL implementation (I'm using hive2:// but for older Hive, just “jdbc:hive://” was used).

private static final String URL_PREFIX = "jdbc:hive2://";

public boolean acceptsURL(String url) throws SQLException {
  return Pattern.matches(URL_PREFIX + ".*", url);
}

4) Now, when we make a JDBC statement, the generic DriverManager calls “acceptsURL” on all registered drivers, and the driver that matches is used at runtime to run the query.   As we all know, at this point we normally create a JDBC Connection to issue a query.  The Driver which was dynamically loaded above provides “public Connection connect(..)” as part of its interface, and returns a “HiveConnection” implementation:

public Connection connect(String url, Properties info) throws SQLException {
  return new HiveConnection(url, info);
}
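Steps 2 to 4 can be tried out without Hive on the classpath. The sketch below is a toy driver for a made-up “jdbc:demo://” scheme (DemoDriver and the scheme are assumptions for illustration only). It registers itself in a static block and lets DriverManager pick it through acceptsURL, which is the same mechanism the HiveDriver relies on.

```java
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.DriverPropertyInfo;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.Properties;
import java.util.logging.Logger;
import java.util.regex.Pattern;

/** Toy driver for a made-up "jdbc:demo://" scheme; it never opens a connection. */
final class DemoDriver implements Driver {
    private static final String URL_PREFIX = "jdbc:demo://";

    static {
        try {
            DriverManager.registerDriver(new DemoDriver()); // self-registration on class load
        } catch (SQLException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    @Override
    public boolean acceptsURL(String url) throws SQLException {
        return Pattern.matches(URL_PREFIX + ".*", url);
    }

    @Override
    public Connection connect(String url, Properties info) throws SQLException {
        if (!acceptsURL(url)) {
            return null; // JDBC contract: null means "not my kind of URL"
        }
        throw new SQLFeatureNotSupportedException("demo driver has no backend");
    }

    @Override
    public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) {
        return new DriverPropertyInfo[0];
    }

    @Override public int getMajorVersion() { return 1; }
    @Override public int getMinorVersion() { return 0; }
    @Override public boolean jdbcCompliant() { return false; }

    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        throw new SQLFeatureNotSupportedException("no java.util.logging support");
    }

    /** Asks DriverManager which registered driver claims the URL. */
    static Driver lookup(String url) throws SQLException {
        return DriverManager.getDriver(url);
    }
}
```

Calling DemoDriver.lookup("jdbc:demo://localhost:10000/default") returns the registered DemoDriver, while a URL that no registered driver accepts makes DriverManager throw an SQLException.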

5) The HiveConnection implementation now has to figure out how to provide a Hive service.  There are two scenarios: local and non-local.   For non-local, i.e. a real Hive server, a thrift communication channel is opened to talk to the Hive server.  For local, the HiveServerHandler is spun up:

if uri is empty:

     client = new HiveServer.HiveServerHandler();

else (uri nonempty, and thus has a host/port):

     client = new HiveClient(new TBinaryProtocol(new TSocket(host, port)));
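The branch above boils down to "is anything left after the URL prefix?". Here is a minimal sketch of that decision in plain Java. HiveUriMode is a hypothetical class written for this post, not part of Hive, and the fallback port of 10000 is an assumption for the case where the URL carries no port.

```java
/** Hypothetical parser mirroring the empty-URI check described above. */
final class HiveUriMode {
    private static final String URI_PREFIX = "jdbc:hive://";

    /** True when nothing follows the prefix, i.e. the embedded (local) mode. */
    static boolean isEmbedded(String url) {
        if (!url.startsWith(URI_PREFIX)) {
            throw new IllegalArgumentException("Not a jdbc:hive:// URL: " + url);
        }
        return url.substring(URI_PREFIX.length()).isEmpty();
    }

    /** Returns {host, port} for a remote URL of the form jdbc:hive://host:port/db. */
    static String[] hostAndPort(String url) {
        if (isEmbedded(url)) {
            throw new IllegalArgumentException("Embedded URL has no host");
        }
        String rest = url.substring(URI_PREFIX.length());
        String authority = rest.split("/", 2)[0];            // drop "/db" if present
        String[] parts = authority.split(":", 2);
        String port = parts.length > 1 ? parts[1] : "10000"; // assumed default port
        return new String[] {parts[0], port};
    }
}
```

So "jdbc:hive://" selects the embedded handler, and "jdbc:hive://host:10000/default" yields the host and port that the thrift socket would be opened against.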

Looking quickly into the HiveServerHandler class, the header describes why it is used when the URI is empty:

/**
 * Handler which implements the Hive Interface. This class can be used
 * in lieu of the HiveClient class to get an embedded server.
 */
public static class HiveServerHandler extends HiveMetaStore.HMSHandler
    implements HiveInterface {

6) At this point, the hive connection is ready to go.  The individual implementations of Hive SQL statements can be seen in the HiveConnection class, for example (from

public PreparedStatement prepareStatement(String sql, int resultSetType,
    int resultSetConcurrency) throws SQLException {
  return new HivePreparedStatement(client, sql);
}

The tricky thing to get here, which explains it all, is that the “client” above is a “HiveInterface” implementation, which can be either an instance of HiveClient OR of the HiveServerHandler.

7)  One last remaining question:  Where does Hive end and MapReduce begin?    To understand that, we look directly into the HiveInterface implementations.  They are both in the HiveServer class.  (Remember from 5 above: the HiveServer class is accessed either via creation of a local handler, or via a thrift client service.)

The HiveServer implementation ultimately uses the org.apache.hadoop.hive.ql.Driver class.  This can be seen in the following stacktrace:

at org.apache.hadoop.hive.service.HiveServer$HiveServerHandler.execute(
at org.apache.hadoop.hive.jdbc.HiveStatement.executeQuery(
at org.bigtop.bigpetstore.etl.HiveETL.main(

The application code creates a “HiveStatement”, which triggers an operation on the HiveServer, which feeds the command to the “org.apache.hadoop.hive.ql.Driver” implementation, which calls compile(), and then execute().  The execute() method has all the “magic” of chaining jobs together and running them:


// Loop while you either have tasks running, or tasks queued up
while (running.size() != 0 || runnable.peek() != null) {
  // Launch up to maxthreads tasks
  while (runnable.peek() != null && running.size() < maxthreads) {
    Task<? extends Serializable> tsk = runnable.remove();
    launchTask(tsk, queryId, noName, running, jobname, jobs, driverCxt);
  }
  // ... rest of the loop body omitted
}

So I guess Hive isn't such a black box after all! 🙂
The driver's run implementation is what ultimately calls Hadoop.

– Hive's JDBC front end uses the URI to decide whether to implement the Hive service on its own, or through a “client”.

– The client terminology is kinda funny to me, because the client itself is actually a “HiveInterface”, which is implemented via a thrift service (which talks to a running HiveServer) or else via an embedded Hive server.

– Both of these implementations are provided by the HiveServer class.



Generate Large file via Hive JDBC

Recently in our project, we had a requirement to generate comparatively large files via Hive JDBC for web app users to download. By large I mean millions of rows.

Since our previous use cases were all small files containing fewer than maybe 50,000 rows, our approach was to grab them all into memory and dump them into a file. However, now that we have many more rows, this can easily eat up 2-4 GB of RAM. One thing we did was use a PrintWriter wrapping a FileOutputStream to append content and flush it to the real file on the go.
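A minimal sketch of the write-as-you-go idea, under some assumptions: the rows arrive as an Iterator of String arrays (standing in for the JDBC ResultSet), and a BufferedWriter is used here instead of the PrintWriter/FileOutputStream pair mentioned above. StreamingExport is a made-up name for this example.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;

/** Hypothetical exporter: streams rows to disk instead of collecting them in memory. */
final class StreamingExport {

    /** Writes each row as one delimited line as soon as it arrives; returns the row count. */
    static long export(Iterator<String[]> rows, Path target, String delimiter)
            throws IOException {
        long count = 0;
        try (BufferedWriter out = Files.newBufferedWriter(target, StandardCharsets.UTF_8)) {
            while (rows.hasNext()) {
                out.write(String.join(delimiter, rows.next()));
                out.newLine();
                count++;
            }
        } // closing the writer flushes whatever is still buffered
        return count;
    }
}
```

Only the writer's buffer and the current row are held in memory, so the footprint stays roughly constant regardless of how many rows the query returns.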


Another important factor is the JDBC fetch size. By default, the Hive JDBC driver sets fetchSize to 50, which is way too low when we need to grab millions of rows. Once we set it to 500, the processing time dropped from 16 min to 4.5 min. At 5000 it became 3 min 25 sec, but at 10K it went back up to 3 min 40 sec. So we guess 5K is the sweet spot in our case.

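hiveJdbcTemplate.query(sqlToExecute, rs -> {
            rs.setFetchSize(5000); // the sweet spot in our case
            while (rs.next()) {
                // your handling
            }
            return null; // returning a value makes this lambda a ResultSetExtractor
});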

Note: As of Hive 1.3.0, the default fetchSize has been increased to 1000 based on this ticket. We are still on Hive 0.13.0, so the low default affects us, as it does versions like 0.14.0, 1.0.0, 1.2.0 and 1.2.1.

I also tried adding fetchSize=5000 as a parameter in the JDBC connection string (jdbc:hive2://{0}:{1}/{2};ssl=true;fetchSize=5000), but it does not seem to be honored by the driver.

Here is the related source code.


Hive JDBC with Spring BeanPropertyRowMapper

In our project we need to port some Hive table data to our local RDBMS (Oracle). For tables with a lot of columns (hundreds), it could be very tedious to write the Hive SQL and convert the ResultSet to the JPA entity object.

Spring JdbcTemplate provides us with a good class which does the camel-case to underscore conversion for us.  To leverage that, we just need to make sure the Hive table has the same column names as the RDBMS table. Then we just need to call:

hiveJdbcTemplate.query(sql, new BeanPropertyRowMapper<>(YourOwnEntity.class));

However, you might find that the result size is right but all the entity fields are null. That is because, beginning with Hive 0.13, a new Hive property called hive.resultset.use.unique.column.names was introduced. The default value of this property is TRUE.

If the default value is used, the Hive table name is prepended to all column names. This is a change in behavior from previous versions of Hive.

This change causes the all-null result described above: Spring converts all the fields of the target entity class to underscore format (col_name1, col_name2), but when it compares them with the ResultSet, the ‘ResultSetMetaData’ returned by Hive JDBC contains headers like ‘table.col_name1’, ‘table.col_name2’, etc. Nothing matches, so the fields are all left null.
To prevent the Hive table names from being prepended to the column names, set hive.resultset.use.unique.column.names to false in the hive-site.xml file.
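To make the mismatch concrete, here is a small sketch of the name comparison. It is a simplified stand-in for what the row mapper does, not Spring's actual code. ColumnNames and its methods are invented for the example, and stripping the prefix on the client side is shown only as an illustration of why the prefixed labels fail to match.

```java
import java.util.Locale;

/** Hypothetical helpers illustrating the column-name comparison. */
final class ColumnNames {

    /** "colName1" -> "col_name1": camel case to underscore form. */
    static String underscore(String property) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < property.length(); i++) {
            char c = property.charAt(i);
            if (Character.isUpperCase(c) && i > 0) {
                sb.append('_');
            }
            sb.append(Character.toLowerCase(c));
        }
        return sb.toString();
    }

    /** "table.col_name1" -> "col_name1"; labels without a dot are returned unchanged. */
    static String stripTablePrefix(String label) {
        int dot = label.lastIndexOf('.');
        return dot < 0 ? label : label.substring(dot + 1);
    }

    /** Does a result-set label match a bean property, optionally ignoring the table prefix? */
    static boolean matches(String property, String label, boolean stripPrefix) {
        String column = stripPrefix ? stripTablePrefix(label) : label;
        return underscore(property).equals(column.toLowerCase(Locale.ROOT));
    }
}
```

With the prefixed label, matches("colName1", "table.col_name1", false) is false, which is why every field stays null. Once the prefix is gone, either through the Hive setting or by stripping it, the names line up.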