All Products
Search
Document Center

Lindorm:Use Apache Spark to access LindormTable

Last Updated:May 16, 2023

This topic describes how to use open source Apache Spark to access LindormTable.

Prerequisites

  • The version of LindormTable is 2.4.3 or later.

  • The IP address of your client is added to the whitelist of the Lindorm instance. For more information, see Configure whitelists.

  • The LindormTable endpoint of the HBase API for Java is obtained. For more information about how to view the endpoint, see View endpoints.

Usage notes

  • To access a Lindorm instance over the Internet or the Lindorm instance that you want to access is a single-node Lindorm instance, you must upgrade your SDK and change the configurations before you perform the operations described in this topic. For more information, see Step 1 in Use the ApsaraDB for HBase API for Java to connect to and use LindormTable.

  • If your application is deployed on an Elastic Compute Service (ECS) instance, make sure that your Lindorm instance and the ECS instance meet the following requirements in advance to ensure network connectivity:

    • Your Lindorm instance and ECS instance are deployed in the same region. We recommend that you deploy the two instances in the same zone to reduce network latency.

    • Your Lindorm instance and ECS instance are deployed in the same VPC.

Add configurations for accessing LindormTable

  • Method 1: Add configurations to a configuration file

    Add the following configurations to the configuration file named hbase-site.xml:

    <configuration>
          <!--
        LindormTable endpoint of the HBase API for Java
        -->
        <property>
            <name>hbase.zookeeper.quorum</name>
            <value>ld-m5ef25p66n5es****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020</value>
        </property>
    </configuration>
  • Method 2: Add parameters to a Configuration object

    // Create a Configuration object.
    Configuration conf = HBaseConfiguration.create();
    // Specify the LindormTable endpoint of the HBase API for Java.
    conf.set("hbase.zookeeper.quorum", "ld-m5ef25p66n5es****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020");

Example

test(" test the spark sql count result") {
  // 1. Add access configurations for the HBaseue Shell.
  var conf = HBaseConfiguration.create
  conf.set("hbase.zookeeper.quorum", "ld-m5ef25p66n5es****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020")
  // 2. Create an ApsaraDB for HBase table.
  val hbaseTableName = "testTable"
  val cf = "f"
  val column1 = cf + ":a"
  val column2 = cf + ":b"
  var rowsCount: Int = -1
  var namespace = "spark_test"
  val admin = ConnectionFactory.createConnection(conf).getAdmin()
  val tableName = TableName.valueOf(namespace, hbaseTableName)   
  val htd = new HTableDescriptor(tableName)
  htd.addFamily(new HColumnDescriptor(cf))
  admin.createTable(htd)
  // 3. Insert test data into the created table.
  val rng = new Random()
  val k: Array[Byte] = new Array[Byte](3)
  val famAndQf = KeyValue.parseColumn(Bytes.toBytes(column))
  val puts = new util.ArrayList[Put]()
  var i = 0
  for (b1 <- ('a' to 'z')) {
      for (b2 <- ('a' to 'z')) {
        for (b3 <- ('a' to 'z')) {
          if(i < 10) {
            k(0) = b1.toByte
            k(1) = b2.toByte
            k(2) = b3.toByte
            val put = new Put(k)
            put.addColumn(famAndQf(0), famAndQf(1), ("value_" + b1 + b2 + b3).getBytes())
            puts.add(put)
            i = i + 1
          }
        }
      }
  }
  val conn = ConnectionFactory.createConnection(conf)
  val table = conn.getTable(tableName)
  table.put(puts)
  // 4. Create a Spark table.
  val sparkTableName = "spark_hbase"
  val createCmd = s"""CREATE TABLE ${sparkTableName} USING org.apache.hadoop.hbase.spark
                         |    OPTIONS ('catalog'=
                         |    '{"table":{"namespace":"$${hbaseTableName}",                   "name":"${hbaseTableName}"},"rowkey":"rowkey",
                         |    "columns":{
                         |    "col0":{"cf":"rowkey", "col":"rowkey", "type":"string"},
                         |    "col1":{"cf":"cf1", "col":"a", "type":"string"},
                         |    "col2":{"cf":"cf1", "col":"b", "type":"String"}}}'
                         |    )""".stripMargin
  println(" createCmd: \n" + createCmd + " rows : " + rowsCount)
  sparkSession.sql(createCmd)
  // 5. Execute the statement that contains the COUNT() function.
  val result = sparkSession.sql("select count(*) from " + sparkTableName)
  val sparkCounts = result.collect().apply(0).getLong(0)
  println(" sparkCounts : " + sparkCounts)