Serverless EMR Cluster monitoring with Lambda

Background

One recurring issue we have is that our EMR cluster stops consuming Hive queries because the metastore becomes overloaded with loading/refreshing. This is partly caused by the shared metastore, which hosts many teams’ schemas/tables inside our organization. When this happens in prod, we have to ask RIM for help to terminate our persistent cluster and create a new one, because we do not have the prod pem file. This is painful for our team (preparing lots of emergency-release material, getting on the bridge line, then waiting for all kinds of approvals). It also hurts our clients, who lose the hours it takes us to go through everything mentioned above.

To mitigate this, we created Nagios alerts that ping our cluster every 10 minutes and have OPS watch them for us. This is quite helpful since we now know the state of the cluster at all times. However, when the Hive server is down we still have to go through the emergency release process. Even though we created a Jenkins pipeline to create/terminate/add-step, RIM does not allow us or OPS to use the Jenkins pipeline to do the recovery.

Proposals

We considered several ways to resolve this ourselves:

  1. have a dedicated server (EC2) to do the monitoring and take action
  2. deploy the monitoring code on the launcher box and do the monitoring/recovery there
  3. run the code in Lambda

Dedicated EC2

Method 1 is the traditional approach and requires a lot of setup with Puppet/provisioning to get everything automated, which does not seem worth it.

Launcher box

Method 2 is less work because we already have a launcher box in each environment, and we could run our existing JS code in a Node.js server managed by PM2. The challenges are (1) the launcher box is not a standard app-server instance, so it is not reliable, and (2) the Node.js Hive connector does not currently have good support for SSL and the custom authentication we are using in HiveServer2.

Moreover, methods 1 and 2 share a common weak point: we could end up needing yet another monitoring service just to make sure the monitor itself is up and doing its job.

Lambda

All of the above analysis brings us to Method 3, where we use serverless monitoring with Lambda. This gives us:

  1. lower operational and development costs
  2. a smaller cost to scale
  3. a good fit with agile development, allowing developers to focus on code and deliver faster
  4. a good fit with microservices, which can be implemented as functions

It is not a silver bullet, of course. One drawback is that with Lambda we could not reuse our Node.js script: it is written in ES6 and we only tested and ran it on Node 6.x, while AWS Lambda’s Node environment is 4.x. Because of this, we had to rewrite our cluster recovery code in Java, which fortunately is not difficult thanks to the nice design of the aws-java-sdk.

Implementation

The implementation is quite straightforward. 

Components

At a high level, we simulate our app’s path and initiate the connection from our ELB to our cluster.

(figure: lambda-overview)

Flow

The basic flow is:

(figure: lambda-flow)

The code is at: https://github.com/vcfvct/lambda-emr-monitoring
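
For reference, here is a minimal sketch (Java, aws-java-sdk) of what the Lambda handler looks like conceptually. The class name, ELB host name, cluster name, and credentials below are placeholders rather than the real values, and the re-creation step is omitted; the actual logic lives in the repository above.

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.ClusterSummary;
import com.amazonaws.services.elasticmapreduce.model.ListClustersRequest;
import com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsRequest;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.Map;

// Hypothetical handler: probe HiveServer2 through the ELB; if the probe fails,
// terminate the stuck cluster so a new one can be provisioned.
public class EmrHealthCheckHandler implements RequestHandler<Map<String, Object>, String> {

    // Placeholder ELB DNS name; the ELB forwards 443 to HiveServer2's port 10000.
    private static final String JDBC_URL =
        "jdbc:hive2://my-hive-elb.example.com:443/default;ssl=true";

    @Override
    public String handleRequest(Map<String, Object> input, Context context) {
        try {
            probeHive();
            return "HEALTHY";
        } catch (Exception e) {
            context.getLogger().log("Hive probe failed: " + e.getMessage());
            recoverCluster();
            return "RECOVERY_TRIGGERED";
        }
    }

    // Simulate the app path: open a JDBC connection via the ELB and run a trivial query.
    private void probeHive() throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // Placeholder password; the real one comes from credstash (see Credentials below).
        try (Connection conn = DriverManager.getConnection(JDBC_URL, "rcapp", "placeholder-password");
             Statement stmt = conn.createStatement()) {
            stmt.execute("show databases");
        }
    }

    // Terminate the unhealthy cluster; re-creating it (RunJobFlowRequest) is omitted here.
    private void recoverCluster() {
        AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.defaultClient();
        for (ClusterSummary c : emr.listClusters(
                new ListClustersRequest().withClusterStates("WAITING", "RUNNING")).getClusters()) {
            if ("adhoc-query-cluster".equals(c.getName())) { // placeholder cluster name
                emr.terminateJobFlows(new TerminateJobFlowsRequest().withJobFlowIds(c.getId()));
            }
        }
    }
}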

Credentials

For the Hive password, the original thought was to pass it in via Spring Boot args. Since we are running in Lambda, the run arg would be plain text in the console, which is not ideal. We could also use Lambda’s KMS key to encrypt it and decrypt it in the app, but that appears not to be allowed at the org level. After discussing with the infrastructure team, our Hive password will be stored in credstash (DynamoDB).

stateful firewall with inbound/outbound traffic

Background

I have worked as DevOps on a cloud migration for the past 3 months without really writing much code. Even though I have been exposed to many AWS services like EMR/EC2/ASG (auto scaling group)/LC (launch config)/CF (CloudFormation) etc., and to setting up security groups (SG), I still find myself a bit confused by inbound and outbound traffic rules. I was wondering: if I allow inbound traffic, I have to send a response back to the client, so does that mean I also have to allow outbound traffic? Some googling of that question led me to the keyword: stateful firewall.

So basically, with a stateful firewall, when a connection is established the firewall automatically lets packets back out to the client’s port. You don’t need to create rules for that, because the firewall tracks the connection.
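
In AWS terms, this is why a security group only needs an inbound rule for the listener port: the response packets flow back out without a matching outbound rule. A minimal sketch with the aws-java-sdk is below; the security group id and CIDR are placeholders.

import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2ClientBuilder;
import com.amazonaws.services.ec2.model.AuthorizeSecurityGroupIngressRequest;
import com.amazonaws.services.ec2.model.IpPermission;

public class AllowHttpsInbound {
    public static void main(String[] args) {
        AmazonEC2 ec2 = AmazonEC2ClientBuilder.defaultClient();

        // Allow inbound HTTPS from a client CIDR. Because security groups are stateful,
        // the return traffic for these connections is allowed automatically; no outbound
        // rule has to be added for the responses.
        ec2.authorizeSecurityGroupIngress(new AuthorizeSecurityGroupIngressRequest()
            .withGroupId("sg-0123456789abcdef0")            // placeholder security group id
            .withIpPermissions(new IpPermission()
                .withIpProtocol("tcp")
                .withFromPort(443)
                .withToPort(443)
                .withIpRanges("203.0.113.0/24")));           // placeholder client CIDR
    }
}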

History

Before the development of stateful firewalls, firewalls were stateless. A stateless firewall treats each network frame or packet individually. Such packet filters operate at the OSI Network Layer (layer 3) and function more efficiently because they only look at the header part of a packet. They do not keep track of the packet context, such as the nature of the traffic. Such a firewall has no way of knowing if any given packet is part of an existing connection, is trying to establish a new connection, or is just a rogue packet. Modern firewalls are connection-aware (or state-aware), offering network administrators finer-grained control of network traffic.

Early attempts at producing firewalls operated at the application layer, which is the very top of the seven-layer OSI model. This method required exorbitant amounts of computing power and is not commonly used in modern implementations.

Description

A stateful firewall keeps track of the state of network connections (such as TCP streams or UDP communication) and is able to hold significant attributes of each connection in memory. These attributes are collectively known as the state of the connection, and may include such details as the IP addresses and ports involved in the connection and the sequence numbers of the packets traversing the connection. Stateful inspection monitors incoming and outgoing packets over time, as well as the state of the connection, and stores the data in dynamic state tables. This cumulative data is evaluated, so that filtering decisions would not only be based on administrator-defined rules, but also on context that has been built by previous connections as well as previous packets belonging to the same connection.

The most CPU-intensive checking is performed at the time of setup of the connection. Entries are created only for TCP connections or UDP streams that satisfy a defined security policy. After that, all packets for that session are processed rapidly because it is simple and fast to determine whether a packet belongs to an existing, pre-screened session. Packets associated with these sessions are permitted to pass through the firewall. Sessions that do not match any policy are denied, as are packets that do not match an existing table entry.

In order to prevent the state table from filling up, sessions will time out if no traffic has passed for a certain period. These stale connections are removed from the state table. Many applications therefore send keepalive messages periodically in order to stop a firewall from dropping the connection during periods of no user-activity, though some firewalls can be instructed to send these messages for applications.

Depending on the connection protocol, maintaining a connection’s state is more or less complex for the firewall. For example, TCP is inherently a stateful protocol, as connections are established with a three-way handshake (“SYN, SYN-ACK, ACK”) and ended with a “FIN, ACK” exchange. This means that all packets with “SYN” in their header received by the firewall are interpreted as opening new connections. If the service requested by the client is available on the server, it will respond with a “SYN-ACK” packet, which the firewall will also track. Once the firewall receives the client’s “ACK” response, it transfers the connection to the “ESTABLISHED” state, as the connection has been authenticated bidirectionally. This allows tracking of future packets through the established connection. At the same time, the firewall drops all packets that are not associated with an existing connection recorded in its state table (and are not new “SYN” packets), preventing unsolicited connections to the protected machine.

By keeping track of the connection state, stateful firewalls provide added efficiency in terms of packet inspection. This is because for existing connections the firewall need only check the state table, instead of checking the packet against the firewall’s rule set, which can be extensive. Additionally, in the case of a match with the state table, the firewall does not need to perform deep packet inspection.
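
As a toy illustration of that state table (not a real firewall, and not how Linux conntrack or AWS implements it), the sketch below keys connections by their 5-tuple: packets of a known connection are accepted straight from the table, everything else goes through the rule set and, if accepted, gets recorded.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Toy illustration of the state-table idea: connections are keyed by their 5-tuple,
// and packets of known connections skip the rule set entirely.
public class ToyStatefulFilter {

    private final Set<String> stateTable = ConcurrentHashMap.newKeySet();

    private static String key(String srcIp, int srcPort, String dstIp, int dstPort, String proto) {
        return proto + ":" + srcIp + ":" + srcPort + ">" + dstIp + ":" + dstPort;
    }

    // Placeholder for the administrator-defined rule set, e.g. "allow tcp to port 443".
    private boolean matchesRuleSet(String proto, int dstPort) {
        return "tcp".equals(proto) && dstPort == 443;
    }

    public boolean accept(String srcIp, int srcPort, String dstIp, int dstPort, String proto) {
        String forward = key(srcIp, srcPort, dstIp, dstPort, proto);
        String reverse = key(dstIp, dstPort, srcIp, srcPort, proto);
        // Fast path: the packet belongs to an existing, pre-screened session (either direction).
        if (stateTable.contains(forward) || stateTable.contains(reverse)) {
            return true;
        }
        // Slow path: evaluate the rule set and, if allowed, record the new connection.
        if (matchesRuleSet(proto, dstPort)) {
            stateTable.add(forward);
            return true;
        }
        return false;
    }
}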

Add username/password auth to Hive

In my previous post, we achieved end-to-end SSL encryption from the client to the ELB to the EMR master. Our next requirement is to add username/password authentication. There are different ways to do this in Hive: 1. LDAP, 2. PAM, 3. CUSTOM mode. After some evaluation we finally chose the CUSTOM mode.

We first need to create a class that implements the org.apache.hive.service.auth.PasswdAuthenticationProvider interface. Notice that we store the MD5-hashed password here so that we do not expose the credential in the code.


public class RcHiveAuthentication implements PasswdAuthenticationProvider
{
    Hashtable<String, String> store = null;

    public RcHiveAuthentication()
    {
        store = new Hashtable<>();
        store.put("rcapp", "7451d491e7052ac047caf36a5272a1bf");
    }

    @Override
    public void Authenticate(String user, String password)
        throws AuthenticationException
    {
        String storedPasswd = store.get(user);
        //since MD5 is just some HEX, we ignore case.
        if (storedPasswd == null || !storedPasswd.equalsIgnoreCase(getMd5(password)))
        {
            // if we do not have the user or the password does not match, authentication fails.
            throw new AuthenticationException("SampleAuthenticator: Error validating user");
        }
    }

    /**
     * @param input input string to be hashed
     *
     * @return  the MD5 of the input. if any error, return empty String.
     */
    public static String getMd5(String input)
    {
        String resultMd5 = "";
        try
        {
            resultMd5 = DatatypeConverter.printHexBinary(MessageDigest.getInstance("MD5").digest(input.getBytes(StandardCharsets.UTF_8)));
        } catch (NoSuchAlgorithmException e)
        {
            e.printStackTrace();
        }

        return resultMd5;
    }

    // package access for unit test
    Hashtable<String, String> getStore()
    {
        return store;
    }
}

Build the above class into a jar and put it into the $HIVE_HOME/lib directory.
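
Before packaging, a quick unit test can sanity-check the provider. This is a minimal sketch assuming JUnit 4 on the classpath; it uses the well-known MD5 digest of the literal string "password" as a fixed value, and it must live in the same package as RcHiveAuthentication to reach the package-private getStore().

import org.junit.Test;

import javax.security.sasl.AuthenticationException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class RcHiveAuthenticationTest {

    @Test
    public void md5IsHexEncoded() {
        // Well-known MD5 digest of the string "password".
        assertEquals("5f4dcc3b5aa765d61d8327deb882cf99",
            RcHiveAuthentication.getMd5("password").toLowerCase());
    }

    @Test(expected = AuthenticationException.class)
    public void wrongPasswordIsRejected() throws Exception {
        new RcHiveAuthentication().Authenticate("rcapp", "not-the-right-password");
    }

    @Test
    public void storeContainsConfiguredUser() {
        assertTrue(new RcHiveAuthentication().getStore().containsKey("rcapp"));
    }
}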

Then we need to add some config to hive-site.xml so that Hive knows to use our custom provider for authentication:

    <property>
        <name>hive.server2.authentication</name>
        <value>CUSTOM</value>
    </property>
    <property>
        <name>hive.server2.custom.authentication.class</name>
        <value>org.finra.rc.hiveserver2.RcHiveAuthentication</value>
     </property>

At this stage, if you try to connect to the server, you will probably get the error below:

Error while compiling statement: FAILED: RuntimeException Cannot create staging directory 'hdfs://namenode:

This is because the user we set in the custom auth provider does not have write access in HDFS. To get around this, we need to set Hive impersonation to false:

    <property>
        <name>hive.server2.enable.doAs</name>
        <value>false</value>
    </property>

After adding this, we should now be able to access Hive with our custom-defined username/password.
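
From the client side, the credentials are then supplied on the JDBC connection. A minimal sketch is below; the host name and password are placeholders, and ;ssl=true matches the SSL setup from the previous post.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveJdbcSmokeTest {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");

        // Username/password are checked by RcHiveAuthentication on the server side.
        String url = "jdbc:hive2://my-hive-elb.example.com:443/default;ssl=true";
        try (Connection conn = DriverManager.getConnection(url, "rcapp", "the-plain-text-password");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("show databases")) {
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}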

EMR hive JDBC over SSL with ELB

Recently we needed to set up a Hive cluster consuming S3 objects so that we could run queries from our Java server (Tomcat) via JDBC.

Several challenges:

  1. Our Java server is on prem (it will move to AWS in 2017), so we have to secure the channel to the EMR cluster.
    Solution: use SSL across the board.
  2. Every time the EMR cluster restarts, its IP changes, and for the JDBC connection we would like a constant (DNS) name.
    Solution: use an ELB to obtain a DNS name.
  3. The EMR master needs to be attached to the ELB every time it is created.
    Solution: in the bootstrap script, poll the state of the EMR cluster creation and, once it finishes, grab its IP and attach it to the ELB.

For challenge 1

We need to install the certificate chain as well as the private key into the ELB so that it can accept SSL connections from the client side. In the client JDBC connection string, we need to add ‘;ssl=true’ to instruct the client to initiate the connection with SSL. Another thing is to import the CA into the JRE’s lib/security cacerts so that when we do the SSL handshake, the ELB certificate’s CA is in the Java client’s truststore. For our test, we use a self-signed CA (finrarcselfca.crt is our self-CA certificate):

sudo keytool -keystore cacerts -importcert -alias finrarcselfca -file finrarcselfca.crt
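
To verify the CA was picked up, a quick check is to force a TLS handshake against the ELB from the same JRE; it throws an SSLHandshakeException if the chain is not trusted. The host name below is a placeholder.

import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;

public class TrustCheck {
    public static void main(String[] args) throws Exception {
        SSLSocketFactory factory = (SSLSocketFactory) SSLSocketFactory.getDefault();
        // Handshake fails with an SSLHandshakeException if the ELB's CA is not in cacerts.
        try (SSLSocket socket = (SSLSocket) factory.createSocket("my-hive-elb.example.com", 443)) {
            socket.startHandshake();
            System.out.println("Trusted peer: " + socket.getSession().getPeerPrincipal());
        }
    }
}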

As for the connection between the ELB and the EMR master, we can just use a self-signed keystore, since the ELB does not need to verify the EMR cluster. First generate a self-signed keystore using:

keytool -genkey -keyalg RSA -alias selfsigned -keystore self.keystore -storepass changeit -validity 360 -keysize 2048

and then add the below config to the hive-site.xml

    <property>
        <name>hive.server2.use.SSL</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.server2.keystore.path</name>
        <value>/home/hadoop/hive/conf/hive2.keystore</value>
    </property>
    <property>
        <name>hive.server2.keystore.password</name>
        <value>changeit</value>
    </property>

Note: the password should match the one used when the keystore was created and the one specified in hive-site.xml.

For challenge 2

In the ELB listener, we forward SSL TCP port 443 to SSL TCP port 10000. This way, when our Java client initiates the JDBC-over-SSL connection, the ELB unwraps the message and then starts another SSL connection with the EMR master via port 10000.

For challenge 3

Previously we had the following shell script to wait for EMR startup and attach the master to the ELB. The bash way is quick and dirty.
Now we leverage Node.js with the aws-js-sdk to maintain our cluster, which is more robust and easier to maintain/understand.

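# Note: $result and $exitStatus are assumed to come from a preceding "aws emr create-cluster" call that is not shown here.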
if [ $exitStatus -eq 0 ]
then
   clusterIdLine=`echo $result| sed 's/{\|"\|:\|}\|ClusterId//g'`
   clusterId=`echo $clusterIdLine| awk '{$1=$1;print}'`
   Mail "Adhoc Query Cluster $clusterId started..."
else
   Mail "Error while creating Adhoc Query Cluster $result"
fi

sleep 30

while :
do
clusterState=`aws emr list-clusters --active |grep '"Id":\|"State":' | sed 'N;s/\n/ /' |grep "$clusterId" |sed 's/"\|,\|Name\|Id//g'|cut -d':' -f2|awk '{$1=$1;print}'`
echo $clusterId
echo $clusterState
if [ "$clusterState" = 'WAITING' ]
then
   break
fi
  echo 'Waiting for cluster to be created'
  sleep 30
done

masterInstanceID=`aws emr list-instances --cluster-id $clusterId --instance-group-types MASTER |grep '"Ec2InstanceId":' | sed 'N;s/\n/ /' |sed 's/"\|,\|Ec2InstanceId//g'|cut -d':' -f2|awk '{$1=$1;print}'`

echo $masterInstanceID

result=`aws elb register-instances-with-load-balancer --load-balancer-name $elbName --instances $masterInstanceID`

echo $result

Some references

How to Create a Self Signed Certificate using Java Keytool

import private key to keystore

Cloudera hive config

install fish shell in amazon ec2 instance

Switch to root

First we need to be root to install it:
sudo su -

install using yum

Go to: http://fishshell.com/files/2.1.0/linux/index.html

As of today (2016-04-14), it is:

yum-config-manager --add-repo http://fishshell.com/files/linux/RedHat_RHEL-6/fish.release:2.repo
yum -y install fish

Follow the ‘Red Hat Enterprise Linux/CentOS 6’ instructions and you should be good to go. 🙂

partition key, composite key and clustering key in Cassandra

There is a lot of confusion around this, so I will try to make it as simple as possible.

The primary key is a general concept to indicate one or more columns used to retrieve data from a Table.

The primary key may be SIMPLE

 create table stackoverflow (
      key text PRIMARY KEY,
      data text      
  );

That means it is made up of a single column.

But the primary key can also be COMPOSITE (aka COMPOUND), i.e. generated from more than one column.

 create table stackoverflow (
      key_part_one text,
      key_part_two int,
      data text,
      PRIMARY KEY(key_part_one, key_part_two)      
  );

In the case of a COMPOSITE primary key, the “first part” of the key is called the PARTITION KEY (in this example key_part_one is the partition key) and the second part of the key is the CLUSTERING KEY (key_part_two).

Please note that both the partition key and the clustering key can be made up of more than one column:

 create table stackoverflow (
      k_part_one text,
      k_part_two int,
      k_clust_one text,
      k_clust_two int,
      k_clust_three uuid,
      data text,
      PRIMARY KEY((k_part_one,k_part_two), k_clust_one, k_clust_two, k_clust_three)      
  );

Behind these names …

  • The Partition Key is responsible for data distribution across your nodes.
  • The Clustering Key is responsible for data sorting within the partition.
  • The Primary Key is equivalent to the Partition Key in a single-field-key table.
  • The Composite/Compound Key is just a multi-column key.

Further usage information: DATASTAX DOCUMENTATION


EDIT due to further requests
Small usage and content examples
SIMPLE KEY:

insert into stackoverflow (key, data) VALUES ('han', 'solo');
select * from stackoverflow where key='han';

table content

key | data
----+------
han | solo

COMPOSITE/COMPOUND KEY can retrieve “wide rows”

insert into stackoverflow (key_part_one, key_part_two, data) VALUES ('ronaldo', 9, 'football player');
insert into stackoverflow (key_part_one, key_part_two, data) VALUES ('ronaldo', 10, 'ex-football player');
select * from stackoverflow where key_part_one = 'ronaldo';

table content

 key_part_one | key_part_two | data
--------------+--------------+--------------------
      ronaldo |            9 |    football player
      ronaldo |           10 | ex-football player

But you can also query with the full key …

select * from stackoverflow where key_part_one = 'ronaldo' and key_part_two  = 10;

query output

 key_part_one | key_part_two | data
--------------+--------------+--------------------
      ronaldo |           10 | ex-football player

Important note: the partition key is the minimum specifier needed to perform a query using a where clause. If you have a composite partition key, like the following

e.g. PRIMARY KEY((col1, col2), col10, col4)

You can perform a query only if you pass at least both col1 and col2: these are the two columns that define the partition key. The “general” rule for queries is that you have to pass at least all partition key columns, and then you can add each clustering key in the order in which they are defined.

so the valid queries are (excluding secondary indexes)

  • col1 and col2
  • col1 and col2 and col10
  • col1 and col2 and col10 and col4

invalid:

  • col1 and col2 and col4
  • anything that does not contain both col1 and col2

Here is a good video about Cassandra modeling explaining how data is stored with partition key and sorted with clustering key.

This article explains the architecture and all the details about Cassandra, including partitioning/replication factor/bootstrapping.

This Vimeo video explains in detail: 1. the ring, 2. primary key (partition key) hashing, 3. the replica mechanism, etc. The partition key is hashed (using MD5) into a 128-bit number, and tokens are assigned to the nodes that make up the cluster (token ring). Each record is stored on a node depending on its partition key hash.
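
To make that hashing step concrete, here is a small Java sketch of how a partition key can be mapped to a 128-bit token, roughly in the spirit of Cassandra’s RandomPartitioner (the real partitioner has more to it, and newer clusters default to the Murmur3 partitioner):

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class Md5Token {
    // Map a partition key to a 128-bit token, in the spirit of RandomPartitioner.
    static BigInteger tokenFor(String partitionKey) throws Exception {
        byte[] digest = MessageDigest.getInstance("MD5")
            .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
        return new BigInteger(digest).abs();   // roughly 0 .. 2^127
    }

    public static void main(String[] args) throws Exception {
        // Rows with the same partition key always hash to the same token,
        // and therefore land on the same replicas in the ring.
        System.out.println(tokenFor("ronaldo"));
        System.out.println(tokenFor("han"));
    }
}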

This post explains how read/write requests are handled in the ring.

deploy nodejs angularjs mongodb expressjs application to openshift

In my previous post, I described how to upload file using nodejs and angularjs.

Now we are going to deploy this MEAN stack app to OpenShift, which is a very good cloud service provider offering three free application deployments. You can even deploy a Java web application to it using Tomcat/MySQL, part of which I mentioned in a previous post.

We first need to create an instance in OpenShift, then pull the source code to our local machine and merge it with our local git; after fixing some conflicts, we push our code to OpenShift. This whole process is covered in this previous post.

Once we have the codebase and push it to the server-side git repo, OpenShift will run npm against our package.json every time we push something new, which is very neat.

Some tricks needed

1. openshift ip and port

OpenShift has its own IP and port, so we need to use them in our server.js when we start our app:

 

//openshift port or local port
var ipaddress = process.env.OPENSHIFT_NODEJS_IP || "127.0.0.1";
var port = process.env.OPENSHIFT_NODEJS_PORT || 3000;

app.listen(port, ipaddress, function () {
    logger.info('Express server listening on port: ' + port);
});

I use || so that the codebase still runs on my local machine. As for the variables in OpenShift, you can SSH to the OpenShift server and type:

env | grep OPENSHIFT_NODEJS

to see a bunch of Node.js-related environment variables. IP and PORT are two of them.

2. mongo db connection

Similar to the IP and port, you also need some customization for the MongoDB connection. I use mongoose. The code below makes sure our app works both locally and remotely.

 


var mongoose = require('mongoose');

// default to a 'localhost' configuration:
var connection_string = '127.0.0.1:27017/contacts';
// if OPENSHIFT env variables are present, use the available connection info:
if(process.env.OPENSHIFT_MONGODB_DB_PASSWORD){
    connection_string = process.env.OPENSHIFT_MONGODB_DB_USERNAME + ":" +
    process.env.OPENSHIFT_MONGODB_DB_PASSWORD + "@" +
    process.env.OPENSHIFT_MONGODB_DB_HOST + ':' +
    process.env.OPENSHIFT_MONGODB_DB_PORT + '/' +
    process.env.OPENSHIFT_APP_NAME;
}

mongoose.connect('mongodb://' + connection_string);
var db = mongoose.connection;
db.on('error', console.error.bind(console, 'connection error:'));

console.log('Hey, database Node module is loaded')

module.exports = db;

3. mongo db data export and import to openshift

To export the local data, just execute the command:

mongoexport -d targetDB -c targetCollection -o targetFileName.json

This generates the output file in the current directory. Next we want to import it into the OpenShift MongoDB. On Linux, /tmp is a directory that is cleaned out whenever the machine restarts, so it’s a good place for temporary files.

So, we could do something like:

$ rsync targetFileName.json openshiftUsername@openshiftHostname:/tmp
$ ssh openshiftUsername@openshiftHostname
$ mongoimport -d targetDB -c targetCollection --file /tmp/targetFileName.json

4. bower install

If, like me, you are also using bower to manage the client-side dependencies, then you also need to run bower install to get all the client packages. Running it directly from the console will not work on OpenShift, since Bower expects to be able to write some of its config files in $HOME/.bower/ (or inside .config), but OpenShift does not provide write access to $HOME.

You can work around this issue by prepending HOME=$OPENSHIFT_DATA_DIR to the bower command whenever you run it:

HOME=$OPENSHIFT_DATA_DIR bower

This thread describes some methods. However, an easier way I found is to add this to package.json’s scripts using the postinstall phase:

  "scripts": {
    "postinstall": "export HOME=$OPENSHIFT_REPO_DIR; ./node_modules/bower/bin/bower install"
  },

This sets up a new HOME directory that is writable by you, and automatically invokes bower install.