This topic describes how to use Spark to write data to HBase.

Example of using Spark to access HBase

Important: Your computing cluster must be in the same security group as your HBase cluster; otherwise, the two clusters cannot connect to each other. When you create the computing cluster in the EMR console, select the security group that the HBase cluster belongs to.
  • Java code
    // Assumes `sparkConf` (a SparkConf) and `tableName` (a String naming an
    // existing HBase table) are defined by the surrounding program.
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    try {
      // Row keys to look up.
      List<byte[]> list = new ArrayList<>();
      list.add(Bytes.toBytes("1"));
      list.add(Bytes.toBytes("2"));
      list.add(Bytes.toBytes("3"));
      list.add(Bytes.toBytes("4"));
      list.add(Bytes.toBytes("5"));

      JavaRDD<byte[]> rdd = jsc.parallelize(list);
      Configuration conf = HBaseConfiguration.create();

      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

      // Placeholder column coordinates: replace "cf" with a column family that
      // exists in `tableName`. A Put must contain at least one cell, otherwise
      // BufferedMutator.mutate rejects it.
      final byte[] family = Bytes.toBytes("cf");
      final byte[] qualifier = Bytes.toBytes("q");
      final byte[] value = Bytes.toBytes("v");

      hbaseContext.foreachPartition(rdd,
          new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
            public void call(Tuple2<Iterator<byte[]>, Connection> t)
                throws Exception {
              TableName name = TableName.valueOf(tableName);
              // try-with-resources closes the mutator and table even if a
              // get or mutate call throws part-way through the partition.
              try (Table table = t._2().getTable(name);
                   BufferedMutator mutator = t._2().getBufferedMutator(name)) {
                while (t._1().hasNext()) {
                  byte[] b = t._1().next();
                  // Result.getExists() is null for a normal Get, so unboxing it
                  // would throw; Table.exists returns a primitive boolean.
                  if (table.exists(new Get(b))) {
                    mutator.mutate(new Put(b).addColumn(family, qualifier, value));
                  }
                }
                mutator.flush();
              }
            }
          });
    } finally {
      jsc.stop();
    }
  • Scala code
    // Brings the implicit `hbaseForeachPartition` extension for RDDs into scope.
    import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._

    val sc = new SparkContext("local", "test")
    try {
      // create() loads hbase-default.xml / hbase-site.xml; the no-arg
      // HBaseConfiguration constructor is deprecated.
      val config = HBaseConfiguration.create()

      // Each record is (rowKey, cells), where every cell is (family, qualifier, value).
      // "cf" is a placeholder: use a column family that exists in table "t1".
      val rdd = sc.parallelize(Seq(
        (Bytes.toBytes("1"), Seq((Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v1")))),
        (Bytes.toBytes("2"), Seq((Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v2"))))
      ))

      val hbaseContext = new HBaseContext(sc, config)

      rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
        val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
        try {
          it.foreach { case (rowKey, cells) =>
            val put = new Put(rowKey)
            cells.foreach { case (family, qualifier, value) =>
              put.addColumn(family, qualifier, value)
            }
            bufferedMutator.mutate(put)
          }
          bufferedMutator.flush()
        } finally {
          // Release the mutator even if a mutate call fails mid-partition.
          bufferedMutator.close()
        }
      })
    } finally {
      sc.stop()
    }

References