ApsaraDB for HBase:Use Spark to access an ApsaraDB for HBase Performance-enhanced Edition cluster

Last Updated: Apr 25, 2023

This topic describes how to use Spark to access an ApsaraDB for HBase Performance-enhanced Edition cluster.

Prerequisites

  • The ApsaraDB for HBase Performance-enhanced Edition cluster runs version 2.4.3 or later. For more information about how to view or update the cluster version, see Minor version updates.

  • The IP address of your client is added to the allowlist of the ApsaraDB for HBase Performance-enhanced Edition cluster. For more information, see Configure IP address allowlists and security groups.

  • The Java API endpoint of the ApsaraDB for HBase Performance-enhanced Edition cluster is obtained from the ApsaraDB for HBase console.

Usage notes

  • To access the ApsaraDB for HBase Performance-enhanced Edition cluster over the Internet, replace the open source HBase client with the ApsaraDB for HBase client before you access data. For more information, see Upgrade ApsaraDB for HBase SDK for Java. A dependency sketch follows these notes.

  • If applications are deployed on an Elastic Compute Service (ECS) instance, and you want to access the ApsaraDB for HBase Performance-enhanced Edition cluster over a virtual private cloud (VPC), make sure that the ApsaraDB for HBase Performance-enhanced Edition cluster and the ECS instance meet the following requirements to ensure network connectivity:

    • The ApsaraDB for HBase cluster and the ECS instance are deployed in the same region. We recommend that you deploy the cluster and the instance in the same zone to reduce network latency.

    • The ApsaraDB for HBase Performance-enhanced Edition cluster and the ECS instance belong to the same VPC.
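The following sbt snippet is only a sketch of the client replacement mentioned in the first note. The group ID, artifact ID, and version placeholder are assumptions for illustration; use the exact coordinates and version listed in Upgrade ApsaraDB for HBase SDK for Java.

    // build.sbt sketch (assumed coordinates; confirm them in the upgrade guide).
    // Remove the open source "org.apache.hbase" % "hbase-client" dependency, then add the ApsaraDB for HBase client:
    libraryDependencies += "com.aliyun.hbase" % "alihbase-client" % "<client-version>"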

Set parameters to establish a connection to your ApsaraDB for HBase Performance-enhanced Edition cluster

  • Method 1: Add access configurations to a configuration file.

    Add the following configuration items to the configuration file named hbase-site.xml:

    <configuration>
        <!--
        The Java API endpoint of the cluster. You can obtain the endpoint on the
        Database Connection page in the ApsaraDB for HBase console.
        -->
        <property>
            <name>hbase.zookeeper.quorum</name>
            <value>ld-bp150tns0sjxs****-proxy-hbaseue.hbaseue.rds.aliyuncs.com:30020</value>
        </property>
    </configuration>
  • Method 2: Add parameters to a Configuration object. A connectivity check that uses this configuration is sketched below.

    // Create a Configuration object.
    Configuration conf = HBaseConfiguration.create();
    // The Java API endpoint of the cluster. You can obtain the endpoint on the Database Connection page in the ApsaraDB for HBase console.
    conf.set("hbase.zookeeper.quorum", "ld-bp150tns0sjxs****-proxy-hbaseue.hbaseue.rds.aliyuncs.com:30020");

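Whichever method you use, the resulting Configuration points the client at the cluster. The following standalone sketch is not part of the original guide; it assumes the ApsaraDB for HBase client (or a compatible hbase-client) is on the classpath and reuses the placeholder endpoint shown above to verify connectivity before a Spark job runs.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.ConnectionFactory

object ConnectionCheck {
  def main(args: Array[String]): Unit = {
    // If hbase-site.xml is on the classpath (Method 1), create() loads it automatically.
    val conf = HBaseConfiguration.create()
    // Otherwise, set the Java API endpoint explicitly (Method 2). The value below is a placeholder.
    conf.set("hbase.zookeeper.quorum", "ld-bp150tns0sjxs****-proxy-hbaseue.hbaseue.rds.aliyuncs.com:30020")
    val conn = ConnectionFactory.createConnection(conf)
    try {
      // Listing table names is a lightweight way to confirm that the endpoint is reachable.
      conn.getAdmin.listTableNames().foreach(t => println(t.getNameAsString))
    } finally {
      conn.close()
    }
  }
}
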
Example

test(" test the spark sql count result") {
  // 1. Add access configurations for the HBaseue Shell.
  var conf = HBaseConfiguration.create
  conf.set("hbase.zookeeper.quorum", "ld-bp150tns0sjxs****-proxy-hbaseue.hbaseue.rds.aliyuncs.com:30020")
  // 2. Create an ApsaraDB for HBase table.
  val hbaseTableName = "testTable"
  val cf = "f"
  val column1 = cf + ":a"
  val column2 = cf + ":b"
  var rowsCount: Int = -1
  var namespace = "spark_test"
  val admin = ConnectionFactory.createConnection(conf).getAdmin()
  val tableName = TableName.valueOf(namespace, hbaseTableName)   
  val htd = new HTableDescriptor(tableName)
  htd.addFamily(new HColumnDescriptor(cf))
  admin.createTable(htd)
  // 3. Insert test data into the created table.
  val rng = new Random()
  val k: Array[Byte] = new Array[Byte](3)
  // Parse the family and qualifier from column1 ("f:a"); only this column receives test data.
  val famAndQf = KeyValue.parseColumn(Bytes.toBytes(column1))
  val puts = new util.ArrayList[Put]()
  var i = 0
  for (b1 <- ('a' to 'z')) {
    for (b2 <- ('a' to 'z')) {
      for (b3 <- ('a' to 'z')) {
        if (i < 10) {
          k(0) = b1.toByte
          k(1) = b2.toByte
          k(2) = b3.toByte
          val put = new Put(k)
          put.addColumn(famAndQf(0), famAndQf(1), ("value_" + b1 + b2 + b3).getBytes())
          puts.add(put)
          i = i + 1
        }
      }
    }
  }
  val conn = ConnectionFactory.createConnection(conf)
  val table = conn.getTable(tableName)
  table.put(puts)
  // 4. Create a Spark table.
  val sparkTableName = "spark_hbase"
  val createCmd = s"""CREATE TABLE ${sparkTableName} USING org.apache.hadoop.hbase.spark
                         |    OPTIONS ('catalog'=
                         |    '{"table":{"namespace":"$${hbaseTableName}",                   "name":"${hbaseTableName}"},"rowkey":"rowkey",
                         |    "columns":{
                         |    "col0":{"cf":"rowkey", "col":"rowkey", "type":"string"},
                         |    "col1":{"cf":"cf1", "col":"a", "type":"string"},
                         |    "col2":{"cf":"cf1", "col":"b", "type":"String"}}}'
                         |    )""".stripMargin
  println(" createCmd: \n" + createCmd + " rows : " + rowsCount)
  sparkSession.sql(createCmd)
  // 5. Execute the statement that contains the COUNT() function.
  val result = sparkSession.sql("select count(*) from " + sparkTableName)
  val sparkCounts = result.collect().apply(0).getLong(0)
  println(" sparkCounts : " + sparkCounts)
  // Release the HBase connection that was opened for the puts.
  conn.close()
}