このトピックでは、オープンソースの Apache Spark を使用して LindormTable にアクセスする方法について説明します。
前提条件
LindormTable のバージョンは 2.4.3 以降です。
クライアントの IP アドレスが Lindorm インスタンスのホワイトリストに追加されています。詳細については、「ホワイトリストの設定」をご参照ください。
Java 用 HBase API の LindormTable エンドポイントが取得されています。エンドポイントの表示方法の詳細については、「エンドポイントの表示」をご参照ください。
使用上の注意
インターネット経由で Lindorm インスタンスにアクセスする場合、またはアクセスする Lindorm インスタンスがシングルノード Lindorm インスタンスである場合は、このトピックで説明されている操作を実行する前に、SDK をアップグレードし、設定を変更する必要があります。詳細については、「Java 用 ApsaraDB for HBase API を使用して LindormTable に接続して使用する」のステップ 1 をご参照ください。
アプリケーションが Elastic Compute Service (ECS) インスタンスにデプロイされている場合は、ネットワーク接続を確保するために、Lindorm インスタンスと ECS インスタンスが事前に次の要件を満たしていることを確認してください。
Lindorm インスタンスと ECS インスタンスは同じリージョンにデプロイされています。ネットワークレイテンシを削減するために、2 つのインスタンスを同じゾーンにデプロイすることをお勧めします。
Lindorm インスタンスと ECS インスタンスは同じ VPC にデプロイされています。
LindormTable にアクセスするための設定の追加
方法 1: 設定ファイルへの設定の追加
hbase-site.xmlという名前の設定ファイルに次の設定を追加します。<configuration> <!-- Java 用 HBase API の LindormTable エンドポイント --> <property> <name>hbase.zookeeper.quorum</name> <value>ld-m5ef25p66n5es****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020</value> </property> </configuration>方法 2: Configuration オブジェクトへのパラメーターの追加
// Configuration オブジェクトを作成します。 Configuration conf = HBaseConfiguration.create(); // Java 用 HBase API の LindormTable エンドポイントを指定します。 conf.set("hbase.zookeeper.quorum", "ld-m5ef25p66n5es****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020");
例
test("Spark SQL の count 結果をテスト") {
// 1. HBaseue Shell のアクセス設定を追加します。
var conf = HBaseConfiguration.create
conf.set("hbase.zookeeper.quorum", "ld-m5ef25p66n5es****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020")
// 2. ApsaraDB for HBase テーブルを作成します。
val hbaseTableName = "testTable"
val cf = "f"
val column1 = cf + ":a"
val column2 = cf + ":b"
var rowsCount: Int = -1
var namespace = "spark_test"
val admin = ConnectionFactory.createConnection(conf).getAdmin()
val tableName = TableName.valueOf(namespace, hbaseTableName)
val htd = new HTableDescriptor(tableName)
htd.addFamily(new HColumnDescriptor(cf))
admin.createTable(htd)
// 3. 作成したテーブルにテストデータを挿入します。
val rng = new Random()
val k: Array[Byte] = new Array[Byte](3)
val famAndQf = KeyValue.parseColumn(Bytes.toBytes(column))
val puts = new util.ArrayList[Put]()
var i = 0
for (b1 <- ('a' to 'z')) {
for (b2 <- ('a' to 'z')) {
for (b3 <- ('a' to 'z')) {
if(i < 10) {
k(0) = b1.toByte
k(1) = b2.toByte
k(2) = b3.toByte
val put = new Put(k)
put.addColumn(famAndQf(0), famAndQf(1), ("value_" + b1 + b2 + b3).getBytes())
puts.add(put)
i = i + 1
}
}
}
}
val conn = ConnectionFactory.createConnection(conf)
val table = conn.getTable(tableName)
table.put(puts)
// 4. Spark テーブルを作成します。
val sparkTableName = "spark_hbase"
val createCmd = s"""CREATE TABLE ${sparkTableName} USING org.apache.hadoop.hbase.spark
| OPTIONS ('catalog'=
| '{"table":{"namespace":"$${hbaseTableName}", "name":"${hbaseTableName}"},"rowkey":"rowkey",
| "columns":{
| "col0":{"cf":"rowkey", "col":"rowkey", "type":"string"},
| "col1":{"cf":"cf1", "col":"a", "type":"string"},
| "col2":{"cf":"cf1", "col":"b", "type":"String"}}}'
| )""".stripMargin
println(" createCmd: \n" + createCmd + " rows : " + rowsCount)
sparkSession.sql(createCmd)
// 5. COUNT() 関数を含むステートメントを実行します。
val result = sparkSession.sql("select count(*) from " + sparkTableName)
val sparkCounts = result.collect().apply(0).getLong(0)
println(" sparkCounts : " + sparkCounts)