
Use Spark to access HBase

Last Updated: Apr 07, 2021

This article describes how to access HBase Enhanced Edition from Spark.

Access preparation

HBase Enhanced Edition supports access from Spark. To enable this, you must add the alihbase-connector dependency. The steps are as follows:

  1. Confirm the version and installation directory of the open source HBase client that your Spark environment depends on. You can run yarn logs -applicationId xxx to find the version and path of the hbase-client-xxx.jar loaded in the execution log. An alternative runtime check is sketched after this list.

  2. Determine which version of the alihbase-connector plug-in you need to add by checking the version mapping table in Download ApsaraDB for HBase SDK for Java.

  3. Download the corresponding version of alihbase-connector from the plug-in download address in Modify Maven dependencies to upgrade your clients of earlier versions, and place it in the same directory as the open source HBase client.
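The first step relies on the yarn logs output. As an alternative, the following minimal Scala sketch (an illustration, not part of the official procedure) can be pasted into spark-shell to print the path of the hbase-client JAR that the driver actually loaded; that directory is where the matching alihbase-connector JAR should be placed.

// Paste into spark-shell: print the JAR from which the HBase client classes
// were loaded. Requires hbase-client to be on the Spark driver classpath.
val hbaseClientJar = classOf[org.apache.hadoop.hbase.client.Connection]
  .getProtectionDomain
  .getCodeSource
  .getLocation
println("hbase-client loaded from: " + hbaseClientJar)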

Retrieve an endpoint

For more information, see Use the Java API to access an enhanced edition cluster. The default port is 30020. For access over the public network, use the public domain name.

Retrieve the username and password

The default values for both the username and password parameters are root. If you disable the Access Control List (ACL) feature on the cluster management page, the username and password are not required.

Specify connection parameters

Method 1: Modify the configuration file

Add the following configurations to hbase-site.xml:

<configuration>
    <!--
    The public endpoint or Virtual Private Cloud (VPC) internal endpoint used to connect to the cluster. You can obtain this endpoint on the Database Connection page in the console.
    -->
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020</value>
    </property>
    <!--
    The default values for both the username and password parameters are root. You can change the username and password based on your needs.
    -->
    <property>
        <name>hbase.client.username</name>
        <value>root</value>
    </property>
    <property>
        <name>hbase.client.password</name>
        <value>root</value>
    </property>
    <!--
    If you use the ApsaraDB for HBase client, you do not need to specify the connection.impl parameter. If your code depends on alihbase-connector, you must specify this parameter.
    -->
    <!--
    <property>
        <name>hbase.client.connection.impl</name>
        <value>org.apache.hadoop.hbase.client.AliHBaseUEClusterConnection</value>
    </property>
    -->
</configuration>
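If hbase-site.xml is placed on the classpath of your Spark application, HBaseConfiguration.create() loads these settings automatically. The following is a minimal sketch to verify that the file is picked up; the printed values should match the configuration above.

import org.apache.hadoop.hbase.HBaseConfiguration

// HBaseConfiguration.create() reads hbase-default.xml and hbase-site.xml
// from the classpath, so no explicit conf.set calls are needed here.
val conf = HBaseConfiguration.create()
// Verify that the values from hbase-site.xml were loaded.
println(conf.get("hbase.zookeeper.quorum"))
println(conf.get("hbase.client.username"))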

Method 2: Coding

Specify the connection parameters on a Configuration object in your code, as shown in the following example.

// Create a Configuration object.
Configuration conf = HBaseConfiguration.create();
// The public endpoint or VPC internal endpoint used to connect the cluster. You can obtain this endpoint on the Database Connection page in the console.
conf.set("hbase.zookeeper.quorum", "ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020");
// The default values for both the username and password parameters are root. You can change the username and the password based on your needs.
conf.set("hbase.client.username", "root")
conf.set("hbase.client.password", "root")

// If you use the ApsaraDB for HBase client, you do not need to specify the connection.impl parameter. If your code depends on alihbase-connector, you must specify this parameter.
//conf.set("hbase.client.connection.impl", AliHBaseUEClusterConnection.class.getName());
   

Spark sample code

test(" test the spark sql count result") {
  //1. Configure the connection parameters
  var conf = HBaseConfiguration.create
  conf.set("hbase.zookeeper.quorum", "ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020")
  conf.set("hbase.client.username", "test_user")
  conf.set("hbase.client.password", "password")

  //2. Create a table.
  val hbaseTableName = "testTable"
  val cf = "f"
  val column1 = cf + ":a"
  val column2 = cf + ":b"
  var rowsCount: Int = -1
  var namespace = "spark_test"
  val admin = ConnectionFactory.createConnection(conf).getAdmin()
  val tableName = TableName.valueOf(namespace, hbaseTableName)   
  val htd = new HTableDescriptor(tableName)
  htd.addFamily(new HColumnDescriptor(cf))
  admin.createTable(htd)

  //3. Insert data into the table.
  val rng = new Random()
  val k: Array[Byte] = new Array[Byte](3)
  // Parse the target column ("f:a") into its family and qualifier parts.
  val famAndQf = KeyValue.parseColumn(Bytes.toBytes(column1))
  val puts = new util.ArrayList[Put]()
  var i = 0
  for (b1 <- ('a' to 'z')) {
      for (b2 <- ('a' to 'z')) {
        for (b3 <- ('a' to 'z')) {
          if(i < 10) {
            k(0) = b1.toByte
            k(1) = b2.toByte
            k(2) = b3.toByte
            val put = new Put(k)
            put.addColumn(famAndQf(0), famAndQf(1), ("value_" + b1 + b2 + b3).getBytes())
            puts.add(put)
            i = i + 1
          }
        }
      }
  }
  val conn = ConnectionFactory.createConnection(conf)
  val table = conn.getTable(tableName)
  table.put(puts)


  //4. Create a Spark table.
  val sparkTableName = "spark_hbase"
  val createCmd = s"""CREATE TABLE ${sparkTableName} USING org.apache.hadoop.hbase.spark
                     |    OPTIONS ('catalog'=
                     |    '{"table":{"namespace":"${namespace}", "name":"${hbaseTableName}"},
                     |    "rowkey":"rowkey",
                     |    "columns":{
                     |    "col0":{"cf":"rowkey", "col":"rowkey", "type":"string"},
                     |    "col1":{"cf":"${cf}", "col":"a", "type":"string"},
                     |    "col2":{"cf":"${cf}", "col":"b", "type":"string"}}}'
                     |    )""".stripMargin

  println(" createCmd: \n" + createCmd + " rows : " + rowsCount)
  sparkSession.sql(createCmd)

  //5. Execute the SQL statement to query the number of records in the table.
  val result = sparkSession.sql("select count(*) from " + sparkTableName)
  val sparkCounts = result.collect().apply(0).getLong(0)
  println(" sparkCounts : " + sparkCounts)
}