すべてのプロダクト
Search
ドキュメントセンター

Lindorm:Spark を使用した LindormTable へのアクセス

最終更新日:Mar 28, 2026

Apache Spark は、HBase コネクタを介して LindormTable に格納されたデータをクエリおよび分析できます。本ガイドでは、接続の構成、HBase テーブルスキーマから Spark テーブルへのマッピング、および Spark SQL クエリの実行手順について説明します。

前提条件

開始する前に、以下の条件を満たしていることを確認してください。

  • LindormTable バージョン 2.4.3 以降

  • ご利用のクライアント IP アドレスが Lindorm インスタンスのホワイトリストに登録されていること。詳細については、「ホワイトリストの設定」をご参照ください。

  • Java 向け HBase API の LindormTable エンドポイント。詳細については、「エンドポイントの表示」をご参照ください。

注意事項

  • インターネット経由で Lindorm インスタンスにアクセスする場合、または Lindorm インスタンスがシングルノード構成の場合には、SDK をスペックアップし、構成を更新してから作業を進めてください。詳細については、「ApsaraDB for HBase API for Java を使用した LindormTable への接続と利用」のステップ 1 をご参照ください。

  • アプリケーションが Elastic Compute Service (ECS) インスタンス上で実行される場合は、以下の点を確認してください。

    • Lindorm インスタンスと ECS インスタンスが同一リージョンに配置されていること。ネットワーク遅延を低減するため、同一ゾーンへの配置を推奨します。

    • Lindorm インスタンスと ECS インスタンスが同一 VPC 内にあること。

全体の操作手順

Spark から LindormTable へ接続する全体の手順は以下のとおりです。

  1. LindormTable 接続エンドポイントの構成

  2. HBase テーブルの作成およびデータの挿入

  3. カタログの定義(HBase カラムを Spark テーブルにマップ)

  4. カタログを使用した Spark テーブルの作成

  5. Spark テーブルに対する Spark SQL クエリの実行

接続の構成

hbase.zookeeper.quorum を、Java 向け HBase API の LindormTable エンドポイントに設定します。以下のいずれかの方法を使用できます。

方法 1:構成ファイルを使用

hbase-site.xml に以下を追加します。

<configuration>
    <!-- Java 向け HBase API の LindormTable エンドポイント -->
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>ld-m5ef25p66n5es****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020</value>
    </property>
</configuration>

方法 2:構成オブジェクトを使用

// 構成オブジェクトを作成します。
val conf = HBaseConfiguration.create()
// Java 向け HBase API の LindormTable エンドポイントを指定します。
conf.set("hbase.zookeeper.quorum", "ld-m5ef25p66n5es****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020")

サンプル

以下のサンプルでは、接続の構成、HBase テーブルの作成、データの挿入、カタログによる Spark テーブルの定義、および COUNT() クエリの実行までを含む、エンドツーエンドのワークフローを示します。

ステップ 1:接続の構成

var conf = HBaseConfiguration.create
conf.set("hbase.zookeeper.quorum", "ld-m5ef25p66n5es****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020")

ステップ 2:HBase テーブルの作成

val hbaseTableName = "testTable"
val cf = "f"
val column1 = cf + ":a"
val column2 = cf + ":b"
var namespace = "spark_test"

val admin = ConnectionFactory.createConnection(conf).getAdmin()
val tableName = TableName.valueOf(namespace, hbaseTableName)
val htd = new HTableDescriptor(tableName)
htd.addFamily(new HColumnDescriptor(cf))
admin.createTable(htd)

ステップ 3:データの挿入

val k: Array[Byte] = new Array[Byte](3)
val famAndQf = KeyValue.parseColumn(Bytes.toBytes(column1))
val puts = new util.ArrayList[Put]()
var i = 0

for (b1 <- ('a' to 'z')) {
  for (b2 <- ('a' to 'z')) {
    for (b3 <- ('a' to 'z')) {
      if (i < 10) {
        k(0) = b1.toByte
        k(1) = b2.toByte
        k(2) = b3.toByte
        val put = new Put(k)
        put.addColumn(famAndQf(0), famAndQf(1), ("value_" + b1 + b2 + b3).getBytes())
        puts.add(put)
        i = i + 1
      }
    }
  }
}

val conn = ConnectionFactory.createConnection(conf)
val table = conn.getTable(tableName)
table.put(puts)

ステップ 4:カタログを使用した Spark テーブルの作成

カタログは、HBase テーブルと Spark テーブル間のマッピングを定義します。これにより、行キー(rowkey)が指定され、各 Spark カラムが HBase のカラムファミリーおよびカラム修飾子(column qualifier)にマップされます。カタログは、OPTIONS 句内で JSON 文字列として CREATE TABLE 文に渡されます。

val sparkTableName = "spark_hbase"

val createCmd =
  s"""CREATE TABLE ${sparkTableName} USING org.apache.hadoop.hbase.spark
     |OPTIONS ('catalog'=
     |'{"table":{"namespace":"${namespace}", "name":"${hbaseTableName}"},
     |"rowkey":"rowkey",
     |"columns":{
     |"col0":{"cf":"rowkey", "col":"rowkey", "type":"string"},
     |"col1":{"cf":"f", "col":"a", "type":"string"},
     |"col2":{"cf":"f", "col":"b", "type":"string"}}}')
  """.stripMargin

sparkSession.sql(createCmd)

ステップ 5:Spark SQL クエリの実行

val result = sparkSession.sql("select count(*) from " + sparkTableName)
val sparkCounts = result.collect().apply(0).getLong(0)
println("行数: " + sparkCounts)

次のステップ