
New in Cloudera Labs: SparkOnHBase

As we progressively move from MapReduce to Spark, we shouldn’t have to give up good HBase integration. Hence the newest Cloudera Labs project, SparkOnHBase!

[Ed. Note: In Aug. 2015, SparkOnHBase was committed to the Apache HBase trunk in the form of a new HBase-Spark module.]

Apache Spark is making a huge impact across our industry, changing the way we think about batch processing and stream processing. However, as we progressively migrate from MapReduce toward Spark, we shouldn’t have to “give up” anything. One of those capabilities we need to retain is the ability to interact with Apache HBase.

In this post, we will share the work being done in Cloudera Labs to make integrating Spark and HBase super-easy in the form of the SparkOnHBase project. (As with everything else in Cloudera Labs, SparkOnHBase is not supported and there is no timetable for possible support in the future; it’s for experimentation only.) You’ll learn common patterns of HBase integration with Spark and see Scala and Java examples for each. (It may be helpful to have the SparkOnHBase repository open as you read along.)

HBase and Batch Processing Patterns

Before we get into the coolness of Spark, let’s define some powerful usage patterns around HBase interactions with batch processing. This discussion is necessary because when I talk to customers who are new to HBase, they often tell me they have heard that HBase and MapReduce should never be used together.

In fact, although there are valid use cases for keeping an HBase cluster isolated from MapReduce for low-SLA reasons, there are also use cases where the combination of MapReduce and HBase is the right approach. Here are just a couple of examples:

  • Massive operations on tree/DAG/graph structures stored in HBase
  • Interaction with a store or table that is in constant change, with MapReduce or Impala

SparkOnHBase Design

We experimented with many designs for how Spark and HBase integration should work and ended up focusing on a few goals:

  • Make HBase connections seamless.
  • Make Kerberos integration seamless.
  • Create RDDs from Scan operations, or from an existing RDD that is used to generate Get commands.
  • Take any RDD and allow any combination of HBase operations to be done.
  • Provide simple methods for common operations while allowing unrestricted, arbitrary advanced operations through the API.
  • Support Scala and Java.
  • Support Spark and Spark Streaming with a similar API.

These goals led us to a design that takes a couple of cues from the GraphX API in Spark. For example, SparkOnHBase has a class called HBaseContext. Its constructor takes HBase configuration information; once constructed, it lets you perform a number of operations on HBase. For example, you can:

  • Create an RDD/DStream from a Scan
  • Put/Delete the contents of an RDD/DStream into HBase
  • Create an RDD/DStream from Gets created from the contents of another RDD/DStream
  • Take the contents of an RDD/DStream and do any operation, with an HConnection handed to you in the worker process

Let’s walk through a code example so you can get an idea of how easy and powerful this API can be. First, we create an RDD, connect to HBase, and put the contents of that RDD into HBase.

// Nothing to see here, just creating a SparkContext like you normally would
val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " + tableName + " " + columnFamily)
val sc = new SparkContext(sparkConf)

//This is making an RDD of
//(RowKey, Array[(columnFamily, columnQualifier, value)])
val rdd = sc.parallelize(Array(
      (Bytes.toBytes("1"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
      (Bytes.toBytes("2"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
      (Bytes.toBytes("3"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
      (Bytes.toBytes("4"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
      (Bytes.toBytes("5"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))
     )
    )

//Create the HBase config like you normally would, then
//pass the HBase config and SparkContext to the HBaseContext
val conf = HBaseConfiguration.create()
conf.addResource(new Path("/etc/hbase/conf/core-site.xml"))
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))
val hbaseContext = new HBaseContext(sc, conf)

//Now give it the RDD, the table name, a function that converts an RDD record
//into a Put, and finally a flag for whether the Puts should be batched
hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
    tableName,
    //This function is really important because it allows our source RDD to
    //hold data of any type, and also because Puts are not serializable
    (putRecord) => {
      val put = new Put(putRecord._1)
      putRecord._2.foreach((putValue) => put.add(putValue._1, putValue._2, putValue._3))
      put
    },
    true)

Now every partition of that RDD will execute in parallel (in different threads in a number of Spark workers across the cluster)—kind of like what would have happened if we did Puts in a mapper or reducer task.

One thing to note is that the same rules apply when working with HBase from MapReduce or Spark in terms of Put and Get performance. If your Puts are not partitioned, a Put batch will most likely get sent to every RegionServer, which results in fewer records per RegionServer per batch. The image below illustrates how this would look with six RegionServers; imagine if you had 100 of them (it would be 16.7x worse)!

[Diagram: Puts from an unpartitioned RDD sending a batch to every one of six RegionServers]

Now let’s look at that same diagram if we used Spark to partition first before talking to HBase.

[Diagram: Puts partitioned in Spark by row key before being sent to HBase, so each batch targets a single RegionServer]
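As a rough illustration (this partitioner is not part of SparkOnHBase), one way to do that is to partition the pair RDD by row key before calling bulkPut, so that each Spark partition mostly talks to one RegionServer. The sketch below assumes, purely for illustration, that the table's regions are split on the first byte of the row key and that there are six of them; a real implementation would consult the table's actual region boundaries.

import org.apache.spark.Partitioner
import org.apache.spark.SparkContext._  // brings in PairRDDFunctions on older Spark versions

// Illustrative partitioner (assumption: regions are split on the first byte
// of the row key); not part of the SparkOnHBase API.
class RowKeyPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = {
    val rowKey = key.asInstanceOf[Array[Byte]]
    // Map the first byte of the row key onto one of the partitions
    if (rowKey.isEmpty) 0 else (rowKey(0) & 0xff) % partitions
  }
}

// Group the (rowKey, columns) records by target partition before the bulkPut
val partitionedRdd = rdd.partitionBy(new RowKeyPartitioner(6))

You would then hand partitionedRdd to bulkPut in place of rdd from the example above.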

Examples

Next, we’ll quickly explore just three code examples to illustrate how you can do different types of operations. (A Put example would look almost exactly like a delete, checkPut, checkDelete, or increment example.)

The big difference in a get example would be the fact that we are producing a new RDD from an existing one. Think of it as a “Spark map function.”

// Create some fake data
val rdd = sc.parallelize(Array(
   (Bytes.toBytes("1")),
   (Bytes.toBytes("6")),
   (Bytes.toBytes("7"))))

//Make the HBaseContext
val conf = HBaseConfiguration.create()
conf.addResource(new Path("/etc/hbase/conf/core-site.xml"))
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))

val hbaseContext = new HBaseContext(sc, conf)

//This is the method we are going to focus on
val getRdd = hbaseContext.bulkGet[Array[Byte], String](
  tableName,  //The table we want to get from
  2,  //Get list batch size.  Set this somewhere under 1000
  rdd,  //RDD that holds the records that will turn into Gets
  record => {  //Function that turns a given record into a Get
    new Get(record)
  },
  (result: Result) => {  //Function that takes a given Result and returns a serializable object
    val it = result.list().iterator()
    val b = new StringBuilder

    b.append(Bytes.toString(result.getRow()) + ":")

    while (it.hasNext()) {
      val kv = it.next()
      val q = Bytes.toString(kv.getQualifier())
      if (q.equals("counter")) {
        b.append("(" + q + "," + Bytes.toLong(kv.getValue()) + ")")
      } else {
        b.append("(" + q + "," + Bytes.toString(kv.getValue()) + ")")
      }
    }
    b.toString
  })

Now, let’s say your interaction with HBase is more complex than straight Gets or Puts, a case where you want to say, “Just give me an HConnection and leave me alone.” Well, HBaseContext has map, mapPartition, foreach, and foreachPartition methods just for you.

Here’s an example of the foreachPartition in Java.

JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

//Create some fake data
List<byte[]> list = new ArrayList<byte[]>();
list.add(Bytes.toBytes("1"));
list.add(Bytes.toBytes("2"));
list.add(Bytes.toBytes("3"));
list.add(Bytes.toBytes("4"));
list.add(Bytes.toBytes("5"));

JavaRDD<byte[]> rdd = jsc.parallelize(list);

//This foreachPartition will allow us to do anything we want with an HConnection.
// It takes two parameters:
//   - the input RDD
//   - a VoidFunction that will get an Iterator and the HConnection.  The Iterator will
//       have all the records in this partition
hbaseContext.foreachPartition(rdd, new VoidFunction<Tuple2<Iterator<byte[]>, HConnection>>() {

  public void call(Tuple2<Iterator<byte[]>, HConnection> t)
          throws Exception {
    //We can get the table outside of the loop
    HTableInterface table1 = t._2().getTable(Bytes.toBytes("Foo"));

    Iterator<byte[]> it = t._1();

    //Go through every record, getting it from HBase;
    // if it isn't there then put it there.  Not a great real-world example, but an example
    while (it.hasNext()) {
      byte[] b = it.next();
      Result r = table1.get(new Get(b));
      if (!r.getExists()) {
        table1.put(new Put(b));
      }
    }
    //Close the table outside of the loop
    table1.close();
  }
});
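For comparison, here is a rough Scala sketch of what the mapPartition flavor could look like, using the Scala hbaseContext and an RDD[Array[Byte]] like the earlier examples (and the same hypothetical "Foo" table). The exact method signature shown here is an assumption based on the pattern above, so check the repository for the version you’re using.

import org.apache.hadoop.hbase.client.{Get, HConnection}
import org.apache.hadoop.hbase.util.Bytes

// Sketch only (signature assumed): each Spark partition gets an iterator over
// its records plus an HConnection, and returns an iterator of new records.
val existsRdd = hbaseContext.mapPartition(rdd,
  (it: Iterator[Array[Byte]], hConnection: HConnection) => {
    val table = hConnection.getTable(Bytes.toBytes("Foo"))
    // Check whether each row key exists; materialize before closing the table
    val results = it.map(rowKey => (rowKey, table.get(new Get(rowKey)).getExists)).toList
    table.close()
    results.iterator
  })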

The last example to walk through is creating an RDD from a scan:

val sc = new SparkContext(sparkConf)

val conf = HBaseConfiguration.create()
conf.addResource(new Path("/etc/hbase/conf/core-site.xml"))
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))

val hbaseContext = new HBaseContext(sc, conf)

val scan = new Scan()
scan.setCaching(100)

val getRdd = hbaseContext.hbaseRDD(tableName, scan)

This code will execute a scan just like MapReduce would do with the table input format, and populate the resulting RDD with records of type (RowKey, List[(columnFamily, columnQualifier, Value)]). If you don’t like that record type, just use the hbaseRDD method that also takes a record conversion function, and change it to whatever you like.
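For example, here is a minimal sketch of what that conversion-function variant could look like, mapping each scanned row to just its row key as a String. The exact parameter shape shown here is an assumption, so verify it against the version of SparkOnHBase you are using.

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

// Assumed conversion-function variant of hbaseRDD: map each
// (rowKey, Result) pair from the scan into a plain String row key
val rowKeyRdd = hbaseContext.hbaseRDD(tableName, scan,
  (r: (ImmutableBytesWritable, Result)) => Bytes.toString(r._2.getRow))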

Conclusion

SparkOnHBase has been tested on a number of clusters with Spark and Spark Streaming; give it a look and let us know your feedback via the Cloudera Labs discussion group. The hope is that this project and others like it will help us blend the goodness from different Hadoop ecosystem components to help solve bigger problems.

To use SparkOnHBase, just add the following snippet as a dependency in your pom.xml:

<dependency>
  <groupId>com.cloudera</groupId>
  <artifactId>sparkhbase</artifactId>
  <version>0.0.1clabs</version>
</dependency>

Acknowledgements

Special thanks to the people who helped me make SparkOnHBase: Tathagata Das (TD), Mark Grover, Michael Stack, Sandy Ryza, Kevin O’Dell, Jean-Marc Spaggiari, Matteo Bertozzi, and Jeff Lord.

Ted Malaska is a Solutions Architect at Cloudera, a contributor to Apache Spark, and a co-author of the O’Reilly book Hadoop Application Architectures.
