基於HBase官方提供的Spark Connector,EMR Serverless Spark可以在開發時添加對應的配置來串連HBase。本文為您介紹在EMR Serverless Spark環境中實現HBase的資料讀取和寫入操作。
前提條件
使用限制
僅Serverless Spark以下引擎版本支援本文操作:
esr-4.x:esr-4.1.0及之後版本。
esr-3.x:esr-3.1.0及之後版本。
esr-2.x:esr-2.5.0及之後版本。
操作流程
步驟一:擷取HBase Spark Connector JAR並上傳至OSS
根據HBase Spark Connector官方文檔,結合Spark、Scala、Hadoop以及HBase的版本相容性要求,完成以下步驟以擷取所需的依賴包:
編譯與打包。
根據目標環境的版本資訊(包括Spark、Scala、Hadoop和HBase的版本),您可以對HBase Spark Connector進行編譯,產生以下兩個核心JAR包:
hbase-spark-1.1.0-SNAPSHOT.jarhbase-spark-protocol-shaded-1.1.0-SNAPSHOT.jar例如,使用Maven命令,基於以下版本資訊進行編譯和打包。
mvn -Dspark.version=3.4.2 -Dscala.version=2.12.10 -Dhadoop-three.version=3.2.0 -Dscala.binary.version=2.12 -Dhbase.version=2.4.9 clean package -DskipTests如果您的環境版本與上述版本一致(即Spark 3.4.2、Scala 2.12.10、Hadoop 3.2.0、HBase 2.4.9),可以直接使用已編譯好的JAR包:
擷取HBase相關依賴。 從HBase安裝目錄的
lib/shaded-clients以及lib/client-facing-thirdparty檔案夾提取以下依賴包,其中2.4.9為HBase版本號碼。hbase-shaded-client-2.4.9.jarhbase-shaded-mapreduce-2.4.9.jarslf4j-log4j12-1.7.30.jar
將上述五個JAR上傳至阿里雲OSS中,上傳操作可以參見簡單上傳。
步驟二:建立網路連接
Serverless Spark需要能夠打通與HBase之間的網路才可以正常訪問HBase服務。更多網路連接資訊,請參見EMR Serverless Spark與其他VPC間網路互連。
配置安全性群組規則時,連接埠範圍請根據實際需求選擇性開放必要的連接埠。連接埠範圍的取值為1-65535。本文樣本需開啟ZooKeeper服務連接埠(2181)、HBase Master的連接埠(16000)以及HBase RegionServer的連接埠(16020)。
步驟三:在EMR HBase叢集中建立表
通過SSH方式串連叢集,詳情請參見登入叢集。
執行以下命令,串連HBase。
hbase shell執行以下命令,建立測試表。
create 'hbase_table', 'c1', 'c2'執行以下命令,寫入測試資料。
put 'hbase_table', 'r1', 'c1:name', 'Alice' put 'hbase_table', 'r1', 'c1:age', '25' put 'hbase_table', 'r1', 'c2:city', 'New York' put 'hbase_table', 'r2', 'c1:name', 'Bob' put 'hbase_table', 'r2', 'c1:age', '30' put 'hbase_table', 'r2', 'c2:city', 'San Francisco'
步驟四:Serverless Spark讀取HBase表
建立Notebook會話,詳情請參見管理Notebook會話。
建立會話時,在引擎版本下拉式清單中選擇與HBase Spark Connector版本對應的引擎版本,在網路連接中選擇步驟二中建立好的網路連接,並在Spark配置中添加以下參數來載入HBase Spark Connector。
spark.jars oss://<bucketname>/path/to/hbase-shaded-client-2.4.9.jar,oss://<bucketname>/path/to/hbase-shaded-mapreduce-2.4.9.jar,oss://<bucketname>/path/to/hbase-spark-1.1.0-SNAPSHOT.jar,oss://<bucketname>/path/to/hbase-spark-protocol-shaded-1.1.0-SNAPSHOT.jar,oss://<bucketname>/path/to/slf4j-log4j12-1.7.30.jar spark.hadoop.hbase.zookeeper.quorum Zookeeper內網IP地址 spark.hadoop.hbase.zookeeper.property.clientPort Zookeeper服務連接埠其中,涉及參數資訊說明如下。
參數
描述
樣本
spark.jars外部依賴JAR包的路徑。
上傳至OSS的五個檔案,例如,
oss://<yourBucketname>/spark/hbase/hbase-shaded-client-2.4.9.jar。spark.hadoop.hbase.zookeeper.quorumZookeeper內網IP地址。
如果您使用的是其他HBase叢集,請根據實際情況填寫相應的配置。
如果您使用的是阿里雲的EMR HBase叢集,則可以在EMR HBase叢集的節點管理頁面,查看Master節點的內網IP。
spark.hadoop.hbase.zookeeper.property.clientPortZookeeper服務連接埠。
如果您使用的是其他HBase叢集,請根據實際情況填寫相應的配置。
如果您使用的是阿里雲的EMR HBase叢集,則連接埠為
2181。
在資料開發頁面,選擇建立一個類型的任務,然後在右上方選擇建立的Notebook會話。
更多操作,請參見管理Notebook會話。
拷貝如下代碼到新增的Notebook頁簽中,並根據需要修改相應的參數資訊,然後單擊運行。
# 讀取HBase表。 df = spark.read.format("org.apache.hadoop.hbase.spark") \ .option("hbase.columns.mapping", "id STRING :key, name STRING c1:name, age STRING c1:age, city STRING c2:city") \ .option("hbase.table", "hbase_table") \ .option("hbase.spark.pushdown.columnfilter", False) \ .load() # 註冊臨時視圖。 df.createOrReplaceTempView("hbase_table_view") # 使用SQL查詢資料。 results = spark.sql("SELECT * FROM hbase_table_view") results.show()如果能夠正常返回資料,則表明配置正確。

步驟五:Serverless Spark寫入HBase表
拷貝如下代碼到前一個步驟中新增的Notebook頁簽中,並根據需要修改相應的參數資訊,然後單擊運行。
from pyspark.sql.types import StructType, StructField, StringType
data = [
("r3", "sam", "26", "New York")
]
schema = StructType([
StructField("id", StringType(), True),
StructField("name", StringType(), True),
StructField("age", StringType(), True),
StructField("city", StringType(), True)
])
testDS = spark.createDataFrame(data=data,schema=schema)
testDS.write.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", "id STRING :key, name STRING c1:name, age STRING c1:age, city STRING c2:city").option("hbase.table", "hbase_table").save()
資料寫入完成後,您可以查表確認資料是否成功寫入。
