基於Apache Doris官方提供的Spark Connector,EMR Serverless Spark可以在開發時添加對應的配置來串連Doris。本文為您介紹在EMR Serverless Spark環境中實現Doris的資料讀取和寫入操作。
背景資訊
Apache Doris是一個高效能、即時的AnalyticDB,能夠較好地滿足報表分析、即席查詢、資料湖聯邦查詢加速等使用情境。更多資訊,請參見Introduction to Apache Doris。
EMR Serverless Spark是一款相容開源Spark的高效能Lakehouse產品,提供了企業級全託管的資料平台服務。通過結合Apache Doris與EMR Serverless Spark,您可以高效地進行資料讀取、寫入和分析操作,從而實現端到端的資料處理流程。
前提條件
使用限制
Serverless Spark引擎的版本要求為esr-2.6.0、esr-3.2.0、esr-4.2.0及以上版本。
操作流程
步驟一:擷取Doris Spark Connector JAR並上傳至OSS
您需要查閱Doris的官方文檔Spark Doris Connector。該文檔通常會列出不同版本的連接器與不同版本的 Spark 引擎的相容情況。您需要確認您正在使用的 Spark 版本與 Doris Spark Connector 版本之間的相容性。
步驟二:建立網路連接
Serverless Spark需要能夠打通與EMR Doris叢集之間的網路才可以正常訪問Doris服務。更多網路連接資訊,請參見EMR Serverless Spark與其他VPC間網路互連。
配置安全性群組規則時,連接埠範圍請根據實際需求選擇性開放必要的連接埠。連接埠範圍的取值為1~65535。本文樣本需開啟HTTP 連接埠(8031)、RPC 連接埠(9061)以及Webserver連接埠(8041)。
步驟三:在EMR Doris叢集中建立庫表
使用SSH方式登入叢集,詳情請參見登入叢集。
執行以下命令,串連EMR Doris叢集。
mysql -h127.0.0.1 -P 9031 -uroot建立資料庫和表。
CREATE DATABASE IF NOT EXISTS testdb; USE testdb; CREATE TABLE test ( id INT, name STRING ) PROPERTIES("replication_num" = "1");插入測試資料。
INSERT INTO test VALUES (1, 'a'), (2, 'b'), (3, 'c');查詢資料。
SELECT * FROM test;返回資訊如下圖所示。

步驟四:Serverless Spark讀取Doris表
使用SQL會話讀Doris表
建立SQL會話,詳情請參見管理SQL會話。
建立會話時,在引擎版本下拉式清單中選擇與Doris Spark Connector版本對應的引擎版本,在網路連接中選擇步驟二中建立好的網路連接,並在Spark配置中添加以下參數來載入Doris Spark Connector。
spark.emr.serverless.user.defined.jars oss://<bucketname>/path/connector.jar其中,
oss://<bucketname>/path/connector.jar為您步驟一中上傳至OSS的Doris Spark Connector的路徑。例如,oss://emr-oss/spark/spark-doris-connector-spark-3.4-24.0.0.jar。在資料開發頁面,選擇建立一個SparkSQL類型的任務,然後在右上方選擇建立好的SQL會話。
更多操作,請參見SparkSQL開發。
拷貝如下代碼到新增的SparkSQL頁簽中,並根據需要修改相應的參數資訊,然後單擊運行。
CREATE TEMPORARY VIEW test USING doris OPTIONS( "table.identifier" = "testdb.test", "fenodes" = "<doris_address>:<http_port>", "user" = "<user>", "password" = "<password>" ); SELECT * FROM test;其中,涉及參數資訊說明如下。
參數
描述
樣本
testdb.testDoris服務中實際的資料庫和表名。
如果您使用的是其他Doris叢集,請根據實際情況填寫相應的配置。
如果您使用的是EMR on ECS中建立的叢集,則填寫如下參數值:
testdb.test:本文以testdb.test為例。<doris_address>:您可以在EMR on ECS控制台Doris叢集的節點管理頁面,單擊emr-master前的表徵圖,查看內網IP地址。<http_port>:預設為8031。<user>:預設使用者名為root。<password>:預設密碼為空白。
<doris_address>Doris服務所在的節點內網IP地址。
<http_port>Doris服務監聽HTTP請求的連接埠號碼。
<user>用於串連Doris服務的使用者名稱。
<password>用於串連Doris服務的使用者密碼。
如果能夠正常返回資料,則表明配置正確。

使用Notebook會話讀Doris表
建立Notebook會話,詳情請參見管理Notebook會話。
建立會話時,在引擎版本下拉式清單中選擇與Doris Spark Connector版本對應的引擎版本,在網路連接中選擇步驟二中建立好的網路連接,並在Spark配置中添加以下參數來載入Doris Spark Connector。
spark.emr.serverless.user.defined.jars oss://<bucketname>/path/connector.jar其中,
oss://<bucketname>/path/connector.jar為您步驟一中上傳至OSS的Doris Spark Connector的路徑。例如,oss://emr-oss/spark/spark-doris-connector-spark-3.4-24.0.0.jar。在資料開發頁面,選擇建立一個類型的任務,然後在右上方選擇建立的Notebook會話。
更多操作,請參見管理Notebook會話。
拷貝如下代碼到新增的Notebook頁簽中,並根據需要修改相應的參數資訊,然後單擊運行。
dorisSparkDF = spark.read.format("doris") \ .option("doris.table.identifier", "testdb.test") \ .option("doris.fenodes", "<doris_address>:<http_port>") \ .option("user", "<user>") \ .option("password", "<password>") \ .load() dorisSparkDF.show(3)其中,涉及參數資訊說明如下。
參數
描述
樣本
testdb.testDoris服務中實際的資料庫和表名。
如果您使用的是其他Doris叢集,請根據實際情況填寫相應的配置。
如果您使用的是EMR on ECS中建立的叢集,則填寫如下參數值:
testdb.test:本文以testdb.test為例。<doris_address>:您可以在EMR on ECS控制台Doris叢集的節點管理頁面,單擊emr-master前的表徵圖,查看內網IP地址。<http_port>:預設為8031。<user>:預設使用者名為root。<password>:預設密碼為空白。
<doris_address>Doris服務所在的節點內網IP地址。
<http_port>Doris服務監聽HTTP請求的連接埠號碼。
<user>用於串連Doris服務的使用者名稱。
<password>用於串連Doris服務的使用者密碼。
如果能夠正常返回資料,則表明配置正確。

步驟五:Serverless Spark寫入Doris表
使用SQL會話寫Doris表
拷貝如下代碼到前一個步驟中新增的SparkSQL頁簽中,並根據需要修改相應的參數資訊,然後單擊運行。
CREATE TEMPORARY VIEW test_write
USING doris
OPTIONS(
"table.identifier" = "testdb.test",
"fenodes" = "<doris_address>:<http_port>",
"user" = "<user>",
"password" = "<password>"
);
INSERT INTO test_write VALUES (4, 'd'), (5, 'e');
SELECT * FROM test_write;如果能夠返回以下資料,則表明資料寫入成功。

使用Notebook會話寫Doris表
拷貝如下代碼到前一個步驟中新增的Notebook頁簽中,並根據需要修改相應的參數資訊,然後單擊運行。
data = [(7, 'f'), (8, 'g')]
mockDataDF = spark.createDataFrame(data, ["id", "name"])
mockDataDF.write.mode("append").format("doris") \
.option("doris.table.identifier", "testdb.test") \
.option("doris.fenodes", "<doris_address>:<http_port>") \
.option("user", "<user>") \
.option("password", "<password>") \
.save()
dorisSparkDF = spark.read.format("doris") \
.option("doris.table.identifier", "testdb.test") \
.option("doris.fenodes", "<doris_address>:<http_port>") \
.option("user", "<user>") \
.option("password", "<password>") \
.load()
dorisSparkDF.show(10)如果能夠返回以下資料,則表明資料寫入成功。
