全部產品
Search
文件中心

MaxCompute:Spark on MaxCompute如何訪問HBase

更新時間:Dec 06, 2025

本文介紹Spark on MaxCompute訪問ApsaraDB for HBase的配置方法。

背景資訊

Spark on MaxCompute可以訪問位於阿里雲VPC內的執行個體(ECS、HBase、RDS等)。MaxCompute底層網路和外網預設是隔離的,Spark on MaxCompute提供了一種方案通過配置spark.hadoop.odps.cupid.eni.info=regionid:vpc id來訪問阿里雲的VPC網路環境的HBase。HBase標準版和增強版(Lindorm)的配置不同,詳情如下。

前提條件

在實踐之前,您需要提前做好以下準備工作:

  • 已開通MaxCompute服務並建立MaxCompute專案。詳情請參見開通MaxCompute服務建立MaxCompute專案

  • 已開通DataWorks服務。詳情請參見DataWorks購買指導

  • 已開通HBase服務,詳情請參見HBase購買指導

  • 已開通Virtual Private Cloud,並配置了HBase叢集安全性群組和白名單。詳情請參見網路開通流程

    說明
    • HBase標準版安全性群組開放連接埠為2181、10600、16020。

    • HBase增強版(Lindorm)版安全性群組開放連接埠為30020、10600、16020。

Spark on MaxCompute訪問阿里雲HBase標準版

  1. 在HBase用戶端,執行如下語句建立HBase表。

    create 'test','cf'
    說明

    更多HBase使用命令,請參見HBase Shell使用介紹

  2. 在IDEA編譯工具編寫Spark代碼邏輯並打包。

    1. 使用Scala程式設計語言,按如下程式碼範例編寫Spark代碼邏輯。

      object App {
        def main(args: Array[String]) {
          val spark = SparkSession
            .builder()
            .appName("HbaseTest")
            .config("spark.sql.catalogImplementation", "odps")
            .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
            .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
            .getOrCreate()
      
          val sc = spark.sparkContext
          val config = HBaseConfiguration.create()
          //HBase叢集的ZooKeeper串連地址。
          val zkAddress = "hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181"
          config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress);
          val jobConf = new JobConf(config)
          jobConf.setOutputFormat(classOf[TableOutputFormat])
          //HBase表名。
          jobConf.set(TableOutputFormat.OUTPUT_TABLE,"test")
      
          try{
            import spark._
            //將MaxCompute表中資料寫入HBase表。以下查詢MaxCompute表語句以常量為例,實際開發環境需替換。
            spark.sql("select '7', 88 ").rdd.map(row => {
              val name= row(0).asInstanceOf[String]
              val id = row(1).asInstanceOf[Integer]
              val put = new Put(Bytes.toBytes(id))
              put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
              (new ImmutableBytesWritable, put)
            }).saveAsHadoopDataset(jobConf)
          } finally {
            sc.stop()
          }
        }
      }
      說明

      您可以通過登入HBase控制台,在HBase叢集執行個體詳情頁的資料庫連接頁面擷取ZooKeeper的串連地址。

      對應的HBase依賴檔案如下。

      <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.0.2</version>
          </dependency>
           <dependency>
            <groupId>com.aliyun.hbase</groupId>
            <artifactId>alihbase-client</artifactId>
            <version>2.0.5</version>
          </dependency>
    2. 在IDEA中將代碼以及依賴檔案打成JAR包,並通過MaxCompute用戶端上傳至MaxCompute專案環境中。詳情請參見添加資源

      說明

      由於DatadWork介面方式上傳JAR包有50 MB的限制,因此採用MaxCompute用戶端上傳JAR包。

  3. 在DataWorks上建立ODPS Spark節點並配置。

    1. 在DataWorks上,選擇對應的MaxCompute專案環境,將上傳的JAR包添加到資料開發環境中。詳情請參見建立並使用MaxCompute資源

    2. 建立ODPS Spark,並設定任務參數。詳情請參見開發ODPS Spark任務

      提交Spark任務的配置參數如下。

      spark.hadoop.odps.cupid.eni.enable = true
      spark.hadoop.odps.cupid.eni.info=cn-beijing:vpc-2zeaeq21mb1dmkqh0****

Spark on MaxCompute訪問阿里雲HBase增強版(Lindorm)

  1. 在HBase用戶端,執行如下語句建立HBase表。

    create 'test','cf'
    說明

    更多HBase使用命令,請參見HBase Shell使用介紹

  2. 在IDEA編譯工具編寫Spark代碼邏輯並打包。

    1. 使用Scala程式設計語言,按照如下樣本編寫Spark代碼邏輯。

      object McToHbase {
        def main(args: Array[String]) {
          val spark = SparkSession
            .builder()
            .appName("spark_sql_ddl")
            .config("spark.sql.catalogImplementation", "odps")
            .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
            .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
            .getOrCreate()
      
            val sc = spark.sparkContext
      
          try{
            //將MaxCompute表中資料寫入HBase表。以下查詢MaxCompute表語句以常量為例,實際開發環境需替換。
            spark.sql("select '7', 'long'").rdd.foreachPartition { iter =>
              val config = HBaseConfiguration.create()
              //ZooKeeper叢集的串連地址(VPC內網地址)
              config.set("hbase.zookeeper.quorum", "<ZooKeeper串連地址>:30020");
              import spark._
              //HBase使用者名稱和密碼
              config.set("hbase.client.username", "<使用者名稱>");
              config.set("hbase.client.password", "<密碼>");
              //HBase表名
              val tableName = TableName.valueOf( "test")
              val conn = ConnectionFactory.createConnection(config)
              val table = conn.getTable(tableName);
              val puts = new util.ArrayList[Put]()
              iter.foreach(
                row => {
                  val id = row(0).asInstanceOf[String]
                  val name = row(1).asInstanceOf[String]
                  val put = new Put(Bytes.toBytes(id))
                  put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
                  puts.add(put)
                  table.put(puts)
                }
              )
            }
        } finally {
          sc.stop()
        }
        }
      }
      說明

      登入HBase控制台,在HBase叢集執行個體詳情頁的資料庫連接頁面擷取ZooKeeper的串連地址以及HBase使用者名稱和密碼。

      對應的HBase依賴檔案如下。

      <dependency>
            <groupId>com.aliyun.hbase</groupId>
            <artifactId>alihbase-client</artifactId>
            <version>2.0.8</version>
          </dependency>
    2. 在IDEA中將代碼以及依賴檔案打成JAR包,並通過MaxCompute用戶端上傳至MaxCompute專案環境中。詳情請參見添加資源

      說明

      由於DatadWork介面方式上傳JAR包有50 MB的限制,因此採用MaxCompute用戶端上傳JAR包。

  3. 在DataWorks上建立ODPS Spark節點並配置。

    1. 在DataWorks上,選擇對應的MaxCompute專案環境,將上傳的JAR包添加到資料開發環境中。詳情請參見建立並使用MaxCompute資源

    2. 建立ODPS Spark,並設定任務參數。詳情請參見開發ODPS Spark任務

      提交Spark任務的配置參數如下。

      spark.hadoop.odps.cupid.eni.enable = true
      spark.hadoop.odps.cupid.eni.info=cn-beijing:vpc-2zeaeq21mb1dmkqh0****