Use Spark to connect to a Lindorm cluster

Last Updated: Jul 22, 2021

Preparations

If you use the HBase API, you can use Spark to connect to an ApsaraDB for Lindorm (Lindorm) cluster. In this case, you only need the HBase client provided by Alibaba Cloud, which you can add to your project as sketched below. For more information, see Install and upgrade HBase SDK for Java.
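As a minimal illustration, the client can be declared as a Maven dependency. The coordinates below are an assumption based on the alihbase-client artifact that Install and upgrade HBase SDK for Java describes, and x.y.z is a placeholder; check that topic for the exact coordinates and the current release:

<!-- Assumed coordinates for the Alibaba Cloud HBase client; x.y.z is a placeholder for the current release. -->
<dependency>
    <groupId>com.aliyun.hbase</groupId>
    <artifactId>alihbase-client</artifactId>
    <version>x.y.z</version>
</dependency>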

Configure the connection to a Lindorm cluster

  • Method 1: Add configuration items to a configuration file

    Add the following configuration items to the configuration file hbase-site.xml:

    <configuration>
        <!--
        Enter the endpoint of your Lindorm cluster. You can obtain the value on the Database Connection page in the Lindorm console. Use a public endpoint if you connect to the Lindorm cluster over the Internet. Use an internal endpoint if you connect to the Lindorm cluster over a virtual private cloud (VPC).
        -->
        <property>
            <name>hbase.zookeeper.quorum</name>
            <value>ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020</value>
        </property>
        <!--
        Specify the username and the password. By default, the username and the password are root. You can change them as needed.
        -->
        <property>
            <name>hbase.client.username</name>
            <value>root</value>
        </property>
        <property>
            <name>hbase.client.password</name>
            <value>root</value>
        </property>
        <!--
        If you use the HBase client provided by Alibaba Cloud, you do not need to configure the connection.impl parameter. If you use the open source HBase client, you can connect to a Lindorm cluster only after alihbase-connector is installed. In this case, you must configure the connection.impl parameter.
        -->
        <!--property>
            <name>hbase.client.connection.impl</name>
            <value>org.apache.hadoop.hbase.client.AliHBaseUEClusterConnection</value>
        </property-->
    </configuration>

  • Method 2: Write code to create a configuration class

    // Create a configuration class.
    Configuration conf = HBaseConfiguration.create();
    // Enter the endpoint of your Lindorm cluster. You can obtain the value on the Database Connection page in the Lindorm console. Use a public endpoint if you connect to the Lindorm cluster over the Internet. Use an internal endpoint if you connect to the Lindorm cluster over a virtual private cloud (VPC).
    conf.set("hbase.zookeeper.quorum", "ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020");
    // Specify the username and the password. By default, the username and the password are root. You can change them as needed.
    conf.set("hbase.client.username", "root")
    conf.set("hbase.client.password", "root")
    // If you use the HBase client provided by Alibaba Cloud, you do not need to configure the connection.impl parameter. If you use the open source HBase client, you can connect to a Lindorm cluster only after alihbase-connector is installed. In this case, you must configure the connection.impl parameter.
    //conf.set("hbase.client.connection.impl", AliHBaseUEClusterConnection.class.getName());

Use Spark to connect to the Lindorm cluster

The following sample code configures the connection parameters for the Lindorm cluster, writes test data, and verifies the row count by using Spark SQL:
test(" test the spark sql count result") {
//1. Configure connection parameters for the Lindorm cluster.
var conf = HBaseConfiguration.create
conf.set("hbase.zookeeper.quorum", "ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020")
conf.set("hbase.client.username", "test_user")  
conf.set("hbase.client.password", "password")
//2. Create a table.
val hbaseTableName = "testTable"
val cf = "f"  
           val column1 = cf + ":a"  
val column2 = cf + ":b"
var rowsCount: Int = -1
var namespace = "spark_test"  
val admin = ConnectionFactory.createConnection(conf).getAdmin() 
val tableName = TableName.valueOf(namespace, hbaseTableName) 
val htd = new HTableDescriptor(tableName)  
htd.addFamily(new HColumnDescriptor(cf)) 
admin.createTable(htd)  
//3. Insert test data into the table.  
val rng = new Random()  
23. val k: Array[Byte] = new Array[Byte](3)  
24. val famAndQf = KeyValue.parseColumn(Bytes.toBytes(column)) 
25. val puts = new util.ArrayList[Put]() 
26. var i = 0  
27. for (b1 <- ('a' to 'z')) { 
28.     for (b2 <- ('a' to 'z')) {  
29.       for (b3 <- ('a' to 'z')) {       
30.          if(i < 10) {           
31.             k(0) = b1.toByte          
32.             k(1) = b2.toByte          
33.             k(2) = b3.toByte       
34.             val put = new Put(k)        
35.             put.addColumn(famAndQf(0), famAndQf(1), ("value_" + b1 + b2 + b3).getBytes())        
36.             puts.add(put)         
37.             i = i + 1       
38.           }       
39.        }     
40.       }  
41. }  
42. val conn = ConnectionFactory.createConnection(conf) 
43. val table = conn.getTable(tableName) 
44. table.put(puts) 
45.
46.
47. //4. Create a table in Spark. 
48. val sparkTableName = "spark_hbase" 
49. val createCmd = s"""CREATE TABLE ${sparkTableName} USING org.apache.hadoop.hbase.spark   
50.                        |    OPTIONS ('catalog'=               
51.                        |    '{"table":{"namespace":"$${hbaseTableName}",                   "name":"${hbaseTableName}"},"rowkey":"rowkey",            
52.                        |    "columns":{               
53.                        |    "col0":{"cf":"rowkey", "col":"rowkey", "type":"string"},               
54.                        |    "col1":{"cf":"cf1", "col":"a", "type":"string"},                  
55.                        |    "col2":{"cf":"cf1", "col":"b", "type":"String"}}}'            
56.                        |    )""".stripMargin  
57. 
58. println(" createCmd: \n" + createCmd + " rows : " + rowsCount) 
59. sparkSession.sql(createCmd) 
60.
61. //5. Use the SQL COUNT() function. 
62. val result = sparkSession.sql("select count(*) from " + sparkTableName) 
63. val sparkCounts = result.collect().apply(0).getLong(0) 
64. println(" sparkCounts : " + sparkCounts)