This topic describes how to use Spark to connect to an ApsaraDB for HBase Performance-enhanced Edition instance.
Preparations
To use Spark to connect to an ApsaraDB for HBase Performance-enhanced Edition instance, add the alihbase-connector dependency by performing the following steps:
1. Check the version and installation directory of the open source HBase client that is required in your Spark environment. Run the following command to find the loaded HBase client file hbase-client-xxx.jar in the log of the Spark job that connects to the instance:
yarn logs -applicationId xxx
Then, check the version of the alihbase-connector plug-in. For more information, see Modify Maven dependencies to upgrade your clients of earlier versions.
2. Download the alihbase-connector plug-in that matches the version of your ApsaraDB for HBase instance from the URL described in the Replace JAR files to upgrade your clients of earlier versions topic. Store the downloaded package in the same directory as the open source HBase client.
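The placement step above can be sketched as follows. This is a minimal demonstration, not values from this topic: the directories are created as temporary stand-ins, and the version numbers are placeholders for the versions you identified from the job log.

```shell
# Stand-in for the directory that holds the open source HBase client,
# e.g. $SPARK_HOME/jars (placeholder, created here only for the demo).
SPARK_JARS=$(mktemp -d)
# The client jar you located via "yarn logs" (placeholder version).
touch "$SPARK_JARS/hbase-client-2.1.0.jar"
# Stand-in for the directory where you downloaded the plug-in package.
DOWNLOAD_DIR=$(mktemp -d)
touch "$DOWNLOAD_DIR/alihbase-connector-2.1.0.jar"
# Store the plug-in in the same directory as the open source HBase client.
cp "$DOWNLOAD_DIR/alihbase-connector-2.1.0.jar" "$SPARK_JARS/"
ls "$SPARK_JARS"
```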
Obtain the endpoint of your ApsaraDB for HBase instance
You can use the Java API of ApsaraDB for HBase to connect to an ApsaraDB for HBase Performance-enhanced Edition instance. The default port is 30020. For more information, see Use the HBase Java API to access ApsaraDB for HBase Performance-enhanced Edition instances. If you want to connect to an ApsaraDB for HBase Performance-enhanced Edition instance over the Internet, use the public endpoint.
Obtain the username and password of your ApsaraDB for HBase instance
By default, the username and the password are root. If you disable the access control list (ACL) feature on the Lindorm Insight page, the username and the password are not required.
Configure parameters to establish a connection to your ApsaraDB for HBase Performance-enhanced Edition instance
Method 1: Use the hbase-site.xml file to configure the parameters
Add the following configuration to the hbase-site.xml file:
<configuration>
    <!--
    Specify the endpoint of your ApsaraDB for HBase instance. In the ApsaraDB for HBase console, navigate to the instance details page and click Database Connection. In the Connection Information section, you can obtain the public endpoint and the private endpoint. If you want to connect to the instance over the Internet, use the public endpoint. If you want to connect to the instance over a VPC, use the private endpoint.
    -->
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020</value>
    </property>
    <!--
    Specify the username and the password. By default, the username and the password are root. You can change the username and the password based on your business requirements.
    -->
    <property>
        <name>hbase.client.username</name>
        <value>root</value>
    </property>
    <property>
        <name>hbase.client.password</name>
        <value>root</value>
    </property>
    <property>
        <name>hbase.client.connection.impl</name>
        <value>org.apache.hadoop.hbase.client.AliHBaseUEClusterConnection</value>
    </property>
</configuration>
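For the hbase-site.xml file to take effect, it typically must be on the classpath of the Spark driver and executors, because HBaseConfiguration.create() loads it from the classpath. A minimal sketch of placing the file, assuming a conf directory such as $SPARK_HOME/conf (the directory below is a temporary stand-in, and the endpoint is the placeholder from this topic):

```shell
# Stand-in for your real Spark conf directory, e.g. $SPARK_HOME/conf.
SPARK_CONF=$(mktemp -d)
# Write a minimal hbase-site.xml with the instance endpoint (placeholder value).
cat > "$SPARK_CONF/hbase-site.xml" <<'EOF'
<configuration>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020</value>
  </property>
</configuration>
EOF
# Confirm the file is in place.
grep 'hbase.zookeeper.quorum' "$SPARK_CONF/hbase-site.xml"
```

If the conf directory is not automatically on the classpath in your deployment, you can also distribute the file with the job, for example with the --files option of spark-submit.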
Method 2: Use program code to configure the parameters
Configure the parameters in a Configuration object by using the following code:
// Create a Configuration object.
Configuration conf = HBaseConfiguration.create();
// Specify the endpoint of your ApsaraDB for HBase instance. In the ApsaraDB for HBase console, navigate to the instance details page and click Database Connection. In the Connection Information section, you can obtain the public endpoint and the private endpoint. If you want to connect to the instance over the Internet, use the public endpoint. If you want to connect to the instance over a VPC, use the private endpoint.
conf.set("hbase.zookeeper.quorum", "ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020");
// Specify the username and the password. By default, the username and the password are root. You can change the username and the password based on your business requirements.
conf.set("hbase.client.username", "root");
conf.set("hbase.client.password", "root");
conf.set("hbase.client.connection.impl", AliHBaseUEClusterConnection.class.getName());
Use Spark to connect to the instance
test(" test the spark sql count result") {
//1. Configure parameters for the connection.
var conf = HBaseConfiguration.create
conf.set("hbase.zookeeper.quorum", "ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020")
conf.set("hbase.client.username", "test_user")
conf.set("hbase.client.password", "password")
//2. Create an ApsaraDB for HBase table.
val hbaseTableName = "testTable"
val cf = "f"
val column1 = cf + ":a"
val column2 = cf + ":b"
var rowsCount: Int = -1
var namespace = "spark_test"
val admin = ConnectionFactory.createConnection(conf).getAdmin()
// Create the namespace if it does not already exist. Otherwise, createTable fails.
if (!admin.listNamespaceDescriptors().exists(_.getName == namespace)) {
  admin.createNamespace(NamespaceDescriptor.create(namespace).build())
}
val tableName = TableName.valueOf(namespace, hbaseTableName)
val htd = new HTableDescriptor(tableName)
htd.addFamily(new HColumnDescriptor(cf))
admin.createTable(htd)
//3. Insert test data into the created table.
val rng = new Random()
val k: Array[Byte] = new Array[Byte](3)
val famAndQf = KeyValue.parseColumn(Bytes.toBytes(column1))
val puts = new util.ArrayList[Put]()
var i = 0
for (b1 <- ('a' to 'z')) {
for (b2 <- ('a' to 'z')) {
for (b3 <- ('a' to 'z')) {
if(i < 10) {
k(0) = b1.toByte
k(1) = b2.toByte
k(2) = b3.toByte
val put = new Put(k)
put.addColumn(famAndQf(0), famAndQf(1), ("value_" + b1 + b2 + b3).getBytes())
puts.add(put)
i = i + 1
}
}
}
}
val conn = ConnectionFactory.createConnection(conf)
val table = conn.getTable(tableName)
table.put(puts)
//4. Create a Spark table.
val sparkTableName = "spark_hbase"
val createCmd = s"""CREATE TABLE ${sparkTableName} USING org.apache.hadoop.hbase.spark
| OPTIONS ('catalog'=
| '{"table":{"namespace":"${namespace}", "name":"${hbaseTableName}"},"rowkey":"rowkey",
| "columns":{
| "col0":{"cf":"rowkey", "col":"rowkey", "type":"string"},
| "col1":{"cf":"${cf}", "col":"a", "type":"string"},
| "col2":{"cf":"${cf}", "col":"b", "type":"string"}}}'
| )""".stripMargin
println(" createCmd: \n" + createCmd)
sparkSession.sql(createCmd)
//5. Execute the statement that contains the COUNT() function.
val result = sparkSession.sql("select count(*) from " + sparkTableName)
val sparkCounts = result.collect().apply(0).getLong(0)
println(" sparkCounts : " + sparkCounts)
}
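When the example above is packaged and submitted, both the open source HBase client and the alihbase-connector plug-in must be visible to the Spark driver and executors. The following sketch only assembles and prints a submit command; all paths, version numbers, and the class name are placeholders, not values from this topic.

```shell
# Jars required at run time: the open source HBase client and the
# alihbase-connector plug-in (placeholder paths and versions).
DEPS="/path/to/hbase-client-2.1.0.jar,/path/to/alihbase-connector-2.1.0.jar"
# Hypothetical main class and application jar for the Spark job above.
SUBMIT_CMD="spark-submit --class com.example.SparkHBaseCount --jars $DEPS spark-hbase-test.jar"
echo "$SUBMIT_CMD"
```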