本文介紹Spark on MaxCompute訪問ApsaraDB for HBase的配置方法。
背景資訊
Spark on MaxCompute可以訪問位於阿里雲VPC內的執行個體(ECS、HBase、RDS等)。MaxCompute底層網路和外網預設是隔離的,Spark on MaxCompute提供了一種方案通過配置spark.hadoop.odps.cupid.eni.info=regionid:vpc id來訪問阿里雲的VPC網路環境的HBase。HBase標準版和增強版(Lindorm)的配置不同,詳情如下。
前提條件
在實踐之前,您需要提前做好以下準備工作:
已開通MaxCompute服務並建立MaxCompute專案。詳情請參見開通MaxCompute服務和建立MaxCompute專案。
已開通DataWorks服務。詳情請參見DataWorks購買指導。
已開通HBase服務,詳情請參見HBase購買指導。
已開通Virtual Private Cloud,並配置了HBase叢集安全性群組和白名單。詳情請參見網路開通流程。
說明HBase標準版安全性群組開放連接埠為2181、10600、16020。
HBase增強版(Lindorm)版安全性群組開放連接埠為30020、10600、16020。
Spark on MaxCompute訪問阿里雲HBase標準版
在HBase用戶端,執行如下語句建立HBase表。
create 'test','cf'說明更多HBase使用命令,請參見HBase Shell使用介紹。
在IDEA編譯工具編寫Spark代碼邏輯並打包。
使用Scala程式設計語言,按如下程式碼範例編寫Spark代碼邏輯。
object App { def main(args: Array[String]) { val spark = SparkSession .builder() .appName("HbaseTest") .config("spark.sql.catalogImplementation", "odps") .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api") .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api") .getOrCreate() val sc = spark.sparkContext val config = HBaseConfiguration.create() //HBase叢集的ZooKeeper串連地址。 val zkAddress = "hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181" config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress); val jobConf = new JobConf(config) jobConf.setOutputFormat(classOf[TableOutputFormat]) //HBase表名。 jobConf.set(TableOutputFormat.OUTPUT_TABLE,"test") try{ import spark._ //將MaxCompute表中資料寫入HBase表。以下查詢MaxCompute表語句以常量為例,實際開發環境需替換。 spark.sql("select '7', 88 ").rdd.map(row => { val name= row(0).asInstanceOf[String] val id = row(1).asInstanceOf[Integer] val put = new Put(Bytes.toBytes(id)) put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name)) (new ImmutableBytesWritable, put) }).saveAsHadoopDataset(jobConf) } finally { sc.stop() } } }說明您可以通過登入HBase控制台,在HBase叢集執行個體詳情頁的資料庫連接頁面擷取ZooKeeper的串連地址。
對應的HBase依賴檔案如下。
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-mapreduce</artifactId> <version>2.0.2</version> </dependency> <dependency> <groupId>com.aliyun.hbase</groupId> <artifactId>alihbase-client</artifactId> <version>2.0.5</version> </dependency>在IDEA中將代碼以及依賴檔案打成JAR包,並通過MaxCompute用戶端上傳至MaxCompute專案環境中。詳情請參見添加資源。
說明由於DatadWork介面方式上傳JAR包有50 MB的限制,因此採用MaxCompute用戶端上傳JAR包。
在DataWorks上建立ODPS Spark節點並配置。
在DataWorks上,選擇對應的MaxCompute專案環境,將上傳的JAR包添加到資料開發環境中。詳情請參見建立並使用MaxCompute資源。
建立ODPS Spark,並設定任務參數。詳情請參見開發ODPS Spark任務。
提交Spark任務的配置參數如下。
spark.hadoop.odps.cupid.eni.enable = true spark.hadoop.odps.cupid.eni.info=cn-beijing:vpc-2zeaeq21mb1dmkqh0****
Spark on MaxCompute訪問阿里雲HBase增強版(Lindorm)
在HBase用戶端,執行如下語句建立HBase表。
create 'test','cf'說明更多HBase使用命令,請參見HBase Shell使用介紹。
在IDEA編譯工具編寫Spark代碼邏輯並打包。
使用Scala程式設計語言,按照如下樣本編寫Spark代碼邏輯。
object McToHbase { def main(args: Array[String]) { val spark = SparkSession .builder() .appName("spark_sql_ddl") .config("spark.sql.catalogImplementation", "odps") .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api") .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api") .getOrCreate() val sc = spark.sparkContext try{ //將MaxCompute表中資料寫入HBase表。以下查詢MaxCompute表語句以常量為例,實際開發環境需替換。 spark.sql("select '7', 'long'").rdd.foreachPartition { iter => val config = HBaseConfiguration.create() //ZooKeeper叢集的串連地址(VPC內網地址) config.set("hbase.zookeeper.quorum", "<ZooKeeper串連地址>:30020"); import spark._ //HBase使用者名稱和密碼 config.set("hbase.client.username", "<使用者名稱>"); config.set("hbase.client.password", "<密碼>"); //HBase表名 val tableName = TableName.valueOf( "test") val conn = ConnectionFactory.createConnection(config) val table = conn.getTable(tableName); val puts = new util.ArrayList[Put]() iter.foreach( row => { val id = row(0).asInstanceOf[String] val name = row(1).asInstanceOf[String] val put = new Put(Bytes.toBytes(id)) put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name)) puts.add(put) table.put(puts) } ) } } finally { sc.stop() } } }說明登入HBase控制台,在HBase叢集執行個體詳情頁的資料庫連接頁面擷取ZooKeeper的串連地址以及HBase使用者名稱和密碼。
對應的HBase依賴檔案如下。
<dependency> <groupId>com.aliyun.hbase</groupId> <artifactId>alihbase-client</artifactId> <version>2.0.8</version> </dependency>在IDEA中將代碼以及依賴檔案打成JAR包,並通過MaxCompute用戶端上傳至MaxCompute專案環境中。詳情請參見添加資源。
說明由於DatadWork介面方式上傳JAR包有50 MB的限制,因此採用MaxCompute用戶端上傳JAR包。
在DataWorks上建立ODPS Spark節點並配置。
在DataWorks上,選擇對應的MaxCompute專案環境,將上傳的JAR包添加到資料開發環境中。詳情請參見建立並使用MaxCompute資源。
建立ODPS Spark,並設定任務參數。詳情請參見開發ODPS Spark任務。
提交Spark任務的配置參數如下。
spark.hadoop.odps.cupid.eni.enable = true spark.hadoop.odps.cupid.eni.info=cn-beijing:vpc-2zeaeq21mb1dmkqh0****