Spark + HBase

Last Updated: Mar 27, 2017

Spark access to HBase

The example below demonstrates how to write data to HBase from Spark. Note that the Hadoop cluster must be in the same security group as the HBase cluster; otherwise, the two clusters cannot connect over the network. When creating a cluster in E-MapReduce, make sure to select the security group where the HBase cluster is located.

    import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
    import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
    import org.apache.hadoop.hbase.util.Bytes
    import scala.collection.JavaConverters._

    // Share a single HBase connection per executor JVM.
    object ConnectionUtil extends Serializable {
      private val conf = HBaseConfiguration.create()
      // ZooKeeper quorum of the HBase cluster (replace with your own hosts).
      conf.set(HConstants.ZOOKEEPER_QUORUM, "ecs1,ecs2,ecs3")
      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase")
      private val connection = ConnectionFactory.createConnection(conf)
      def getDefaultConn: Connection = connection
    }

    // unionStreams is the input data stream; tname, COLUMN_FAMILY_BYTES, and
    // COLUMN_QUALIFIER_BYTES are assumed to be defined elsewhere.
    unionStreams.foreachRDD(rdd => {
      rdd.map(bytes => new String(bytes))
        .flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
        .mapPartitions { words => {
          val conn = ConnectionUtil.getDefaultConn
          val tableName = TableName.valueOf(tname)
          val t = conn.getTable(tableName)
          try {
            // Write the counts in batches of 100 puts each.
            words.sliding(100, 100).foreach(slice => {
              val puts = slice.map(word => {
                println(s"word: $word")
                val put = new Put(Bytes.toBytes(word._1 + System.currentTimeMillis()))
                put.addColumn(COLUMN_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES,
                  System.currentTimeMillis(), Bytes.toBytes(word._2))
                put
              }).toList
              // Table.put expects a java.util.List, so convert the Scala list.
              t.put(puts.asJava)
            })
          } finally {
            t.close()
          }
          Iterator.empty
        }}.count() // count() forces evaluation of the lazy mapPartitions
    })
    ssc.start()
    ssc.awaitTermination()
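The snippet above assumes that ssc, unionStreams, tname, COLUMN_FAMILY_BYTES, and COLUMN_QUALIFIER_BYTES are already defined elsewhere in the program. A minimal sketch of that surrounding setup might look like the following; the socket source, host/port, table name, and column-family names are placeholders for illustration, not part of the original sample:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.hadoop.hbase.util.Bytes

    val sparkConf = new SparkConf().setAppName("SparkHBaseDemo")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // Hypothetical input streams producing byte arrays; the original sample
    // unions several receivers into one DStream named unionStreams.
    val streams = (1 to 3).map(_ =>
      ssc.socketTextStream("localhost", 9999).map(_.getBytes))
    val unionStreams = ssc.union(streams)

    // Target table and column coordinates (placeholders); the HBase table
    // and column family must already exist before the job writes to them.
    val tname = "wordcount"
    val COLUMN_FAMILY_BYTES = Bytes.toBytes("cf")
    val COLUMN_QUALIFIER_BYTES = Bytes.toBytes("count")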

Appendix

For complete sample code, see:
