
Use Spark to connect to an ApsaraDB for HBase Performance-enhanced Edition instance

Last Updated: May 19, 2022

This topic describes how to use Spark to connect to an ApsaraDB for HBase Performance-enhanced Edition instance.

Preparations

To use Spark to connect to an ApsaraDB for HBase Performance-enhanced Edition instance, add the alihbase-connector dependency by performing the following steps:

  1. Check the version and installation directory of the open source HBase client that is required in your Spark environment. You can run the yarn logs -applicationId xxx command and search the operation log of the Spark job for the loaded HBase client file hbase-client-xxx.jar to find its version and installation directory.

  2. Check the version of the alihbase-connector plug-in. For more information, see Modify Maven dependencies to upgrade your clients of earlier versions.

  3. Download the alihbase-connector plug-in that corresponds to the version of your ApsaraDB for HBase instance from the URL described in the Replace JAR files to upgrade your clients of earlier versions topic. Store the downloaded plug-in package in the same directory as the open source HBase client.
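As a quick illustration of step 1, the hbase-client JAR name (which carries the version number) can be picked out of the job log with a regular expression. The log line below is a made-up sample, not real yarn logs output:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FindHBaseClientJar {
    public static void main(String[] args) {
        // Hypothetical log line; real output comes from running
        // yarn logs -applicationId xxx for the Spark job.
        String logLine = "INFO Client: Added JAR file:/opt/spark/jars/hbase-client-2.1.0.jar";
        // Match the client JAR name, which embeds the version number.
        Matcher m = Pattern.compile("hbase-client-[\\d.]+\\.jar").matcher(logLine);
        System.out.println(m.find() ? m.group() : "not found");
    }
}
```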

Obtain the endpoint of your ApsaraDB for HBase instance

You can use the Java API of ApsaraDB for HBase to connect to an ApsaraDB for HBase Performance-enhanced Edition instance. The default port is 30020. For more information, see Use the HBase Java API to access ApsaraDB for HBase Performance-enhanced Edition instances. If you want to connect to an ApsaraDB for HBase Performance-enhanced Edition instance over the Internet, use the public endpoint.
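The endpoint takes the host:port form used throughout the examples below. This small sketch, using the same placeholder address as the configuration samples (not a real instance address), shows how the host and the default port 30020 fit together:

```java
public class EndpointParts {
    public static void main(String[] args) {
        // Placeholder endpoint in host:port form; not a real instance address.
        String endpoint = "ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020";
        int idx = endpoint.lastIndexOf(':');
        String host = endpoint.substring(0, idx);
        int port = Integer.parseInt(endpoint.substring(idx + 1));
        System.out.println(host);
        System.out.println(port); // 30020 is the default port
    }
}
```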

Obtain the username and password of your ApsaraDB for HBase instance

By default, the username and the password are root. If you disable the access control list (ACL) feature on the Lindorm Insight page, the username and the password are not required.

Configure parameters to establish a connection to your ApsaraDB for HBase Performance-enhanced Edition instance

Method 1: Use the hbase-site.xml file to configure the parameters

Add the following configuration to the hbase-site.xml file:

<configuration>
      <!--
    Specify the endpoint of your ApsaraDB for HBase instance. In the ApsaraDB for HBase console, navigate to the instance details page and click Database Connection. In the Connection Information section, you can obtain the public endpoint and the private endpoint. If you want to connect to the instance over the Internet, use the public endpoint. If you want to connect to the instance over a VPC, use the private endpoint.
    -->
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020</value>
    </property>
    <!--
    Specify the username and the password. By default, the username and the password are root. You can change the username and the password based on your business requirements.
    -->
    <property>
        <name>hbase.client.username</name>
        <value>root</value>
    </property>
    <property>
        <name>hbase.client.password</name>
        <value>root</value>
    </property>
    <property>
        <name>hbase.client.connection.impl</name>
       <value>org.apache.hadoop.hbase.client.AliHBaseUEClusterConnection</value>
    </property>
</configuration>

Method 2: Use program code to configure the parameters

Configure the parameters in a Configuration object by using the following code:

// Create a Configuration object.
Configuration conf = HBaseConfiguration.create();
// Specify the endpoint of your ApsaraDB for HBase instance. In the ApsaraDB for HBase console, navigate to the instance details page and click Database Connection. In the Connection Information section, you can obtain the public endpoint and the private endpoint. If you want to connect to the instance over the Internet, use the public endpoint. If you want to connect to the instance over a VPC, use the private endpoint.
conf.set("hbase.zookeeper.quorum", "ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020");
// Specify the username and the password. By default, the username and the password are root. You can change the username and the password based on your business requirements.
conf.set("hbase.client.username", "root");
conf.set("hbase.client.password", "root");
conf.set("hbase.client.connection.impl", AliHBaseUEClusterConnection.class.getName());

Use Spark to connect to the instance

test(" test the spark sql count result") {
  //1. Configure parameters for the connection.
  var conf = HBaseConfiguration.create
  conf.set("hbase.zookeeper.quorum", "ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020")
  conf.set("hbase.client.username", "test_user")
  conf.set("hbase.client.password", "password")
  //2. Create an ApsaraDB for HBase table.
  val hbaseTableName = "testTable"
  val cf = "f"
  val column1 = cf + ":a"
  val column2 = cf + ":b"
  var rowsCount: Int = -1
  var namespace = "spark_test"
  val admin = ConnectionFactory.createConnection(conf).getAdmin()
  val tableName = TableName.valueOf(namespace, hbaseTableName)
  // Create the namespace first. The table cannot be created in a
  // namespace that does not exist yet.
  if (!admin.listNamespaceDescriptors().exists(_.getName == namespace)) {
    admin.createNamespace(NamespaceDescriptor.create(namespace).build())
  }
  val htd = new HTableDescriptor(tableName)
  htd.addFamily(new HColumnDescriptor(cf))
  admin.createTable(htd)
  //3. Insert test data into the created table.
  val rng = new Random()
  val k: Array[Byte] = new Array[Byte](3)
  val famAndQf = KeyValue.parseColumn(Bytes.toBytes(column1))
  val puts = new util.ArrayList[Put]()
  var i = 0
  for (b1 <- ('a' to 'z')) {
      for (b2 <- ('a' to 'z')) {
        for (b3 <- ('a' to 'z')) {
          if(i < 10) {
            k(0) = b1.toByte
            k(1) = b2.toByte
            k(2) = b3.toByte
            val put = new Put(k)
            put.addColumn(famAndQf(0), famAndQf(1), ("value_" + b1 + b2 + b3).getBytes())
            puts.add(put)
            i = i + 1
          }
        }
      }
  }
  val conn = ConnectionFactory.createConnection(conf)
  val table = conn.getTable(tableName)
  table.put(puts)
  //4. Create a Spark table.
  val sparkTableName = "spark_hbase"
  val createCmd = s"""CREATE TABLE ${sparkTableName} USING org.apache.hadoop.hbase.spark
                         |    OPTIONS ('catalog'=
                         |    '{"table":{"namespace":"${namespace}", "name":"${hbaseTableName}"},"rowkey":"rowkey",
                         |    "columns":{
                         |    "col0":{"cf":"rowkey", "col":"rowkey", "type":"string"},
                         |    "col1":{"cf":"${cf}", "col":"a", "type":"string"},
                         |    "col2":{"cf":"${cf}", "col":"b", "type":"string"}}}'
                         |    )""".stripMargin
  println(" createCmd: \n" + createCmd + " rows : " + rowsCount)
  sparkSession.sql(createCmd)
  //5. Execute the statement that contains the COUNT() function.
  val result = sparkSession.sql("select count(*) from " + sparkTableName)
  val sparkCounts = result.collect().apply(0).getLong(0)
  println(" sparkCounts : " + sparkCounts)
}
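Because the nested loop in step 3 stops inserting after 10 puts (the i < 10 guard), the COUNT(*) query is expected to return 10. A standalone sketch of the rowkey generation confirms the expected count:

```java
import java.util.ArrayList;
import java.util.List;

public class ExpectedRowCount {
    public static void main(String[] args) {
        // Reproduce the rowkey generation from the test above: three
        // nested 'a'..'z' loops, capped at 10 rows by the i < 10 guard.
        List<String> keys = new ArrayList<>();
        for (char b1 = 'a'; b1 <= 'z'; b1++)
            for (char b2 = 'a'; b2 <= 'z'; b2++)
                for (char b3 = 'a'; b3 <= 'z'; b3++)
                    if (keys.size() < 10)
                        keys.add("" + b1 + b2 + b3);
        System.out.println(keys.size()); // expected sparkCounts: 10
        System.out.println(keys.get(0)); // first rowkey: aaa
    }
}
```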