Hive JDBC Architecture

Part 1: What is the actual contract that Hive provides us with?

  • Hive’s contract to users is defined in the HiveInterface class

That is, Thrift is the communication channel Hive uses to expose its main service: the translation of SQL commands into Hadoop/MapReduce jobs. The ultimate class invoked by Hive's JDBC layer is the HiveServer (or client), both of which implement the HiveInterface:

Hive JDBC always seemed like a hairy beast to me. It's actually not all that bad: the JDBC driver translates the connection URL into either a thrift connection or a "HiveServerHandler" to dial up a HiveInterface.

The figure above illustrates how the HiveServerHandler implements the HiveInterface. In fact, it's pretty easy to run Hive commands in pure Java without even using JDBC!

  • Just creating an instance of a HiveServerHandler gives you direct access to lower-level Hive operations (although in an application you probably shouldn't be instantiating this lower-level object).

Nevertheless, if you’re an IDE junkie, you can then play around in your IDE with “lower level” Hive functionality by instantiating a “HiveServerHandler”.

  • This is a nice way to see the API calls that Hive provides as a service to clients: play with the HiveServerHandler implementation of HiveServer in your IDE to see which underlying methods get called behind the normal Hive interface.

So, if you are curious about going deeper into the Hive API and like to hack, calling the methods of this class manually is an interesting way to familiarize yourself with Hive.
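To make this concrete, here is a rough sketch (it assumes the old, pre-HiveServer2 hive-service classes on the classpath; treat the exact method names as illustrative of the HiveInterface rather than guaranteed):

import java.util.List;

import org.apache.hadoop.hive.service.HiveServer;

// Rough sketch: run a Hive command in-process by instantiating the handler
// directly, with no JDBC involved. Class and method names follow the old
// HiveServer / HiveInterface API described in this post.
public class EmbeddedHiveSketch {
  public static void main(String[] args) throws Exception {
    HiveServer.HiveServerHandler handler = new HiveServer.HiveServerHandler();
    handler.execute("SHOW TABLES");          // compiles and runs the statement
    List<String> rows = handler.fetchAll();  // fetch the result rows as strings
    for (String row : rows) {
      System.out.println(row);
    }
  }
}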

Part 2: Going deeper into the way the HiveInterface is invoked by tracing the path from JDBC-to-MapReduce. 

Now, let's take another route to understanding Hive's architecture: let's trace the path from JDBC to Hadoop.

  • After all, we all know how JDBC works: the input is SQL, and the output is a ResultSet which is created by a "driver" that makes a database connection.
  • The Hive "Driver" has a runtime dependency on /bin/hadoop (in much the same way that the MySQL driver depends on a running MySQL instance).
  • The Hive "Driver" allows you to create "HiveStatement" objects, which, as we know, are the backbone of any JDBC app.

So let's start tracing the HiveDriver class, which is the JDBC driver for Hive. If you don't know the basics of JDBC, I'd suggest you read up on it before proceeding:

1) When you connect to JDBC via a URL, you explicitly define the driver:

org.apache.hive.jdbc.HiveDriver for hive2 (HiveServer2).

Note that older versions of Hive used org.apache.hadoop.hive.jdbc.HiveDriver.
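For example, a minimal plain-JDBC sketch (the host, port, database, and credentials below are placeholders, not values from this post):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Minimal hive2 JDBC round trip: load the driver, open a connection, run a query.
public class HiveJdbcExample {
  public static void main(String[] args) throws Exception {
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    try (Connection conn = DriverManager.getConnection(
             "jdbc:hive2://localhost:10000/default", "hive", "");
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("SHOW TABLES")) {
      while (rs.next()) {
        System.out.println(rs.getString(1));
      }
    }
  }
}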

2) This driver, in turn, registers itself when the class is first loaded:

public class HiveDriver implements Driver {
  static {
    try {
      java.sql.DriverManager.registerDriver(new HiveDriver());
    } catch (SQLException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }
  // ...
}

3) Looking closer, the driver also declares its URL_PREFIX, which is used in the acceptsURL implementation (I'm using hive2://, but older Hive used just "jdbc:hive://"):

private static final String URL_PREFIX = "jdbc:hive2://";

public boolean acceptsURL(String url) throws SQLException {
  return Pattern.matches(URL_PREFIX + ".*", url);
}

4) Now, when we make a JDBC connection, the generic DriverManager calls "acceptsURL" on all registered drivers, and the first driver that matches is used at runtime to run the query. As we all know, at this point we normally create a JDBC Connection to issue a query. The driver which was dynamically loaded above provides "public Connection connect(..)" as part of its interface and returns a "HiveConnection" implementation:

public Connection connect(String url, Properties info) throws SQLException {
  return new HiveConnection(url, info);
}

5) The HiveConnection implementation now has to figure out how to provide a Hive service. There are two scenarios: local and non-local. For non-local (i.e. a real Hive server), a thrift communication channel is opened to talk to the Hive server. For local, a HiveServerHandler is spun up. Paraphrasing HiveConnection.java:

if (uri.isEmpty()) {
  // embedded mode: run Hive in-process
  client = new HiveServer.HiveServerHandler();
} else {
  // remote mode: talk to a standalone HiveServer over thrift at host:port
  client = new HiveClient(new TBinaryProtocol(new TSocket(host, port)));
}

Looking quickly at the HiveServerHandler class, its header comment describes why it is used when the URI is empty:

/**
 * Handler which implements the Hive Interface.
 * This class can be used in lieu of the HiveClient class to get an embedded server.
 */
public static class HiveServerHandler extends HiveMetaStore.HMSHandler
    implements HiveInterface {

6) At this point, the hive connection is ready to go.  The individual implementations of Hive SQL statements can be seen in the HiveConnection class, for example (from HiveConnection.java):

public PreparedStatement prepareStatement(String sql, int resultSetType,
    int resultSetConcurrency) throws SQLException {
  return new HivePreparedStatement(client, sql);
}

The tricky thing to get here, which explains it all, is that the "client" above is a "HiveInterface" implementation, which can be either the thrift client (talking to a remote HiveServer) or the embedded HiveServerHandler.

7) One last remaining question: where does Hive end and MapReduce begin? To understand that, we look directly into the HiveInterface implementations. They are both in the HiveServer class (remember from 5 above: the HiveServer class is either accessed via creation of a local handler, or via a thrift client service).

The HiveServer implementation ultimately uses the org.apache.hadoop.hive.ql.Driver class. This can be seen in the following stack trace:

at org.apache.hadoop.hive.ql.Driver.run(Driver.java:945)
at org.apache.hadoop.hive.service.HiveServer$HiveServerHandler.execute(HiveServer.java:198)
at org.apache.hadoop.hive.jdbc.HiveStatement.executeQuery(HiveStatement.java:192)
at org.bigtop.bigpetstore.etl.HiveETL.main(HiveETL.java:105)

The application code creates a “HiveStatement”, which triggers an operation on the HiveServer, which feeds the command to the “org.apache.hadoop.hive.ql.Driver” implementation, which calls compile(), and then execute().  The execute() method has all the “magic” of chaining jobs together and running them:

// From org.apache.hadoop.hive.ql.Driver.java

// Loop while you either have tasks running, or tasks queued up
while (running.size() != 0 || runnable.peek() != null) {
  // Launch upto maxthreads tasks
  while (runnable.peek() != null && running.size() < maxthreads) {
    Task<? extends Serializable> tsk = runnable.remove();
    launchTask(tsk, queryId, noName, running, jobname, jobs, driverCxt);
  }
  // ...
}

So I guess Hive isn't such a black box after all! 🙂
 
The driver’s run implementation is what ultimately calls hadoop.

– Hive's JDBC front end uses the URI to decide whether to implement the Hive service on its own or through a "client".

– The "client" terminology is kind of funny to me, because the client itself is actually a "HiveInterface", which is implemented either via a thrift service (which talks to a running HiveServer) or via an embedded Hive server.

– Both of these implementations are provided by the HiveServer class.

 


How JDBC URLs get mapped to connections at runtime

Who cares?

I recently found the need to mock out a JDBC URL to experiment with a new way of testing Sqoop without a hard dependency on a particular database installation. In order to do this, you first need to understand how, at runtime, JDBC connection URLs get routed to implementation-specific drivers.


How do JDBC URLs get converted to implementation-specific classes implementing the java.sql.Driver interface?

We’ve all used the canonical JDBC connect snippet for connecting to relational databases from the JVM.

Connection conn = null;
Statement stmt = null;
try {
  Class.forName("com.<database_vendor>.jdbc.Driver");
  conn = DriverManager.getConnection(DB_URL, USER, PASS);
  stmt = conn.createStatement();
  ...
}
  

What is this code actually doing?

A quick look at the DriverManager class reveals that when we call "getConnection", a data structure of registered drivers is scanned, and each driver is asked whether it accepts the connection URL:

SQLException reason = null;
for (int i = 0; i < drivers.size(); i++) {
   DriverInfo di = (DriverInfo)drivers.elementAt(i);
   ... 
   if (di.driver.acceptsURL(url)) {
                    // Success!
                    println("getDriver returning " + di);
                    return (di.driver);
   }
}

So… how do the "drivers" get populated?

It turns out that the magic call to:

Class.forName("com.mysql.jdbc.Driver")

actually induces a static initialization block (any time a class is loaded, any static code in it gets executed). So the JDBC driver implementations have a soft requirement associated with them, which is that they call:

java.sql.DriverManager.registerDriver(new ClientDriverImplementation())

where the implementation actually implements the "acceptsURL" method. Below is an excerpt from the Derby JDBC driver (similar blocks exist in com.mysql.jdbc.Driver, etc.):

static {
...
  registeredDriver__ = new ClientDriver();
  java.sql.DriverManager.registerDriver(registeredDriver__);
...
}

Thanks to the wonderful folks at Stack Overflow for helping to clarify the static initializer part: http://stackoverflow.com/questions/5484227/jdbc-class-forname-vs-drivermanager-registerdriver.

In summary

So… anyway, the moral of the story is that there is nothing magic about database connection URLs. You could make up your own connection URL for a mock connection with any kind of URL text in it; as long as Class.forName(…) is called at the beginning of your database connection workload, DriverManager will be able to figure out which implementation class to map the connection URL to.
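To illustrate, here is a sketch of what such a mock driver might look like (MockDriver and the jdbc:mock:// prefix are made-up names, not part of any real library):

import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.DriverPropertyInfo;
import java.sql.SQLException;
import java.util.Properties;
import java.util.logging.Logger;

// Hypothetical mock driver: any class that registers itself with DriverManager and
// accepts a made-up URL prefix will be handed the connection request at runtime.
public class MockDriver implements Driver {
  private static final String URL_PREFIX = "jdbc:mock://";

  static {
    try {
      DriverManager.registerDriver(new MockDriver());
    } catch (SQLException e) {
      throw new ExceptionInInitializerError(e);
    }
  }

  @Override
  public boolean acceptsURL(String url) {
    return url != null && url.startsWith(URL_PREFIX);
  }

  @Override
  public Connection connect(String url, Properties info) throws SQLException {
    if (!acceptsURL(url)) {
      return null; // DriverManager will keep trying the other registered drivers
    }
    // Return whatever Connection implementation your test needs (e.g. a mock object).
    throw new SQLException("mock connection not implemented in this sketch");
  }

  @Override public int getMajorVersion() { return 1; }
  @Override public int getMinorVersion() { return 0; }
  @Override public boolean jdbcCompliant() { return false; }
  @Override public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) {
    return new DriverPropertyInfo[0];
  }
  @Override public Logger getParentLogger() { return Logger.getGlobal(); }
}

With that class on the classpath, Class.forName("MockDriver") followed by DriverManager.getConnection("jdbc:mock://whatever") would route to it, which is all a test needs in order to avoid a real database installation.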

 


Generate Large file via Hive JDBC

Recently in our project we had a requirement to generate comparatively large files via Hive JDBC for web-app users to download; by large I mean millions of rows.

Since our previous use cases were all small files containing fewer than maybe 50,000 rows, our approach had been to grab everything into memory and dump it to a file. Now that we have far more rows, however, that can easily eat up 2-4 GB of RAM. One thing we did was use a PrintWriter wrapping a FileOutputStream, so content is appended and flushed to the real file as we go, as sketched below.
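Here is a minimal sketch of that streaming approach (the CSV layout and file handling are simplified; the real code deals with headers, escaping, and so on):

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.sql.SQLException;

// Stream each row to disk as it is read, so the full result set never sits in memory.
public final class ResultSetFileWriter {
  public static void writeCsv(ResultSet rs, File target) throws IOException, SQLException {
    try (PrintWriter out = new PrintWriter(new BufferedWriter(
        new OutputStreamWriter(new FileOutputStream(target), StandardCharsets.UTF_8)))) {
      int columns = rs.getMetaData().getColumnCount();
      while (rs.next()) {
        StringBuilder line = new StringBuilder();
        for (int i = 1; i <= columns; i++) {
          if (i > 1) {
            line.append(',');
          }
          line.append(rs.getString(i));
        }
        out.println(line);  // each row goes straight to the stream
      }
      out.flush();          // push anything still buffered to the file
    }
  }
}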

 

Another important factor is the JDBC fetch size. By default, the Hive JDBC driver sets the fetchSize to 50, which is way too low if we need to grab millions of rows. Once we set it to 500, the processing time decreased from 16 min to 4.5 min. At 5000 it became 3 min 25 sec, but at 10K it increased to 3 min 40 sec, so we guess 5K is the sweet spot in our case.

hiveJdbcTemplate.query(sqlToExecute, rs -> {
    rs.setFetchSize(5000);
    while (rs.next()) {
        // ... do your handling
    }
});

Note: As of Hive 1.3.0, the default fetchSize has been increased to 1000 based on this ticket. We are still on Hive 0.13.0, and the low default also affects versions 0.14.0, 1.0.0, 1.2.0, and 1.2.1.

I also tried adding fetchSize=5000 as a parameter in the JDBC connection string, but it does not seem to be honored by the driver (jdbc:hive2://{0}:{1}/{2};ssl=true;fetchSize=5000).

Here is the related source code.
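In plain JDBC (outside the JdbcTemplate callback shown above), the same hint can be applied on the Statement before executing; a hedged sketch, with a made-up table name:

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Set the fetch size on the Statement so the driver pulls rows in larger batches.
public final class FetchSizeSketch {
  static void export(Connection conn) throws SQLException {
    try (Statement stmt = conn.createStatement()) {
      stmt.setFetchSize(5000);  // rows pulled per round trip to HiveServer2
      try (ResultSet rs = stmt.executeQuery("SELECT * FROM some_table")) {
        while (rs.next()) {
          // ... handle each row
        }
      }
    }
  }
}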

 

Hive JDBC with Spring BeanPropertyRowMapper

In our project we need to port some Hive table data to our local RDBMS (Oracle). For tables with a lot of columns (hundreds), it can be very tedious to write the Hive SQL and convert the ResultSet to the JPA entity object.

Spring's JdbcTemplate provides a handy class that does the camel-case-to-underscore conversion for us. To leverage it, we just need to make sure the Hive table has the same column names as the RDBMS table. Then we just need to call:

hiveJdbcTemplate.query(sql, new BeanPropertyRowMapper<>(YourOwnEntity.class));

However, you might find that the result size looks right but all the entity fields are null. That is because, beginning with Hive 0.13, a new Hive property called hive.resultset.use.unique.column.names was introduced. The default value of this property is TRUE.

If the default value is used, the Hive table name is prepended to all column names. This is a change in behavior from previous versions of Hive.

This change causes the all-nulls result described above: Spring converts all the fields of the target entity class to underscore format (col_name1, col_name2), but when it compares them against the result set, the ResultSetMetaData returned by the Hive JDBC driver contains headers like 'table.col_name1', 'table.col_name2', etc. As a result, the fields are all null.
To prevent the Hive table names from being prepended to the column names, use this property setting in the hive-site.xml file:

     <property>
       <name>hive.resultset.use.unique.column.names</name>
       <value>false</value>
     </property>
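If changing hive-site.xml globally is not an option, a hedged alternative (assuming the cluster allows session-level overrides) is to set the property on the JDBC session itself:

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

// Turn off unique column names for this session only, before running the query
// that feeds BeanPropertyRowMapper.
public final class HiveSessionSettings {
  public static void disableUniqueColumnNames(Connection conn) throws SQLException {
    try (Statement stmt = conn.createStatement()) {
      stmt.execute("set hive.resultset.use.unique.column.names=false");
    }
  }
}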

EMR Hive JDBC over SSL with ELB

Recently we needed to set up a Hive cluster consuming S3 objects so that we could run queries from our Java server (Tomcat) via JDBC.

Several challenges:

  1. Our Java server is on-prem (it will move to AWS in 2017), so we have to secure the channel to the EMR cluster.
    Solution: use SSL across the board.
  2. Every time the EMR cluster restarts, its IP changes. For the JDBC connection, we would like a constant (DNS) name.
    Solution: use an ELB to obtain a DNS name.
  3. The EMR master needs to be attached to the ELB every time it is created.
    Solution: in the bootstrap script, we poll the state of the EMR cluster creation and, once it finishes, grab its IP and attach it to the ELB.

For challenge 1

we need to install the certificate chain as well as the private key into the ELB so that it can accept SSL connections from the client side. In the client JDBC, we need to add ';ssl=true' to the JDBC connection string to instruct the client to initiate the connection with SSL. Another thing is to import the CA into the JRE's lib/security cacerts, so that when we do the SSL handshake, the CA of the ELB's certificate is in the Java client's truststore. For our test, we use a self-CA (finrarcselfca.crt is our self-CA certificate):

sudo keytool -keystore cacerts -importcert -alias finrarcselfca -file finrarcselfca.crt
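On the client side, the JDBC connection then goes through the ELB's DNS name with ;ssl=true; a sketch (host name and credentials are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;

// Connect to HiveServer2 through the ELB over SSL. The CA that signed the ELB
// certificate must already be in the client JRE's cacerts (the keytool import above).
public class HiveOverSslExample {
  public static void main(String[] args) throws Exception {
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    try (Connection conn = DriverManager.getConnection(
        "jdbc:hive2://my-emr-elb.example.com:443/default;ssl=true", "hadoop", "")) {
      System.out.println("connected: " + !conn.isClosed());
    }
  }
}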

As for the connection between the ELB and the EMR master, we can just use a random self-signed keystore, since the ELB does not need to verify the EMR cluster. First, generate a self-signed keystore using:

keytool -genkey -keyalg RSA -alias selfsigned -keystore self.keystore -storepass changeit -validity 360 -keysize 2048

and then add the below config to the hive-site.xml

    <property>
        <name>hive.server2.use.SSL</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.server2.keystore.path</name>
        <value>/home/hadoop/hive/conf/hive2.keystore</value>
    </property>
    <property>
        <name>hive.server2.keystore.password</name>
        <value>changeit</value>
    </property>

Note: the password should match between the one used when the keystore was created and the one specified in hive-site.xml.

For challenge 2

In the ELB listener, we forward SSL TCP port 443 to SSL TCP port 10000. This way, when our Java client initiates the JDBC-over-SSL connection, the ELB can unwrap the message and then start another SSL connection with the EMR master via port 10000.

For challenge 3

Previously we had the following shell script to wait for EMR startup and attach the master to the ELB; the bash way is quick and dirty.
Now we leverage Node.js with the aws-js-sdk to maintain our cluster, which is more robust and easier to maintain and understand.

if [ $exitStatus -eq 0 ]
then
   clusterIdLine=`echo $result| sed 's/{\|"\|:\|}\|ClusterId//g'`
   clusterId=`echo $clusterIdLine| awk '{$1=$1;print}'`
   Mail "Adhoc Query Cluster $clusterId started..."
else
   Mail "Error while creating Adhoc Query Cluster $result"
fi

sleep 30

while :
do
clusterState=`aws emr list-clusters --active |grep '"Id":\|"State":' | sed 'N;s/\n/ /' |grep "$clusterId" |sed 's/"\|,\|Name\|Id//g'|cut -d':' -f2|awk '{$1=$1;print}'`
echo $clusterId
echo $clusterState
if [ $clusterState = 'WAITING' ]
then
   break
fi
  echo 'Waiting for cluster to be created'
  sleep 30
done

masterInstanceID=`aws emr list-instances --cluster-id $clusterId --instance-group-types MASTER |grep '"Ec2InstanceId":' | sed 'N;s/\n/ /' |sed 's/"\|,\|Ec2InstanceId//g'|cut -d':' -f2|awk '{$1=$1;print}'`

echo $masterInstanceID

result=`aws elb register-instances-with-load-balancer --load-balancer-name $elbName --instances $masterInstanceID`

echo $result

Some references

How to Create a Self Signed Certificate using Java Keytool

import private key to keystore

Cloudera hive config

JPA performance over JDBC for large tables

I have a table with about 80 million records. When I ran a simple query using JPA with 2-3 predicates, it took about 120 s to get the result, compared to 1 s using JDBC.


CriteriaBuilder cb = entityManager.getCriteriaBuilder();
CriteriaQuery<Long> cq = cb.createQuery(Long.class);
Root<SrcMpodrSalesDtlEntity> root = cq.from(SrcMpodrSalesDtlEntity.class);
List<Predicate> predicates = new ArrayList<Predicate>();
predicates.add(cb.greaterThanOrEqualTo(root.get(SrcMpodrSalesDtlEntity_.trdDt), startDate));
predicates.add(cb.lessThanOrEqualTo(root.get(SrcMpodrSalesDtlEntity_.trdDt), endDate));
cq.select(cb.count(root)).where(predicates.toArray(new Predicate[predicates.size()]));
return entityManager.createQuery(cq).getSingleResult();

Note that for the JDBC comparison I am using exactly the same query that JPA generates:

select
    *
from
    ( select
        count(srcmpodrsa0_.SRC_MPODR_SALES_DTL_ID) as totalTrade
    from
        SRC_MPODR_SALES_DTL srcmpodrsa0_
    where
        srcmpodrsa0_.TRD_DT >= TO_DATE('2011-07-01-00:00:00', 'yyyy-MM-dd-HH24:mi:ss')
        and srcmpodrsa0_.TRD_DT <= TO_DATE('2011-07-31-23:59:59', 'yyyy-MM-dd-HH24:mi:ss')
    )
where
    rownum <= 1

This is somewhat frustrating.

To be honest, I had to leave JPA and stick with JDBC (though certainly using a support class like JdbcTemplate). JPA (and other ORM providers/specifications) is not designed to operate on many objects within one transaction, as it assumes everything loaded should stay in the first-level cache (hence the need for clear() in JPA).

I also recommend the lower-level solution because the overhead of ORM (reflection is only the tip of the iceberg) can be so significant that iterating over a plain ResultSet, even with some lightweight support like the aforementioned JdbcTemplate, will be much faster.

JPA is simply not designed to perform operations on a large number of entities. You might play with flush()/clear() to avoid an OutOfMemoryError, but consider this once again: you gain very little while paying the price of huge resource consumption.

There is no "proper" way to do this; it isn't what JPA or JDO or any other ORM is intended to do. Straight JDBC will be your best alternative, as you can configure it to bring back a small number of rows at a time and discard them as they are used; that is why server-side cursors exist.

ORM tools are not designed for bulk processing; they are designed to let you manipulate objects and to make the RDBMS the data is stored in as transparent as possible (most fail at the transparency part, at least to some degree). At this scale, there is no way to process hundreds of thousands of rows (objects), much less millions, with any ORM and have it execute in any reasonable amount of time, because of the object-instantiation overhead, plain and simple.

REASON

Turns out it is because of the java.sql.Timestamp that JPA uses for the java.util.Date in the preparedStatement. And it will cause THIS!!!

The JPA provider (Hibernate) has a "BasicTypeRegistry" which registers the SQL types that handle the Java types. Among them, java.util.Date is registered with TimestampType. I found that if we use DateType instead, the performance is the same as JDBC, but I did not find an easy way to do this.

configuration.getTypeResolver().registerTypeOverride(new DateType() {
    @Override
    public String[] getRegistrationKeys() {
        return new String[] {
            getName(),
            java.sql.Date.class.getName(),
            java.util.Date.class.getName()
        };
    }
});

But since I am using JPA, I did not find a way to get a reference to the Hibernate SessionFactory or the Hibernate Configuration above.

One workaround I found is to annotate the field with @Type(type = "date") (so it uses java.sql.Date) and then use JPQL directly rather than the JPA Criteria API. This way, createQuery does not replace the date with a timestamp; see the sketch below.
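A rough sketch of that workaround (entity and column names are taken from the snippet above; the JPQL text itself is illustrative):

import java.util.Date;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.EntityManager;
import javax.persistence.Id;
import javax.persistence.Table;
import javax.persistence.TemporalType;
import org.hibernate.annotations.Type;

// Map the column with Hibernate's "date" type so the bind parameter goes to the
// PreparedStatement as java.sql.Date instead of java.sql.Timestamp.
@Entity
@Table(name = "SRC_MPODR_SALES_DTL")
class SrcMpodrSalesDtlEntity {
  @Id
  @Column(name = "SRC_MPODR_SALES_DTL_ID")
  private Long id;

  @Type(type = "date")
  @Column(name = "TRD_DT")
  private Date trdDt;
  // ... other columns
}

class TradeCountDao {
  static Long countTrades(EntityManager em, Date startDate, Date endDate) {
    // JPQL instead of the Criteria API, with explicit DATE temporal types.
    return em.createQuery(
            "select count(e) from SrcMpodrSalesDtlEntity e"
                + " where e.trdDt >= :start and e.trdDt <= :end", Long.class)
        .setParameter("start", startDate, TemporalType.DATE)
        .setParameter("end", endDate, TemporalType.DATE)
        .getSingleResult();
  }
}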