全部產品
Search
文件中心

E-MapReduce:讀寫StarRocks

更新時間:Dec 23, 2025

StarRocks官方提供了Spark Connector用於Spark和StarRocks之間的資料讀寫,Serverless Spark可以在開發時添加對應的配置串連StarRocks。本文為您介紹在EMR Serverless Spark中實現StarRocks的讀取和寫入操作。

前提條件

使用限制

Serverless Spark引擎的版本要求為esr-2.5.0、esr-3.1.0、esr-4.1.0及以上版本。

操作流程

步驟一:擷取Spark Connector JAR並上傳至OSS

  1. 參見Read data from StarRocks using Spark connector,選擇相應的方式下載對應版本的Spark Connector JAR。

    例如,本文選擇直接下載已經編譯好的JAR,即從Maven Central Repository擷取不同版本的Connector JAR包。

    說明

    Connector JAR包的命名格式為starrocks-spark-connector-${spark_version}_${scala_version}-${connector_version}.jar。例如,您使用的引擎版本為esr-4.1.0 (Spark 3.5.2, Scala 2.12),想使用1.1.2版本的Connector,則可以選擇starrocks-spark-connector-3.5_2.12-1.1.2.jar

  2. 將下載的Spark Connector JAR上傳至阿里雲OSS中,上傳操作可以參見簡單上傳

步驟二:添加網路連接

  1. 擷取網路資訊。

    您可以在EMR Serverless StarRocks頁面,進入目標StarRocks執行個體的实例详情頁面,以擷取該執行個體的專用網路和交換器資訊。

  2. 新增網路連接。

    1. EMR Serverless Spark頁面,進入目標Spark工作空間的网络连接頁面單擊新增网络连接

    2. 新增网络连接對話方塊中,輸入连接名称,並選擇之前擷取到的StarRocks執行個體的專用網路和交換器資訊,然後單擊确定

      說明

      網路連接與SR執行個體需保持一致,交換器應選擇與SR執行個體位於同一專用網路中的交換器。如果當前可用性區域內沒有可用的交換器,請單擊虚拟交换机,前往專用網路控制台建立,詳情請參見專用網路與交換器

步驟三:在StarRocks中建立庫表

  1. 串連StarRocks執行個體,詳情請參見通過EMR StarRocks Manager串連StarRocks執行個體

  2. SQL Editor查詢列表頁面,單擊文件或者右側地區的image表徵圖,然後單擊确认以新增檔案。

  3. 在新增的檔案中輸入以下SQL語句,單擊运行

    CREATE DATABASE `testdb`;
    
    CREATE TABLE `testdb`.`score_board`
    (
        `id` int(11) NOT NULL COMMENT "",
        `name` varchar(65533) NULL DEFAULT "" COMMENT "",
        `score` int(11) NOT NULL DEFAULT "0" COMMENT ""
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    COMMENT "OLAP"
    DISTRIBUTED BY HASH(`id`);

步驟四:通過Serverless Spark讀寫StarRocks

方式一:使用SQL會話、Notebook會話讀寫StarRocks

會話類型更多介紹,請參見會話管理

SQL會話

  1. 通過Serverless Spark向StarRocks寫入資料。

    1. 建立SQL會話,詳情請參見管理SQL會話

      建立會話時,選擇與Spark Connector版本對應的引擎版本,在网络连接中選擇上一步建立好的網路連接,並在Spark 配置中添加以下參數來載入Spark Connector。

      spark.emr.serverless.user.defined.jars  oss://<bucketname>/path/connector.jar

      其中,oss://<bucketname>/path/connector.jar為您步驟一中上傳至OSS的Spark Connector的路徑。例如,oss://emr-oss/spark/starrocks-spark-connector-3.5_2.12-1.1.2.jar

    2. 数据开发頁面,建立一個SparkSQL類型的任務,然後在右上方選擇建立好的SQL會話。

      更多操作,請參見SparkSQL開發

    3. 拷貝如下代碼到新增的SparkSQL頁簽中,並根據需要修改相應的參數資訊,然後單擊运行

      CREATE TEMPORARY VIEW score_board
      USING starrocks
      OPTIONS
      (
        "starrocks.table.identifier" = "testdb.score_board",
        "starrocks.fe.http.url" = "<fe_host>:<fe_http_port>",
        "starrocks.fe.jdbc.url" = "jdbc:mysql://<fe_host>:<fe_query_port>",
        "starrocks.user" = "<user>",
        "starrocks.password" = "<password>"
      );
      
      INSERT INTO `score_board` VALUES (1, "starrocks", 100), (2, "spark", 100);

      其中,涉及參數說明如下:

      • <fe_host>:Serverless StarRocks執行個體中FE的內網或公網地址。您可以在实例详情頁面的FE詳情地區查看。

        • 如果使用內網地址,請確保在同一VPC內。

        • 如果使用公網地址,需確保安全性群組規則允許相應的連接埠通訊,詳情請參見網路訪問與安全設定

      • <fe_http_port>:Serverless StarRocks執行個體中FE的HTTP連接埠(預設為8030)。您可以在实例详情頁面的FE詳情地區查看。

      • <fe_query_port>:Serverless StarRocks執行個體中FE的查詢連接埠(預設為9030)。您可以在实例详情頁面的FE詳情地區查看。

      • <user>:Serverless StarRocks執行個體的使用者名稱。預設提供admin使用者,具有管理員權限。您也可以通過用户管理頁面新增使用者來串連。新增使用者操作,請參見系統管理使用者及資料授權

      • <password>:使用者 <user> 對應的密碼。

  2. 通過Serverless Spark查詢寫入的資料。

    在本文樣本中,我們是在上述的SparkSQL任務中建立了一個臨時視圖 test_view,然後通過該視圖查詢 score_board 的資料。拷貝如下代碼到新增的SparkSQL頁簽中,選中代碼後單擊运行选中

    CREATE TEMPORARY VIEW test_view
    USING starrocks
    OPTIONS
    (
       "starrocks.table.identifier" = "testdb.score_board",
       "starrocks.fe.http.url" = "<fe_host>:<fe_http_port>",
       "starrocks.fe.jdbc.url" = "jdbc:mysql://<fe_host>:<fe_query_port>",
       "starrocks.user" = "<user>",
       "starrocks.password" = "<password>"
    );
    
    SELECT * FROM test_view;

    返回資訊如下圖所示。

    image

Notebook會話

  1. 通過Serverless Spark向StarRocks寫入資料。

    1. 建立Notebook會話,詳情請參見管理Notebook會話

      建立會話時,選擇與Spark Connector版本對應的引擎版本,在网络连接中選擇上一步建立好的網路連接,並在Spark 配置中添加以下參數來載入Spark Connector。

      spark.emr.serverless.user.defined.jars  oss://<bucketname>/path/connector.jar

      其中,oss://<bucketname>/path/connector.jar為您步驟一中上傳至OSS的Spark Connector的路徑。例如,oss://emr-oss/spark/starrocks-spark-connector-3.5_2.12-1.1.2.jar

    2. 数据开发頁面,選擇建立一個互動式開發 > Notebook類型的任務,然後在右上方選擇建立的Notebook會話。

      更多操作,請參見管理Notebook會話

    3. 拷貝如下代碼到新增的Notebook的Python儲存格中,單擊运行

      # 替換為您的Serverless StarRocks配置。
      fe_host = "<fe_host>"
      fe_http_port = "<fe_http_port>"
      fe_query_port = "<fe_query_port>"
      user = "<user>"
      password = "<password>"
      
      # 建立視圖
      create_table_sql = f"""
      CREATE TEMPORARY VIEW score_board
      USING starrocks
      OPTIONS (
        "starrocks.table.identifier" = "testdb.score_board",
        "starrocks.fe.http.url" = "{fe_host}:{fe_http_port}",
        "starrocks.fe.jdbc.url" = "jdbc:mysql://{fe_host}:{fe_query_port}",
        "starrocks.user" = "{user}",
        "starrocks.password" = "{password}"
      )
      """
      
      spark.sql(create_table_sql)
      
      #插入資料
      insert_data_sql = """
      INSERT INTO `score_board` VALUES (1, "starrocks", 100), (2, "spark", 100)
      """
      
      spark.sql(insert_data_sql)
      

      填寫樣本如下圖所示。

      image

      其中,涉及參數說明如下:

      • <fe_host>:Serverless StarRocks執行個體中FE的內網或公網地址。您可以在实例详情頁面的FE詳情地區查看。

        • 如果使用內網地址,請確保在同一VPC內。

        • 如果使用公網地址,需確保安全性群組規則允許相應的連接埠通訊,詳情請參見網路訪問與安全設定

      • <fe_http_port>:Serverless StarRocks執行個體中FE的HTTP連接埠(預設為8030)。您可以在实例详情頁面的FE詳情地區查看。

      • <fe_query_port>:Serverless StarRocks執行個體中FE的查詢連接埠(預設為9030)。您可以在实例详情頁面的FE詳情地區查看。

      • <user>:Serverless StarRocks執行個體的使用者名稱。預設提供admin使用者,具有管理員權限。您也可以通過用户管理頁面新增使用者來串連。新增使用者操作,請參見系統管理使用者及資料授權

      • <password>:使用者 <user> 對應的密碼。

  2. 通過Serverless Spark查詢寫入的資料。

    在本文樣本中,我們新增一個Python儲存格,在其中建立了一個臨時視圖 test_view,然後通過該視圖查詢 score_board 的資料。拷貝如下代碼到新增的Python儲存格中,然後單擊image表徵圖。

    #建立view
    create_view_sql=f"""
    CREATE TEMPORARY VIEW test_view
    USING starrocks
    OPTIONS (
      "starrocks.table.identifier" = "testdb.score_board",
      "starrocks.fe.http.url" = "{fe_host}:{fe_http_port}",
      "starrocks.fe.jdbc.url" = "jdbc:mysql://{fe_host}:{fe_query_port}",
      "starrocks.user" = "{user}",
      "starrocks.password" = "{password}"
    )
    """
    spark.sql(create_view_sql)
      
    #查詢
    query_sql="SELECT * FROM test_view"
    result_df = spark.sql(query_sql)
    result_df.show()

    返回資訊如下圖所示。

    image

方式二:使用Spark批任務讀寫StarRocks

  1. 建立Spark批任務。

    1. 在EMR Serverless Spark頁面,單擊左側的資料開發

    2. 開發目錄頁簽下,單擊image表徵圖。

    3. 在建立對話方塊中,輸入名稱,類型選擇批任務 > SQL,然後單擊確定

      類型您可以根據實際情況進行調整,本文以SQL為例。更多型別參數介紹,請參見Application開發

  2. 通過Spark批任務讀寫StarRocks。

    1. 在建立的任務開發的右上方選擇隊列。

      添加隊列的具體操作,請參見管理資源隊列

    2. 在建立的任務開發中,配置以下資訊,其餘參數無需配置,然後單擊運行

      參數

      說明

      SQL檔案

      本樣本所使用的檔案為spark_sql_starrocks.sql,其內容是SQL會話中的SQL語句,請根據實際情況對具體配置進行替換。在使用之前,您需要先下載該檔案並進行相應的修改,然後在檔案管理頁面進行上傳。

      spark_sql_starrocks.sql參數說明

      其中,涉及參數說明如下:

      • <fe_host>:Serverless StarRocks執行個體中FE的內網或公網地址。您可以在实例详情頁面的FE詳情地區查看。

        • 如果使用內網地址,請確保在同一VPC內。

        • 如果使用公網地址,需確保安全性群組規則允許相應的連接埠通訊,詳情請參見網路訪問與安全設定

      • <fe_http_port>:Serverless StarRocks執行個體中FE的HTTP連接埠(預設為8030)。您可以在实例详情頁面的FE詳情地區查看。

      • <fe_query_port>:Serverless StarRocks執行個體中FE的查詢連接埠(預設為9030)。您可以在实例详情頁面的FE詳情地區查看。

      • <user>:Serverless StarRocks執行個體的使用者名稱。預設提供admin使用者,具有管理員權限。您也可以通過用户管理頁面新增使用者來串連。新增使用者操作,請參見系統管理使用者及資料授權

      • <password>:使用者 <user> 對應的密碼。

      引擎版本

      選擇與Spark Connector版本對應的引擎版本。

      網路連接

      選擇前一步建立好的網路連接。

      Spark 配置

      Spark配置中添加以下參數來載入Spark Connector。

      spark.emr.serverless.user.defined.jars  oss://<bucketname>/path/connector.jar

      其中,oss://<bucketname>/path/connector.jar為您步驟一中上傳至OSS的Spark Connector的路徑。例如,oss://emr-oss/spark/starrocks-spark-connector-3.5_2.12-1.1.2.jar

  3. 查看日誌資訊。

    1. 您可以在下方的運行記錄地區,單擊操作列的詳情

    2. 單擊日誌探查頁簽,查看該任務的日誌資訊。

      image

相關文檔

StarRocks的官方文檔: