Apache Spark can query and analyze data stored in LindormTable through the HBase connector. This guide walks you through configuring the connection, mapping your HBase table schema to a Spark table, and running Spark SQL queries.
Prerequisites
Before you begin, ensure that you have:
- LindormTable version 2.4.3 or later.
- Your client IP address added to the whitelist of the Lindorm instance. See Configure whitelists.
- The LindormTable endpoint of the HBase API for Java. See View endpoints.
Usage notes
- To access a Lindorm instance over the Internet, or if your Lindorm instance is a single-node instance, upgrade your SDK and update your configurations before you proceed. See Step 1 in Use the ApsaraDB for HBase API for Java to connect to and use LindormTable.
- If your application runs on an Elastic Compute Service (ECS) instance, make sure that:
  - Your Lindorm instance and ECS instance are in the same region. Deploy them in the same zone to reduce network latency.
  - Your Lindorm instance and ECS instance are in the same virtual private cloud (VPC).
Overall process
To connect Spark to LindormTable, complete the following steps:
- Configure the LindormTable connection endpoint.
- Create an HBase table and insert data.
- Define a catalog that maps HBase columns to Spark table columns.
- Create the Spark table by using the catalog.
- Run Spark SQL queries against the Spark table.
Configure the connection
Set hbase.zookeeper.quorum to the LindormTable endpoint of the HBase API for Java. Use either of the following methods:
Method 1: Configuration file
Add the following to hbase-site.xml:
<configuration>
  <!-- LindormTable endpoint of the HBase API for Java -->
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>ld-m5ef25p66n5es****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020</value>
  </property>
</configuration>
Method 2: Configuration object
import org.apache.hadoop.hbase.HBaseConfiguration

// Create a Configuration object.
val conf = HBaseConfiguration.create()
// Specify the LindormTable endpoint of the HBase API for Java.
conf.set("hbase.zookeeper.quorum", "ld-m5ef25p66n5es****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020")
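Both methods set the same property, and its value has the form host:port. The following sketch is a hypothetical helper, not part of the HBase or Lindorm APIs, that splits such a value and rejects a missing host or an invalid port before the value reaches conf.set. It assumes only that the endpoint is a single host:port pair, as in the examples above.

```scala
import scala.util.Try

// Hypothetical helper (not part of the HBase or Lindorm APIs): checks that an
// endpoint string has the form host:port before it is passed to conf.set.
object EndpointCheck {
  /** Returns Some((host, port)) for a well-formed "host:port" string, else None. */
  def parseEndpoint(endpoint: String): Option[(String, Int)] = {
    val idx = endpoint.lastIndexOf(':')
    // Reject a missing colon, an empty host, or an empty port.
    if (idx <= 0 || idx == endpoint.length - 1) None
    else {
      val host = endpoint.substring(0, idx)
      Try(endpoint.substring(idx + 1).toInt).toOption
        .filter(port => port > 0 && port <= 65535)
        .map(port => (host, port))
    }
  }

  def main(args: Array[String]): Unit = {
    val endpoint = "ld-m5ef25p66n5es****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020"
    parseEndpoint(endpoint) match {
      case Some((host, port)) => println(s"host=$host port=$port")
      case None               => println(s"Malformed endpoint: $endpoint")
    }
  }
}
```

The check is deliberately shallow: it does not resolve the host name or test connectivity, so a well-formed but wrong endpoint still fails only when the client connects.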
Example
The following example shows the end-to-end workflow: configure the connection, create an HBase table, insert data, define a Spark table with a catalog, and run a COUNT(*) query.
Step 1: Configure the connection
import org.apache.hadoop.hbase.HBaseConfiguration

val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "ld-m5ef25p66n5es****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020")
Step 2: Create an HBase table
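import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, NamespaceDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
val hbaseTableName = "testTable"
val cf = "f"
val column1 = cf + ":a"
val column2 = cf + ":b"
val namespace = "spark_test"
val adminConn = ConnectionFactory.createConnection(conf)
val admin = adminConn.getAdmin()
// Create the namespace first if it does not exist; createTable fails for a missing namespace.
if (!admin.listNamespaceDescriptors().exists(_.getName == namespace)) {
  admin.createNamespace(NamespaceDescriptor.create(namespace).build())
}
// Create the table spark_test:testTable with a single column family "f".
val tableName = TableName.valueOf(namespace, hbaseTableName)
val htd = new HTableDescriptor(tableName)
htd.addFamily(new HColumnDescriptor(cf))
admin.createTable(htd)
admin.close()
adminConn.close()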
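The columns in this example are written as family:qualifier strings (f:a and f:b). The sketch below is a hypothetical pure-Scala helper that illustrates how such a name splits at its first colon into a column family and a qualifier. In the HBase client, KeyValue.parseColumn performs this split on byte arrays in Step 3; the helper is an illustration of the convention, not that method's implementation.

```scala
// Hypothetical illustration (not HBase's implementation): split a
// "family:qualifier" column name at its first colon.
object ColumnName {
  /** Returns (family, qualifier); a name without ':' yields an empty qualifier. */
  def split(column: String): (String, String) = {
    val idx = column.indexOf(':')
    if (idx < 0) (column, "")
    else (column.substring(0, idx), column.substring(idx + 1))
  }

  def main(args: Array[String]): Unit = {
    val cf = "f"
    println(split(cf + ":a")) // prints (f,a)
    println(split(cf + ":b")) // prints (f,b)
  }
}
```

Only the family ("f") is declared when the table is created; qualifiers such as a and b are created implicitly when data is written.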
Step 3: Insert data
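import java.util
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
// Split "f:a" into the column family "f" and the qualifier "a".
val famAndQf = KeyValue.parseColumn(Bytes.toBytes(column1))
val puts = new util.ArrayList[Put]()
// Take the first 10 three-letter row keys in lexicographic order: aaa, aab, ..., aaj.
val rowKeys = (for (b1 <- 'a' to 'z'; b2 <- 'a' to 'z'; b3 <- 'a' to 'z') yield s"$b1$b2$b3").take(10)
for (rowKey <- rowKeys) {
  val put = new Put(Bytes.toBytes(rowKey))
  put.addColumn(famAndQf(0), famAndQf(1), Bytes.toBytes("value_" + rowKey))
  puts.add(put)
}
// Write all rows in one batch, then release the table and the connection.
val conn = ConnectionFactory.createConnection(conf)
val table = conn.getTable(tableName)
table.put(puts)
table.close()
conn.close()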
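The code above writes only the first 10 three-letter row keys in lexicographic order, each with the value value_<rowkey> in column f:a. The following sketch is a hypothetical helper, independent of HBase, that enumerates the same keys lazily so you can check which rows to expect before you write them or query them in Spark.

```scala
// Hypothetical helper: lazily enumerates three-letter row keys (aaa, aab, ...)
// in lexicographic order and keeps the first n.
object RowKeys {
  def firstKeys(n: Int): List[String] =
    (for {
      b1 <- ('a' to 'z').iterator
      b2 <- ('a' to 'z').iterator
      b3 <- ('a' to 'z').iterator
    } yield s"$b1$b2$b3").take(n).toList

  def main(args: Array[String]): Unit = {
    // Each row in the example stores "value_<rowkey>" in column f:a.
    firstKeys(10).foreach(k => println(s"$k -> value_$k"))
  }
}
```

With n = 10 the keys run from aaa to aaj. Because the iterator is lazy, only the keys that are actually taken are generated.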
Step 4: Create a Spark table using a catalog
A catalog defines the mapping between your HBase table and a Spark table. It specifies the row key and maps each Spark column to an HBase column family and column qualifier. The catalog is passed as a JSON string in the OPTIONS clause of the CREATE TABLE statement.
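import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.sql.SparkSession

// Get the active SparkSession, or create one if none exists.
val sparkSession = SparkSession.builder().appName("LindormTableSparkExample").getOrCreate()
// By default, the hbase-spark connector uses the most recently created HBaseContext,
// so create one from conf to pass the LindormTable endpoint to the connector.
new HBaseContext(sparkSession.sparkContext, conf)

val sparkTableName = "spark_hbase"
// col0 maps to the row key; col1 and col2 map to the qualifiers a and b in family f.
val createCmd =
  s"""CREATE TABLE ${sparkTableName} USING org.apache.hadoop.hbase.spark
     |OPTIONS ('catalog'=
     |'{"table":{"namespace":"${namespace}", "name":"${hbaseTableName}"},
     |"rowkey":"rowkey",
     |"columns":{
     |"col0":{"cf":"rowkey", "col":"rowkey", "type":"string"},
     |"col1":{"cf":"f", "col":"a", "type":"string"},
     |"col2":{"cf":"f", "col":"b", "type":"string"}}}')
     |""".stripMargin
sparkSession.sql(createCmd)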
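When a table has many columns, writing the catalog JSON by hand is error-prone. The sketch below is a hypothetical helper, not part of the hbase-spark connector, that assembles a catalog string in the same shape as the one in the CREATE TABLE statement above: a table object with namespace and name, a top-level "rowkey" entry, and one entry per Spark column. By the convention in that statement, the row key column uses "rowkey" as both its family and its column.

```scala
// Hypothetical helper (not part of the hbase-spark connector): assembles a
// catalog JSON string in the shape used by CREATE TABLE ... OPTIONS ('catalog'=...).
object CatalogBuilder {
  /** Maps one Spark column to an HBase column family and qualifier.
    * Use family = "rowkey" and qualifier = "rowkey" for the row key column. */
  final case class ColumnMapping(sparkName: String, family: String, qualifier: String, dataType: String)

  def build(namespace: String, table: String, columns: Seq[ColumnMapping]): String = {
    // One JSON member per Spark column, one per line.
    val cols = columns.map { c =>
      s""""${c.sparkName}":{"cf":"${c.family}", "col":"${c.qualifier}", "type":"${c.dataType}"}"""
    }.mkString(",\n")
    s"""{"table":{"namespace":"$namespace", "name":"$table"},
       |"rowkey":"rowkey",
       |"columns":{
       |$cols}}""".stripMargin
  }

  def main(args: Array[String]): Unit = {
    val catalog = build("spark_test", "testTable", Seq(
      ColumnMapping("col0", "rowkey", "rowkey", "string"),
      ColumnMapping("col1", "f", "a", "string"),
      ColumnMapping("col2", "f", "b", "string")))
    // The result can be embedded as: OPTIONS ('catalog'='<catalog>')
    println(catalog)
  }
}
```

The helper does no JSON escaping. Because the catalog is wrapped in single quotes inside the SQL statement, names that contain quote characters would need escaping before use.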
Step 5: Run a Spark SQL query
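// Count the rows in the Spark table. With the sample data, this should count the 10 rows inserted in Step 3.
val result = sparkSession.sql(s"SELECT COUNT(*) FROM $sparkTableName")
val sparkCounts = result.collect().head.getLong(0)
println("Row count: " + sparkCounts)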
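Other Spark SQL statements run against the Spark table in the same way. For example, you can restrict a query to a row-key range on col0. The following sketch is a hypothetical helper that only builds the SQL text; the table name spark_hbase and the column col0 come from this example, and the bounds aab and aae are illustrative. Because the values are interpolated directly into the SQL string, use it only with trusted input.

```scala
// Hypothetical helper: builds a Spark SQL statement that counts rows whose
// row key (mapped to a string column) falls in the half-open range [start, stop).
// Values are interpolated verbatim, so pass only trusted input.
object RangeQuery {
  def countInRange(table: String, rowKeyColumn: String, start: String, stop: String): String =
    s"SELECT COUNT(*) FROM $table WHERE $rowKeyColumn >= '$start' AND $rowKeyColumn < '$stop'"

  def main(args: Array[String]): Unit = {
    // Prints the statement; pass it to sparkSession.sql(...) in a Spark application.
    println(countInRange("spark_hbase", "col0", "aab", "aae"))
  }
}
```

With the sample data, the range [aab, aae) covers the row keys aab, aac, and aad. Depending on the connector version, predicates on the row key column may be pushed down to LindormTable as scan ranges rather than evaluated after a full scan.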