全部產品
Search
文件中心

E-MapReduce:讀寫Doris

更新時間:Sep 19, 2025

基於Apache Doris官方提供的Spark Connector,EMR Serverless Spark可以在開發時添加對應的配置來串連Doris。本文為您介紹在EMR Serverless Spark環境中實現Doris的資料讀取和寫入操作。

背景資訊

Apache Doris是一個高效能、即時的AnalyticDB,能夠較好地滿足報表分析、即席查詢、資料湖聯邦查詢加速等使用情境。更多資訊,請參見Introduction to Apache Doris

EMR Serverless Spark是一款相容開源Spark的高效能Lakehouse產品,提供了企業級全託管的資料平台服務。通過結合Apache Doris與EMR Serverless Spark,您可以高效地進行資料讀取、寫入和分析操作,從而實現端到端的資料處理流程。

前提條件

  • 已建立Serverless Spark工作空間,詳情請參見建立工作空間

  • 已建立Doris叢集。

    如果是在EMR on ECS建立包含Doris服務的資料分析(OLAP)叢集,詳情請參見建立叢集。本文以在EMR on ECS建立包含Doris服務的叢集為例,後續簡稱EMR Doris叢集。

使用限制

Serverless Spark引擎的版本要求為esr-2.6.0、esr-3.2.0、esr-4.2.0及以上版本。

操作流程

步驟一:擷取Doris Spark Connector JAR並上傳至OSS

您需要查閱Doris的官方文檔Spark Doris Connector。該文檔通常會列出不同版本的連接器與不同版本的 Spark 引擎的相容情況。您需要確認您正在使用的 Spark 版本與 Doris Spark Connector 版本之間的相容性。

  1. 訪問Doris Spark Connector的GitHub倉庫,選擇合適的版本進行下載。

    Doris Spark Connector JAR包的命名格式為spark-doris-connector-spark-${spark_version}-${connector_version}.jar。例如,您使用的引擎版本為esr-3.1.0 (Spark 3.4.3, Scala 2.12),則可以下載spark-doris-connector-spark-3.4-24.0.0.jar

  2. 將下載的Spark Connector JAR上傳至阿里雲OSS中,上傳操作可以參見簡單上傳

步驟二:建立網路連接

Serverless Spark需要能夠打通與EMR Doris叢集之間的網路才可以正常訪問Doris服務。更多網路連接資訊,請參見EMR Serverless Spark與其他VPC間網路互連

重要

配置安全性群組規則時,連接埠範圍請根據實際需求選擇性開放必要的連接埠。連接埠範圍的取值為1~65535。本文樣本需開啟HTTP 連接埠(8031)、RPC 連接埠(9061)以及Webserver連接埠(8041)。

步驟三:在EMR Doris叢集中建立庫表

  1. 使用SSH方式登入叢集,詳情請參見登入叢集

  2. 執行以下命令,串連EMR Doris叢集。

    mysql -h127.0.0.1  -P 9031 -uroot
  3. 建立資料庫和表。

    CREATE DATABASE IF NOT EXISTS testdb;
    
    USE testdb;
    
    CREATE TABLE test (
        id INT, 
        name STRING
    ) PROPERTIES("replication_num" = "1");
    
  4. 插入測試資料。

    INSERT INTO test VALUES (1, 'a'), (2, 'b'), (3, 'c');
  5. 查詢資料。

    SELECT * FROM test;

    返回資訊如下圖所示。

    image

步驟四:Serverless Spark讀取Doris表

使用SQL會話讀Doris表

  1. 建立SQL會話,詳情請參見管理SQL會話

    建立會話時,在引擎版本下拉式清單中選擇與Doris Spark Connector版本對應的引擎版本,在網路連接中選擇步驟二中建立好的網路連接,並在Spark配置中添加以下參數來載入Doris Spark Connector。

    spark.emr.serverless.user.defined.jars  oss://<bucketname>/path/connector.jar

    其中,oss://<bucketname>/path/connector.jar為您步驟一中上傳至OSS的Doris Spark Connector的路徑。例如,oss://emr-oss/spark/spark-doris-connector-spark-3.4-24.0.0.jar

  2. 資料開發頁面,選擇建立一個SparkSQL類型的任務,然後在右上方選擇建立好的SQL會話。

    更多操作,請參見SparkSQL開發

  3. 拷貝如下代碼到新增的SparkSQL頁簽中,並根據需要修改相應的參數資訊,然後單擊運行

    CREATE TEMPORARY VIEW test
    USING doris
    OPTIONS(
      "table.identifier" = "testdb.test",
      "fenodes" = "<doris_address>:<http_port>",
      "user" = "<user>",
      "password" = "<password>"
    );
    
    SELECT * FROM test;

    其中,涉及參數資訊說明如下。

    參數

    描述

    樣本

    testdb.test

    Doris服務中實際的資料庫和表名。

    • 如果您使用的是其他Doris叢集,請根據實際情況填寫相應的配置。

    • 如果您使用的是EMR on ECS中建立的叢集,則填寫如下參數值:

      • testdb.test:本文以testdb.test為例。

      • <doris_address>:您可以在EMR on ECS控制台Doris叢集的節點管理頁面,單擊emr-master前的表徵圖,查看內網IP地址。

      • <http_port>:預設為8031。

      • <user>:預設使用者名為root

      • <password>:預設密碼為空白。

    <doris_address>

    Doris服務所在的節點內網IP地址。

    <http_port>

    Doris服務監聽HTTP請求的連接埠號碼。

    <user>

    用於串連Doris服務的使用者名稱。

    <password>

    用於串連Doris服務的使用者密碼。

    如果能夠正常返回資料,則表明配置正確。

    image

使用Notebook會話讀Doris表

  1. 建立Notebook會話,詳情請參見管理Notebook會話

    建立會話時,在引擎版本下拉式清單中選擇與Doris Spark Connector版本對應的引擎版本,在網路連接中選擇步驟二中建立好的網路連接,並在Spark配置中添加以下參數來載入Doris Spark Connector。

    spark.emr.serverless.user.defined.jars  oss://<bucketname>/path/connector.jar

    其中,oss://<bucketname>/path/connector.jar為您步驟一中上傳至OSS的Doris Spark Connector的路徑。例如,oss://emr-oss/spark/spark-doris-connector-spark-3.4-24.0.0.jar

  2. 資料開發頁面,選擇建立一個互動式開發 > Notebook類型的任務,然後在右上方選擇建立的Notebook會話。

    更多操作,請參見管理Notebook會話

  3. 拷貝如下代碼到新增的Notebook頁簽中,並根據需要修改相應的參數資訊,然後單擊運行

    dorisSparkDF = spark.read.format("doris") \
      .option("doris.table.identifier", "testdb.test") \
      .option("doris.fenodes", "<doris_address>:<http_port>") \
      .option("user", "<user>") \
      .option("password", "<password>") \
      .load()
    
    dorisSparkDF.show(3)

    其中,涉及參數資訊說明如下。

    參數

    描述

    樣本

    testdb.test

    Doris服務中實際的資料庫和表名。

    • 如果您使用的是其他Doris叢集,請根據實際情況填寫相應的配置。

    • 如果您使用的是EMR on ECS中建立的叢集,則填寫如下參數值:

      • testdb.test:本文以testdb.test為例。

      • <doris_address>:您可以在EMR on ECS控制台Doris叢集的節點管理頁面,單擊emr-master前的表徵圖,查看內網IP地址。

      • <http_port>:預設為8031。

      • <user>:預設使用者名為root

      • <password>:預設密碼為空白。

    <doris_address>

    Doris服務所在的節點內網IP地址。

    <http_port>

    Doris服務監聽HTTP請求的連接埠號碼。

    <user>

    用於串連Doris服務的使用者名稱。

    <password>

    用於串連Doris服務的使用者密碼。

    如果能夠正常返回資料,則表明配置正確。

    image

步驟五:Serverless Spark寫入Doris表

使用SQL會話寫Doris表

拷貝如下代碼到前一個步驟中新增的SparkSQL頁簽中,並根據需要修改相應的參數資訊,然後單擊運行

CREATE TEMPORARY VIEW test_write
USING doris
OPTIONS(
  "table.identifier" = "testdb.test",
  "fenodes" = "<doris_address>:<http_port>",
  "user" = "<user>",
  "password" = "<password>"
);

INSERT INTO test_write VALUES (4, 'd'), (5, 'e');
SELECT * FROM test_write;

如果能夠返回以下資料,則表明資料寫入成功。

image

使用Notebook會話寫Doris表

拷貝如下代碼到前一個步驟中新增的Notebook頁簽中,並根據需要修改相應的參數資訊,然後單擊運行

data = [(7, 'f'), (8, 'g')]
mockDataDF = spark.createDataFrame(data, ["id", "name"])
mockDataDF.write.mode("append").format("doris") \
  .option("doris.table.identifier", "testdb.test") \
  .option("doris.fenodes", "<doris_address>:<http_port>") \
  .option("user", "<user>") \
  .option("password", "<password>") \
  .save()
  
dorisSparkDF = spark.read.format("doris") \
  .option("doris.table.identifier", "testdb.test") \
  .option("doris.fenodes", "<doris_address>:<http_port>") \
  .option("user", "<user>") \
  .option("password", "<password>") \
  .load()

dorisSparkDF.show(10)

如果能夠返回以下資料,則表明資料寫入成功。

image