全部产品
Search
文档中心

Lindorm:Menggunakan Apache Spark untuk mengakses LindormTable

更新时间:Jun 24, 2025

Topik ini menjelaskan cara menggunakan Apache Spark sumber terbuka untuk mengakses LindormTable.

Prasyarat

  • Versi LindormTable adalah 2.4.3 atau yang lebih baru.

  • Alamat IP klien Anda ditambahkan ke daftar putih instance Lindorm. Untuk informasi lebih lanjut, lihat Konfigurasi Daftar Putih.

  • Titik akhir LindormTable dari API HBase untuk Java diperoleh. Untuk informasi lebih lanjut tentang cara melihat titik akhir, lihat Lihat Titik Akhir.

Catatan penggunaan

  • Untuk mengakses instance Lindorm melalui Internet atau jika instance Lindorm yang ingin diakses adalah instance Lindorm satu node, Anda harus meningkatkan SDK dan mengubah konfigurasi sebelum melakukan operasi yang dijelaskan dalam topik ini. Untuk informasi lebih lanjut, lihat Langkah 1 di Gunakan API ApsaraDB untuk HBase Java untuk Terhubung dan Menggunakan LindormTable.

  • Jika aplikasi Anda diterapkan pada Instance ECS (Elastic Compute Service), pastikan bahwa instance Lindorm dan instance ECS memenuhi persyaratan berikut untuk memastikan konektivitas jaringan:

    • Instance Lindorm dan instance ECS ditempatkan di wilayah yang sama. Kami menyarankan Anda menerapkan kedua instance di zona yang sama untuk mengurangi latensi jaringan.

    • Instance Lindorm dan instance ECS ditempatkan di VPC yang sama.

Tambahkan konfigurasi untuk mengakses LindormTable

  • Metode 1: Tambahkan konfigurasi ke file konfigurasi

    Tambahkan konfigurasi berikut ke file konfigurasi bernama hbase-site.xml:

    <configuration>
          <!--
        Titik akhir LindormTable dari API HBase untuk Java
        -->
        <property>
            <name>hbase.zookeeper.quorum</name>
            <value>ld-m5ef25p66n5es****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020</value>
        </property>
    </configuration>
  • Metode 2: Tambahkan parameter ke objek Configuration

    // Buat objek Configuration.
    Configuration conf = HBaseConfiguration.create();
    // Tentukan titik akhir LindormTable dari API HBase untuk Java.
    conf.set("hbase.zookeeper.quorum", "ld-m5ef25p66n5es****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020");

Contoh

test(" uji hasil perhitungan spark sql") {
  // 1. Tambahkan konfigurasi akses untuk HBaseue Shell.
  var conf = HBaseConfiguration.create
  conf.set("hbase.zookeeper.quorum", "ld-m5ef25p66n5es****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020")
  // 2. Buat tabel ApsaraDB untuk HBase.
  val hbaseTableName = "testTable"
  val cf = "f"
  val column1 = cf + ":a"
  val column2 = cf + ":b"
  var rowsCount: Int = -1
  var namespace = "spark_test"
  val admin = ConnectionFactory.createConnection(conf).getAdmin()
  val tableName = TableName.valueOf(namespace, hbaseTableName)   
  val htd = new HTableDescriptor(tableName)
  htd.addFamily(new HColumnDescriptor(cf))
  admin.createTable(htd)
  // 3. Masukkan data uji ke dalam tabel yang dibuat.
  val rng = new Random()
  val k: Array[Byte] = new Array[Byte](3)
  val famAndQf = KeyValue.parseColumn(Bytes.toBytes(column))
  val puts = new util.ArrayList[Put]()
  var i = 0
  for (b1 <- ('a' to 'z')) {
      for (b2 <- ('a' to 'z')) {
        for (b3 <- ('a' to 'z')) {
          if(i < 10) {
            k(0) = b1.toByte
            k(1) = b2.toByte
            k(2) = b3.toByte
            val put = new Put(k)
            put.addColumn(famAndQf(0), famAndQf(1), ("value_" + b1 + b2 + b3).getBytes())
            puts.add(put)
            i = i + 1
          }
        }
      }
  }
  val conn = ConnectionFactory.createConnection(conf)
  val table = conn.getTable(tableName)
  table.put(puts)
  // 4. Buat tabel Spark.
  val sparkTableName = "spark_hbase"
  val createCmd = s"""CREATE TABLE ${sparkTableName} USING org.apache.hadoop.hbase.spark
                         |    OPTIONS ('catalog'=
                         |    '{"table":{"namespace":"$${hbaseTableName}",                   "name":"${hbaseTableName}"},"rowkey":"rowkey",
                         |    "columns":{
                         |    "col0":{"cf":"rowkey", "col":"rowkey", "type":"string"},
                         |    "col1":{"cf":"cf1", "col":"a", "type":"string"},
                         |    "col2":{"cf":"cf1", "col":"b", "type":"String"}}}'
                         |    )""".stripMargin
  println(" createCmd: \n" + createCmd + " baris : " + rowsCount)
  sparkSession.sql(createCmd)
  // 5. Jalankan pernyataan yang berisi fungsi COUNT().
  val result = sparkSession.sql("select count(*) from " + sparkTableName)
  val sparkCounts = result.collect().apply(0).getLong(0)
  println(" sparkCounts : " + sparkCounts)