StarRocks官方提供了Spark Connector用於Spark和StarRocks之間的資料讀寫,Serverless Spark可以在開發時添加對應的配置串連StarRocks。本文為您介紹在EMR Serverless Spark中實現StarRocks的讀取和寫入操作。
前提條件
使用限制
Serverless Spark引擎的版本要求為esr-2.5.0、esr-3.1.0、esr-4.1.0及以上版本。
操作流程
步驟一:擷取Spark Connector JAR並上傳至OSS
參見Read data from StarRocks using Spark connector,選擇相應的方式下載對應版本的Spark Connector JAR。
例如,本文選擇直接下載已經編譯好的JAR,即從Maven Central Repository擷取不同版本的Connector JAR包。
說明Connector JAR包的命名格式為
starrocks-spark-connector-${spark_version}_${scala_version}-${connector_version}.jar。例如,您使用的引擎版本為esr-4.1.0 (Spark 3.5.2, Scala 2.12),想使用1.1.2版本的Connector,則可以選擇starrocks-spark-connector-3.5_2.12-1.1.2.jar。將下載的Spark Connector JAR上傳至阿里雲OSS中,上傳操作可以參見簡單上傳。
步驟二:添加網路連接
擷取網路資訊。
您可以在EMR Serverless StarRocks頁面,進入目標StarRocks執行個體的实例详情頁面,以擷取該執行個體的專用網路和交換器資訊。
新增網路連接。
在EMR Serverless Spark頁面,進入目標Spark工作空間的网络连接頁面,單擊新增网络连接。
在新增网络连接對話方塊中,輸入连接名称,並選擇之前擷取到的StarRocks執行個體的專用網路和交換器資訊,然後單擊确定。
說明網路連接與SR執行個體需保持一致,交換器應選擇與SR執行個體位於同一專用網路中的交換器。如果當前可用性區域內沒有可用的交換器,請單擊虚拟交换机,前往專用網路控制台建立,詳情請參見專用網路與交換器。
步驟三:在StarRocks中建立庫表
串連StarRocks執行個體,詳情請參見通過EMR StarRocks Manager串連StarRocks執行個體。
在SQL Editor的查詢列表頁面,單擊文件或者右側地區的
表徵圖,然後單擊确认以新增檔案。在新增的檔案中輸入以下SQL語句,單擊运行。
CREATE DATABASE `testdb`; CREATE TABLE `testdb`.`score_board` ( `id` int(11) NOT NULL COMMENT "", `name` varchar(65533) NULL DEFAULT "" COMMENT "", `score` int(11) NOT NULL DEFAULT "0" COMMENT "" ) ENGINE=OLAP PRIMARY KEY(`id`) COMMENT "OLAP" DISTRIBUTED BY HASH(`id`);
步驟四:通過Serverless Spark讀寫StarRocks
方式一:使用SQL會話、Notebook會話讀寫StarRocks
會話類型更多介紹,請參見會話管理。
SQL會話
通過Serverless Spark向StarRocks寫入資料。
建立SQL會話,詳情請參見管理SQL會話。
建立會話時,選擇與Spark Connector版本對應的引擎版本,在网络连接中選擇上一步建立好的網路連接,並在Spark 配置中添加以下參數來載入Spark Connector。
spark.emr.serverless.user.defined.jars oss://<bucketname>/path/connector.jar其中,
oss://<bucketname>/path/connector.jar為您步驟一中上傳至OSS的Spark Connector的路徑。例如,oss://emr-oss/spark/starrocks-spark-connector-3.5_2.12-1.1.2.jar。在数据开发頁面,建立一個SparkSQL類型的任務,然後在右上方選擇建立好的SQL會話。
更多操作,請參見SparkSQL開發。
拷貝如下代碼到新增的SparkSQL頁簽中,並根據需要修改相應的參數資訊,然後單擊运行。
CREATE TEMPORARY VIEW score_board USING starrocks OPTIONS ( "starrocks.table.identifier" = "testdb.score_board", "starrocks.fe.http.url" = "<fe_host>:<fe_http_port>", "starrocks.fe.jdbc.url" = "jdbc:mysql://<fe_host>:<fe_query_port>", "starrocks.user" = "<user>", "starrocks.password" = "<password>" ); INSERT INTO `score_board` VALUES (1, "starrocks", 100), (2, "spark", 100);其中,涉及參數說明如下:
<fe_host>:Serverless StarRocks執行個體中FE的內網或公網地址。您可以在实例详情頁面的FE詳情地區查看。如果使用內網地址,請確保在同一VPC內。
如果使用公網地址,需確保安全性群組規則允許相應的連接埠通訊,詳情請參見網路訪問與安全設定。
<fe_http_port>:Serverless StarRocks執行個體中FE的HTTP連接埠(預設為8030)。您可以在实例详情頁面的FE詳情地區查看。<fe_query_port>:Serverless StarRocks執行個體中FE的查詢連接埠(預設為9030)。您可以在实例详情頁面的FE詳情地區查看。<user>:Serverless StarRocks執行個體的使用者名稱。預設提供admin使用者,具有管理員權限。您也可以通過用户管理頁面新增使用者來串連。新增使用者操作,請參見系統管理使用者及資料授權。<password>:使用者<user>對應的密碼。
通過Serverless Spark查詢寫入的資料。
在本文樣本中,我們是在上述的SparkSQL任務中建立了一個臨時視圖
test_view,然後通過該視圖查詢score_board的資料。拷貝如下代碼到新增的SparkSQL頁簽中,選中代碼後單擊运行选中。CREATE TEMPORARY VIEW test_view USING starrocks OPTIONS ( "starrocks.table.identifier" = "testdb.score_board", "starrocks.fe.http.url" = "<fe_host>:<fe_http_port>", "starrocks.fe.jdbc.url" = "jdbc:mysql://<fe_host>:<fe_query_port>", "starrocks.user" = "<user>", "starrocks.password" = "<password>" ); SELECT * FROM test_view;返回資訊如下圖所示。

Notebook會話
通過Serverless Spark向StarRocks寫入資料。
建立Notebook會話,詳情請參見管理Notebook會話。
建立會話時,選擇與Spark Connector版本對應的引擎版本,在网络连接中選擇上一步建立好的網路連接,並在Spark 配置中添加以下參數來載入Spark Connector。
spark.emr.serverless.user.defined.jars oss://<bucketname>/path/connector.jar其中,
oss://<bucketname>/path/connector.jar為您步驟一中上傳至OSS的Spark Connector的路徑。例如,oss://emr-oss/spark/starrocks-spark-connector-3.5_2.12-1.1.2.jar。在数据开发頁面,選擇建立一個類型的任務,然後在右上方選擇建立的Notebook會話。
更多操作,請參見管理Notebook會話。
拷貝如下代碼到新增的Notebook的Python儲存格中,單擊运行。
# 替換為您的Serverless StarRocks配置。 fe_host = "<fe_host>" fe_http_port = "<fe_http_port>" fe_query_port = "<fe_query_port>" user = "<user>" password = "<password>" # 建立視圖 create_table_sql = f""" CREATE TEMPORARY VIEW score_board USING starrocks OPTIONS ( "starrocks.table.identifier" = "testdb.score_board", "starrocks.fe.http.url" = "{fe_host}:{fe_http_port}", "starrocks.fe.jdbc.url" = "jdbc:mysql://{fe_host}:{fe_query_port}", "starrocks.user" = "{user}", "starrocks.password" = "{password}" ) """ spark.sql(create_table_sql) #插入資料 insert_data_sql = """ INSERT INTO `score_board` VALUES (1, "starrocks", 100), (2, "spark", 100) """ spark.sql(insert_data_sql)填寫樣本如下圖所示。

其中,涉及參數說明如下:
<fe_host>:Serverless StarRocks執行個體中FE的內網或公網地址。您可以在实例详情頁面的FE詳情地區查看。如果使用內網地址,請確保在同一VPC內。
如果使用公網地址,需確保安全性群組規則允許相應的連接埠通訊,詳情請參見網路訪問與安全設定。
<fe_http_port>:Serverless StarRocks執行個體中FE的HTTP連接埠(預設為8030)。您可以在实例详情頁面的FE詳情地區查看。<fe_query_port>:Serverless StarRocks執行個體中FE的查詢連接埠(預設為9030)。您可以在实例详情頁面的FE詳情地區查看。<user>:Serverless StarRocks執行個體的使用者名稱。預設提供admin使用者,具有管理員權限。您也可以通過用户管理頁面新增使用者來串連。新增使用者操作,請參見系統管理使用者及資料授權。<password>:使用者<user>對應的密碼。
通過Serverless Spark查詢寫入的資料。
在本文樣本中,我們新增一個Python儲存格,在其中建立了一個臨時視圖
test_view,然後通過該視圖查詢score_board的資料。拷貝如下代碼到新增的Python儲存格中,然後單擊
表徵圖。#建立view create_view_sql=f""" CREATE TEMPORARY VIEW test_view USING starrocks OPTIONS ( "starrocks.table.identifier" = "testdb.score_board", "starrocks.fe.http.url" = "{fe_host}:{fe_http_port}", "starrocks.fe.jdbc.url" = "jdbc:mysql://{fe_host}:{fe_query_port}", "starrocks.user" = "{user}", "starrocks.password" = "{password}" ) """ spark.sql(create_view_sql) #查詢 query_sql="SELECT * FROM test_view" result_df = spark.sql(query_sql) result_df.show()返回資訊如下圖所示。

方式二:使用Spark批任務讀寫StarRocks
建立Spark批任務。
在EMR Serverless Spark頁面,單擊左側的資料開發。
在開發目錄頁簽下,單擊
表徵圖。在建立對話方塊中,輸入名稱,類型選擇,然後單擊確定。
類型您可以根據實際情況進行調整,本文以SQL為例。更多型別參數介紹,請參見Application開發。
通過Spark批任務讀寫StarRocks。
在建立的任務開發的右上方選擇隊列。
添加隊列的具體操作,請參見管理資源隊列。
在建立的任務開發中,配置以下資訊,其餘參數無需配置,然後單擊運行。
參數
說明
SQL檔案
本樣本所使用的檔案為spark_sql_starrocks.sql,其內容是SQL會話中的SQL語句,請根據實際情況對具體配置進行替換。在使用之前,您需要先下載該檔案並進行相應的修改,然後在檔案管理頁面進行上傳。
引擎版本
選擇與Spark Connector版本對應的引擎版本。
網路連接
選擇前一步建立好的網路連接。
Spark 配置
在Spark配置中添加以下參數來載入Spark Connector。
spark.emr.serverless.user.defined.jars oss://<bucketname>/path/connector.jar其中,
oss://<bucketname>/path/connector.jar為您步驟一中上傳至OSS的Spark Connector的路徑。例如,oss://emr-oss/spark/starrocks-spark-connector-3.5_2.12-1.1.2.jar。
查看日誌資訊。
您可以在下方的運行記錄地區,單擊操作列的詳情。
單擊日誌探查頁簽,查看該任務的日誌資訊。

相關文檔
StarRocks的官方文檔: