Apache Spark は、HBase コネクタを介して LindormTable に格納されたデータをクエリおよび分析できます。本ガイドでは、接続の構成、HBase テーブルスキーマから Spark テーブルへのマッピング、および Spark SQL クエリの実行手順について説明します。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
LindormTable バージョン 2.4.3 以降
ご利用のクライアント IP アドレスが Lindorm インスタンスのホワイトリストに登録されていること。詳細については、「ホワイトリストの設定」をご参照ください。
Java 向け HBase API の LindormTable エンドポイント。詳細については、「エンドポイントの表示」をご参照ください。
注意事項
インターネット経由で Lindorm インスタンスにアクセスする場合、または Lindorm インスタンスがシングルノード構成の場合には、SDK をスペックアップし、構成を更新してから作業を進めてください。詳細については、「ApsaraDB for HBase API for Java を使用した LindormTable への接続と利用」のステップ 1 をご参照ください。
アプリケーションが Elastic Compute Service (ECS) インスタンス上で実行される場合は、以下の点を確認してください。
Lindorm インスタンスと ECS インスタンスが同一リージョンに配置されていること。ネットワーク遅延を低減するため、同一ゾーンへの配置を推奨します。
Lindorm インスタンスと ECS インスタンスが同一 VPC 内にあること。
全体の操作手順
Spark から LindormTable へ接続する全体の手順は以下のとおりです。
LindormTable 接続エンドポイントの構成
HBase テーブルの作成およびデータの挿入
カタログの定義(HBase カラムを Spark テーブルにマップ)
カタログを使用した Spark テーブルの作成
Spark テーブルに対する Spark SQL クエリの実行
接続の構成
hbase.zookeeper.quorum を、Java 向け HBase API の LindormTable エンドポイントに設定します。以下のいずれかの方法を使用できます。
方法 1:構成ファイルを使用
hbase-site.xml に以下を追加します。
<configuration>
<!-- Java 向け HBase API の LindormTable エンドポイント -->
<property>
<name>hbase.zookeeper.quorum</name>
<value>ld-m5ef25p66n5es****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020</value>
</property>
</configuration>方法 2:構成オブジェクトを使用
// 構成オブジェクトを作成します。
val conf = HBaseConfiguration.create()
// Java 向け HBase API の LindormTable エンドポイントを指定します。
conf.set("hbase.zookeeper.quorum", "ld-m5ef25p66n5es****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020")サンプル
以下のサンプルでは、接続の構成、HBase テーブルの作成、データの挿入、カタログによる Spark テーブルの定義、および COUNT() クエリの実行までを含む、エンドツーエンドのワークフローを示します。
ステップ 1:接続の構成
var conf = HBaseConfiguration.create
conf.set("hbase.zookeeper.quorum", "ld-m5ef25p66n5es****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020")ステップ 2:HBase テーブルの作成
val hbaseTableName = "testTable"
val cf = "f"
val column1 = cf + ":a"
val column2 = cf + ":b"
var namespace = "spark_test"
val admin = ConnectionFactory.createConnection(conf).getAdmin()
val tableName = TableName.valueOf(namespace, hbaseTableName)
val htd = new HTableDescriptor(tableName)
htd.addFamily(new HColumnDescriptor(cf))
admin.createTable(htd)ステップ 3:データの挿入
val k: Array[Byte] = new Array[Byte](3)
val famAndQf = KeyValue.parseColumn(Bytes.toBytes(column1))
val puts = new util.ArrayList[Put]()
var i = 0
for (b1 <- ('a' to 'z')) {
for (b2 <- ('a' to 'z')) {
for (b3 <- ('a' to 'z')) {
if (i < 10) {
k(0) = b1.toByte
k(1) = b2.toByte
k(2) = b3.toByte
val put = new Put(k)
put.addColumn(famAndQf(0), famAndQf(1), ("value_" + b1 + b2 + b3).getBytes())
puts.add(put)
i = i + 1
}
}
}
}
val conn = ConnectionFactory.createConnection(conf)
val table = conn.getTable(tableName)
table.put(puts)ステップ 4:カタログを使用した Spark テーブルの作成
カタログは、HBase テーブルと Spark テーブル間のマッピングを定義します。これにより、行キー(rowkey)が指定され、各 Spark カラムが HBase のカラムファミリーおよびカラム修飾子(column qualifier)にマップされます。カタログは、OPTIONS 句内で JSON 文字列として CREATE TABLE 文に渡されます。
val sparkTableName = "spark_hbase"
val createCmd =
s"""CREATE TABLE ${sparkTableName} USING org.apache.hadoop.hbase.spark
|OPTIONS ('catalog'=
|'{"table":{"namespace":"${namespace}", "name":"${hbaseTableName}"},
|"rowkey":"rowkey",
|"columns":{
|"col0":{"cf":"rowkey", "col":"rowkey", "type":"string"},
|"col1":{"cf":"f", "col":"a", "type":"string"},
|"col2":{"cf":"f", "col":"b", "type":"string"}}}')
""".stripMargin
sparkSession.sql(createCmd)ステップ 5:Spark SQL クエリの実行
val result = sparkSession.sql("select count(*) from " + sparkTableName)
val sparkCounts = result.collect().apply(0).getLong(0)
println("行数: " + sparkCounts)