AnalyticDB for MySQL支援通過外表讀取並匯入MaxCompute資料。通過外表匯入資料可以最大限度地利用叢集資源,實現高效能資料匯入。本文主要介紹如何通過外表將MaxCompute資料匯入AnalyticDB for MySQL。
功能介紹
AnalyticDB for MySQL產品系列 | 訪問方式 | AnalyticDB for MySQL核心版本 | 資料訪問效率 |
企業版、基礎版及湖倉版 | Tunnel Record API方式 | 無限制 | 適合小規模資料訪問,資料訪問和匯入速度慢。 |
Tunnel Arrow API方式 | 3.2.2.3及以上版本 | 使用列式讀取資料,減少資料訪問和匯入時間,提供更快的資料轉送速度。 | |
數倉版 | Tunnel Record API方式 | 無限制 | 使用公用Data Transmission Service資源群組,該資源會被該地區所有專案共用使用,資料訪問和匯入速度慢。 |
前提條件
MaxCompute專案與AnalyticDB for MySQL叢集位於同一地區。具體操作,請參見建立叢集。
已添加AnalyticDB for MySQL的VPC網段到MaxCompute專案的白名單中。
說明登入雲原生資料倉儲AnalyticDB MySQL控制台,在叢集資訊頁面查詢VPC ID。然後登入專用網路控制台,在專用網路頁面根據VPC ID查詢網段。設定MaxCompute白名單的操作,請參見管理IP白名單。
AnalyticDB for MySQL企業版、基礎版及湖倉版叢集:
已開啟ENI訪問。
重要登入雲原生資料倉儲AnalyticDB MySQL控制台,在的網路資訊地區,開啟ENI網路開關。
開啟和關閉ENI網路會導致資料庫連接中斷大約2分鐘,無法讀寫。請謹慎評估影響後再開啟或關閉ENI網路。
通過Tunnel Arrow API方式訪問並匯入MaxCompute資料,叢集需為3.2.2.3及以上版本。
說明請在雲原生資料倉儲AnalyticDB MySQL控制台集群資訊頁面的配寘資訊地區,查看和升級核心版本。
資料準備
本文樣本中的MaxCompute專案為odps_project,樣本表odps_nopart_import_test。樣本如下:
CREATE TABLE IF NOT EXISTS odps_nopart_import_test (
id int,
name string,
age int)
partitioned by (dt string);在odps_nopart_import_test表中添加分區,樣本如下:
ALTER TABLE odps_nopart_import_test
ADD
PARTITION (dt='202207');向分區中添加資料,樣本如下:
INSERT INTO odps_project.odps_nopart_import_test
PARTITION (dt='202207')
VALUES (1,'james',10),(2,'bond',20),(3,'jack',30),(4,'lucy',40);操作步驟
企業版、基礎版及湖倉版
預設情況下,AnalyticDB for MySQL叢集會使用Tunnel Record API方式訪問並匯入MaxCompute資料。 若您需要通過Tunnel Arrow API方式訪問並匯入MaxCompute資料,請先開啟Arrow API功能。開啟後,AnalyticDB for MySQL叢集會使用Tunnel Arrow API方式進行匯入。
Tunnel Record API方式
資料匯入方式分為常規匯入(預設)和彈性匯入。常規匯入在計算節點中讀取來源資料,然後在儲存節點中構建索引,消耗計算資源和儲存資源。彈性匯入在Serverless Spark Job中讀取來源資料和構建索引,消耗Job型資源群組的資源。僅核心版本3.1.10.0及以上且已建立Job型資源群組的叢集支援彈性匯入資料。相較於常規匯入,彈性匯入可以大幅減少資源的消耗,降低匯入處理程序中對線上讀寫業務的影響,提升資源隔離性和資料匯入效率。更多內容,請參見資料匯入方式介紹。
常規匯入
進入SQL編輯器。
登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,然後單擊目的地組群ID。
在左側導覽列,單擊。
建立外部資料庫。樣本如下:
CREATE EXTERNAL DATABASE adb_external_db;建立外表。本文樣本為
test_adb。CREATE EXTERNAL TABLE IF NOT EXISTS adb_external_db.test_adb ( id int, name varchar(1023), age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "accessid":"yourAccessKeyID", "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun.com/api", "accesskey":"yourAccessKeySecret", "partition_column":"dt", "project_name":"odps_project", "table_name":"odps_nopart_import_test" }';說明AnalyticDB for MySQL外表和MaxCompute中表的欄位名稱、欄位數量、欄位順序需要一致,欄位類型需要相容。
外表的參數說明,請參見CREATE EXTERNAL TABLE。
查詢資料。
SELECT * FROM adb_external_db.test_adb;返回結果如下:
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+執行以下步驟將MaxCompute資料匯入至AnalyticDB for MySQL。
在AnalyticDB for MySQL中建立資料庫,樣本如下:
CREATE DATABASE adb_demo;在AnalyticDB for MySQL中建立表用於儲存從MaxCompute中匯入的資料,樣本如下:
說明新表和步驟3中建立的外表的欄位順序和欄位數量需要一致,欄位類型相容。
CREATE TABLE IF NOT EXISTS adb_demo.adb_import_test( id int, name string, age int, dt string, PRIMARY KEY(id,dt) ) DISTRIBUTED BY HASH(id) PARTITION BY VALUE('dt');向表中寫入資料,樣本如下:
方式一:執行INSERT INTO匯入資料,當主鍵重複時會自動忽略當前寫入資料,不做更新,作用等同於
INSERT IGNORE INTO,詳情請參見INSERT INTO。樣本如下:INSERT INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;如果需要將特定分區的資料匯入
adb_demo.adb_import_test,可以執行:INSERT INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb WHERE dt = '202207';方式二:執行INSERT OVERWRITE INTO匯入資料,會覆蓋表中原有的資料。樣本如下:
INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;方式三:非同步執行INSERT OVERWRITE INTO匯入資料。通常使用
SUBMIT JOB提交非同步任務,由後台調度,可以在寫入任務前增加Hint(/*+ direct_batch_load=true*/)加速寫入任務。詳情請參見非同步寫入。樣本如下:SUBMIT job INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;返回結果如下:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2020112122202917203100908203303****** | +---------------------------------------+關於非同步提交任務詳情,請參見非同步提交匯入任務。
彈性匯入
進入SQL編輯器。
登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,然後單擊目的地組群ID。
在左側導覽列,單擊。
建立資料庫。如果有已建立的資料庫,可以忽略本步驟。樣本如下:
CREATE DATABASE adb_demo;建立外表。
說明AnalyticDB for MySQL外表的名稱需要和MaxCompute專案的名稱相同,否則建立外表會失敗。
AnalyticDB for MySQL外表和MaxCompute中表的欄位名稱、欄位數量、欄位順序需要一致,欄位類型需要相容。
彈性匯入僅支援
CREATE TABLE語句建立外表。
CREATE TABLE IF NOT EXISTS test_adb ( id int, name string, age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api", "accessid":"yourAccessKeyID", "accesskey":"yourAccessKeySecret", "partition_column":"dt", "project_name":"odps_project", "table_name":"odps_nopart_import_test" }';外表支援設定的參數及參數說明,請參見參數說明。
查詢資料。
SELECT * FROM adb_demo.test_adb;返回結果如下:
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+在AnalyticDB for MySQL中建立表用於儲存從MaxCompute中匯入的資料。樣本如下:
說明建立的內表和步驟3中建立的外表的欄位名稱、欄位數量、欄位順序、欄位類型必須相同。
CREATE TABLE IF NOT EXISTS adb_import_test ( id int, name string, age int, dt string, PRIMARY KEY(id,dt) ) DISTRIBUTED BY HASH(id) PARTITION BY VALUE('dt') LIFECYCLE 30;匯入資料。
重要彈性匯入僅支援通過
INSERT OVERWRITE INTO語句匯入資料。方法一:執行INSERT OVERWRITE INTO彈性匯入資料,會覆蓋表中原有的資料。樣本如下:
/*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group]*/ INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
方法二:非同步執行INSERT OVERWRITE INTO彈性匯入資料。通常使用
SUBMIT JOB提交非同步任務,由後台調度。/*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group]*/ SUBMIT JOB INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;重要非同步提交彈性匯入任務時,不支援設定優先權隊列。
返回結果如下:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2023081517192220291720310090151****** | +---------------------------------------+
使用
SUBMIT JOB提交非同步任務後,返回結果僅表示非同步任務提交成功。您可以通過job_id終止非同步任務或查詢非同步任務狀態,判斷任務是否執行成功。具體操作,請參見非同步提交匯入任務。Hint參數說明:
elastic_load:是否使用彈性匯入方式。取值:true或false(預設值)。
elastic_load_configs:彈性匯入方式支援配置的參數。參數需使用方括弧([ ])括起來,且多個參數之間以豎線(|)分隔,支援配置的參數如下表所示:
參數
是否必填
說明
adb.load.resource.group.name
是
執行彈性匯入任務的Job資源群組名稱。
adb.load.job.max.acu
否
單個彈性匯入任務最多使用的資源。單位為ACU,最小值為5 ACU。預設值為叢集Shard個數+1。
執行如下語句可查詢叢集Shard個數:
SELECT count(1) FROM information_schema.kepler_meta_shards;spark.driver.resourceSpec
否
Spark driver的資源規格。預設值為small。取值範圍,請參見Spark應用配置參數說明的型號列。
spark.executor.resourceSpec
否
Spark executor的資源規格。預設值為large。取值範圍,請參見Spark應用配置參數說明的型號列。
spark.adb.executorDiskSize
否
Spark executor的磁碟容量,取值範圍為(0,100],單位為GiB,預設值為10 Gi。更多資訊,請參見指定Driver和Executor資源。
(可選)查看已提交的匯入任務是否為彈性匯入任務。
SELECT job_name, (job_type = 3) AS is_elastic_load FROM INFORMATION_SCHEMA.kepler_meta_async_jobs WHERE job_name = "2023081818010602101701907303151******";返回結果如下:
+---------------------------------------+------------------+ | job_name | is_elastic_load | +---------------------------------------+------------------+ | 2023081517195203101701907203151****** | 1 | +---------------------------------------+------------------+is_elastic_load的傳回值為1,表示已提交的匯入任務是彈性匯入任務;若為0,則表示已提交的匯入任務是常規匯入任務。
Tunnel Arrow API方式
步驟一:開啟Arrow API
開啟方法
您可以通過SET命令或Hint在叢集層級和查詢層級開啟Arrow API:
叢集層級開啟Arrow API:
SET ADB_CONFIG <config_name>= <value>;查詢層級開啟Arrow API:
/*<config_name>= <value>*/ SELECT * FROM table;
Arrow API相關配置參數
參數(config_name) | 說明 |
ODPS_TUNNEL_ARROW_ENABLED | 是否開啟Arrow API。取值:
|
ODPS_TUNNEL_SPLIT_BY_SIZE_ENABLED | 是否開啟動態Split切分。取值:
|
步驟二:訪問並匯入MaxCompute資料
進入SQL編輯器。
登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,然後單擊目的地組群ID。
在左側導覽列,單擊。
建立外部資料庫。樣本如下:
CREATE EXTERNAL DATABASE adb_external_db;建立外表。本文樣本為
test_adb。CREATE EXTERNAL TABLE IF NOT EXISTS adb_external_db.test_adb ( id int, name varchar(1023), age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "accessid":"yourAccessKeyID", "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun.com/api", "accesskey":"yourAccessKeySecret", "partition_column":"dt", "project_name":"odps_project", "table_name":"odps_nopart_import_test" }';說明AnalyticDB for MySQL外表和MaxCompute中表的欄位名稱、欄位數量、欄位順序需要一致,欄位類型需要相容。
外表的參數說明,請參見CREATE EXTERNAL TABLE。
查詢資料。
SELECT * FROM adb_external_db.test_adb;返回結果如下:
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+執行以下步驟將MaxCompute資料匯入至AnalyticDB for MySQL。
在AnalyticDB for MySQL中建立資料庫,樣本如下:
CREATE DATABASE adb_demo;在AnalyticDB for MySQL中建立表用於儲存從MaxCompute中匯入的資料,樣本如下:
說明新表和步驟3中建立的外表的欄位順序和欄位數量需要一致,欄位類型相容。
CREATE TABLE IF NOT EXISTS adb_demo.adb_import_test( id int, name string, age int, dt string PRIMARY KEY(id,dt) ) DISTRIBUTED BY HASH(id) PARTITION BY VALUE('dt');向表中寫入資料,樣本如下:
方式一:執行INSERT INTO匯入資料,當主鍵重複時會自動忽略當前寫入資料,不做更新,作用等同於
INSERT IGNORE INTO,詳情請參見INSERT INTO。樣本如下:INSERT INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;如果需要將特定分區的資料匯入
adb_demo.adb_import_test,可以執行:INSERT INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb WHERE dt = '202207';方式二:執行INSERT OVERWRITE INTO匯入資料,會覆蓋表中原有的資料。樣本如下:
INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;方式三:非同步執行INSERT OVERWRITE INTO匯入資料。通常使用
SUBMIT JOB提交非同步任務,由後台調度,可以在寫入任務前增加Hint(/*+ direct_batch_load=true*/)加速寫入任務。詳情請參見非同步寫入。樣本如下:SUBMIT job INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;返回結果如下:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2020112122202917203100908203303****** | +---------------------------------------+關於非同步提交任務詳情,請參見非同步提交匯入任務。
數倉版
串連目標AnalyticDB for MySQL叢集。詳細操作步驟,請參見串連叢集。
建立目標資料庫。
CREATE database test_adb;建立MaxCompute外表。本文以
odps_nopart_import_test_external_table為例。CREATE TABLE IF NOT EXISTS odps_nopart_import_test_external_table ( id int, name string, age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "endpoint":"http://service.cn.maxcompute.aliyun-inc.com/api", "accessid":"yourAccessKeyID", "accesskey":"yourAccessKeySecret", "partition_column":"dt", "project_name":"odps_project1", "table_name":"odps_nopart_import_test" }';參數
說明
ENGINE=’ODPS’外表的儲存引擎。讀寫MaxCompute資料時,取值為ODPS。
endpointMaxCompute的EndPoint(網域名稱節點)。
說明目前僅支援AnalyticDB for MySQL通過MaxCompute的VPC網路Endpoint訪問MaxCompute。
查詢各地區VPC網路的Endpoint,請參見VPC Endpoint。
accessid阿里雲帳號或者具備MaxCompute存取權限的RAM使用者的AccessKey ID。
如何擷取AccessKey ID和AccessKey Secret,請參見帳號與許可權。
accesskey阿里雲帳號或者具備MaxCompute存取權限的RAM使用者的AccessKey Secret。
如何擷取AccessKey ID和AccessKey Secret,請參見帳號與許可權。
partition_column本文使用的樣本是建立分區表的樣本,所以需要配置
partition_column。如果MaxCompute的表是非分區表,那麼AnalyticDB for MySQL中也需要建立非分區表,此時無需配置partition_column。project_nameMaxCompute中的工作空間名稱。
table_nameMaxCompute中的資料來源表名。
在
test_adb資料庫中建立表adb_nopart_import_test,用於儲存從MaxCompute中匯入的資料。CREATE TABLE IF NOT EXISTS adb_nopart_import_test ( id int, name string, age int, dt string, PRIMARY KEY(id,dt) ) DISTRIBUTED BY HASH(id) PARTITION BY VALUE('dt') LIFECYCLE 30;匯入資料。
方式一:執行
INSERT INTO匯入資料,當主鍵重複時會自動忽略當前寫入資料,不做更新,作用等同於INSERT IGNORE INTO,詳情請參見INSERT INTO。樣本如下:INSERT INTO adb_nopart_import_test SELECT * FROM odps_nopart_import_test_external_table;通過SELECT查詢寫入表中的資料,樣本如下:
SELECT * FROM adb_nopart_import_test;返回結果如下:
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+如果需要將特定分區的資料匯入
adb_nopart_import_test,可以執行:INSERT INTO adb_nopart_import_test SELECT * FROM odps_nopart_import_test_external_table WHERE dt = '202207';方式二:執行
INSERT OVERWRITE匯入資料,會覆蓋表中原有的資料。樣本如下:INSERT OVERWRITE adb_nopart_import_test SELECT * FROM odps_nopart_import_test_external_table;方式三:非同步執行
INSERT OVERWRITE匯入資料。通常使用SUBMIT JOB提交非同步任務,由後台調度,可以在寫入任務前增加Hint加速寫入任務。詳情請參見非同步寫入。樣本如下:SUBMIT JOB INSERT OVERWRITE adb_nopart_import_test SELECT * FROM odps_nopart_import_test_external_table;返回結果如下:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2020112122202917203100908203303****** |關於非同步提交任務詳情請參見非同步提交匯入任務。