全部產品
Search
文件中心

AnalyticDB:通過外表匯入MaxCompute資料

更新時間:Oct 18, 2025

AnalyticDB for MySQL支援通過外表讀取並匯入MaxCompute資料。通過外表匯入資料可以最大限度地利用叢集資源,實現高效能資料匯入。本文主要介紹如何通過外表將MaxCompute資料匯入AnalyticDB for MySQL

功能介紹

AnalyticDB for MySQL產品系列

訪問方式

AnalyticDB for MySQL核心版本

資料訪問效率

企業版、基礎版及湖倉版

Tunnel Record API方式

無限制

適合小規模資料訪問,資料訪問和匯入速度慢。

Tunnel Arrow API方式

3.2.2.3及以上版本

使用列式讀取資料,減少資料訪問和匯入時間,提供更快的資料轉送速度。

數倉版

Tunnel Record API方式

無限制

使用公用Data Transmission Service資源群組,該資源會被該地區所有專案共用使用,資料訪問和匯入速度慢。

前提條件

資料準備

本文樣本中的MaxCompute專案為odps_project,樣本表odps_nopart_import_test。樣本如下:

CREATE TABLE IF NOT EXISTS odps_nopart_import_test (
    id int,
    name string,
    age int)
partitioned by (dt string);

odps_nopart_import_test表中添加分區,樣本如下:

ALTER TABLE odps_nopart_import_test 
ADD 
PARTITION (dt='202207');

向分區中添加資料,樣本如下:

INSERT INTO odps_project.odps_nopart_import_test 
PARTITION (dt='202207') 
VALUES (1,'james',10),(2,'bond',20),(3,'jack',30),(4,'lucy',40);

操作步驟

企業版、基礎版及湖倉版

預設情況下,AnalyticDB for MySQL叢集會使用Tunnel Record API方式訪問並匯入MaxCompute資料。 若您需要通過Tunnel Arrow API方式訪問並匯入MaxCompute資料,請先開啟Arrow API功能。開啟後,AnalyticDB for MySQL叢集會使用Tunnel Arrow API方式進行匯入。

Tunnel Record API方式

資料匯入方式分為常規匯入(預設)和彈性匯入。常規匯入在計算節點中讀取來源資料,然後在儲存節點中構建索引,消耗計算資源和儲存資源。彈性匯入在Serverless Spark Job中讀取來源資料和構建索引,消耗Job型資源群組的資源。僅核心版本3.1.10.0及以上且已建立Job型資源群組的叢集支援彈性匯入資料。相較於常規匯入,彈性匯入可以大幅減少資源的消耗,降低匯入處理程序中對線上讀寫業務的影響,提升資源隔離性和資料匯入效率。更多內容,請參見資料匯入方式介紹

常規匯入

  1. 進入SQL編輯器。

    1. 登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,然後單擊目的地組群ID。

    2. 在左側導覽列,單擊作業開發 > SQL開發

  2. 建立外部資料庫。樣本如下:

    CREATE EXTERNAL DATABASE adb_external_db;
  3. 建立外表。本文樣本為test_adb

    CREATE EXTERNAL TABLE IF NOT EXISTS adb_external_db.test_adb (
        id int,
        name varchar(1023),
        age int,
        dt string
        ) ENGINE='ODPS'
    TABLE_PROPERTIES='{
    "accessid":"yourAccessKeyID",
    "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun.com/api",
    "accesskey":"yourAccessKeySecret",
    "partition_column":"dt",
    "project_name":"odps_project",
    "table_name":"odps_nopart_import_test"
    }';
    說明
    • AnalyticDB for MySQL外表和MaxCompute中表的欄位名稱、欄位數量、欄位順序需要一致,欄位類型需要相容。

    • 外表的參數說明,請參見CREATE EXTERNAL TABLE

  4. 查詢資料。

    SELECT * FROM adb_external_db.test_adb;

    返回結果如下:

    +------+-------+------+---------+
    | id   | name  | age  |   dt    |
    +------+-------+------+---------+
    |    1 | james |   10 |  202207 |
    |    2 | bond  |   20 |  202207 |
    |    3 | jack  |   30 |  202207 |
    |    4 | lucy  |   40 |  202207 |
    +------+-------+------+---------+
  5. 執行以下步驟將MaxCompute資料匯入至AnalyticDB for MySQL

    1. AnalyticDB for MySQL中建立資料庫,樣本如下:

      CREATE DATABASE adb_demo; 
    2. AnalyticDB for MySQL中建立表用於儲存從MaxCompute中匯入的資料,樣本如下:

      說明

      新表和步驟3中建立的外表的欄位順序和欄位數量需要一致,欄位類型相容。

      CREATE TABLE IF NOT EXISTS adb_demo.adb_import_test(
          id int,
          name string,
          age int,
          dt string,
          PRIMARY KEY(id,dt)
      )
      DISTRIBUTED BY HASH(id)  
      PARTITION BY VALUE('dt'); 
    3. 向表中寫入資料,樣本如下:

      • 方式一:執行INSERT INTO匯入資料,當主鍵重複時會自動忽略當前寫入資料,不做更新,作用等同於INSERT IGNORE INTO,詳情請參見INSERT INTO。樣本如下:

        INSERT INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb;

        如果需要將特定分區的資料匯入adb_demo.adb_import_test,可以執行:

        INSERT INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb 
        WHERE dt = '202207'; 
      • 方式二:執行INSERT OVERWRITE INTO匯入資料,會覆蓋表中原有的資料。樣本如下:

        INSERT OVERWRITE INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb;
      • 方式三:非同步執行INSERT OVERWRITE INTO匯入資料。通常使用SUBMIT JOB提交非同步任務,由後台調度,可以在寫入任務前增加Hint(/*+ direct_batch_load=true*/)加速寫入任務。詳情請參見非同步寫入。樣本如下:

        SUBMIT job 
        INSERT OVERWRITE INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb;

        返回結果如下:

        +---------------------------------------+
        | job_id                                |
        +---------------------------------------+
        | 2020112122202917203100908203303****** |
        +---------------------------------------+

        關於非同步提交任務詳情,請參見非同步提交匯入任務

彈性匯入

  1. 進入SQL編輯器。

    1. 登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,然後單擊目的地組群ID。

    2. 在左側導覽列,單擊作業開發 > SQL開發

  2. 建立資料庫。如果有已建立的資料庫,可以忽略本步驟。樣本如下:

    CREATE DATABASE adb_demo; 
  3. 建立外表。

    說明
    • AnalyticDB for MySQL外表的名稱需要和MaxCompute專案的名稱相同,否則建立外表會失敗。

    • AnalyticDB for MySQL外表和MaxCompute中表的欄位名稱、欄位數量、欄位順序需要一致,欄位類型需要相容。

    • 彈性匯入僅支援CREATE TABLE語句建立外表。

    CREATE TABLE IF NOT EXISTS test_adb
    (
        id int,
        name string,
        age int,
        dt string
    )
     ENGINE='ODPS'
     TABLE_PROPERTIES='{
     "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api",
     "accessid":"yourAccessKeyID",
     "accesskey":"yourAccessKeySecret",
     "partition_column":"dt",
     "project_name":"odps_project",
     "table_name":"odps_nopart_import_test"
     }';                 

    外表支援設定的參數及參數說明,請參見參數說明

  4. 查詢資料。

    SELECT * FROM adb_demo.test_adb;

    返回結果如下:

    +------+-------+------+---------+
    | id   | name  | age  |   dt    |
    +------+-------+------+---------+
    |    1 | james |   10 |  202207 |
    |    2 | bond  |   20 |  202207 |
    |    3 | jack  |   30 |  202207 |
    |    4 | lucy  |   40 |  202207 |
    +------+-------+------+---------+
  5. AnalyticDB for MySQL中建立表用於儲存從MaxCompute中匯入的資料。樣本如下:

    說明

    建立的內表和步驟3中建立的外表的欄位名稱、欄位數量、欄位順序、欄位類型必須相同。

    CREATE TABLE IF NOT EXISTS adb_import_test
    (   id int,
        name string,
        age int,
        dt string,
        PRIMARY KEY(id,dt)
    )
    DISTRIBUTED BY HASH(id)
    PARTITION BY VALUE('dt') LIFECYCLE 30;  
  6. 匯入資料。

    重要

    彈性匯入僅支援通過INSERT OVERWRITE INTO語句匯入資料。

    • 方法一:執行INSERT OVERWRITE INTO彈性匯入資料,會覆蓋表中原有的資料。樣本如下:

      /*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group]*/
      INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
    • 方法二:非同步執行INSERT OVERWRITE INTO彈性匯入資料。通常使用SUBMIT JOB提交非同步任務,由後台調度。

      /*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group]*/
      SUBMIT JOB INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
      重要

      非同步提交彈性匯入任務時,不支援設定優先權隊列。

      返回結果如下:

      +---------------------------------------+
      | job_id                                |
      +---------------------------------------+
      | 2023081517192220291720310090151****** |
      +---------------------------------------+

    使用SUBMIT JOB提交非同步任務後,返回結果僅表示非同步任務提交成功。您可以通過job_id終止非同步任務或查詢非同步任務狀態,判斷任務是否執行成功。具體操作,請參見非同步提交匯入任務

    Hint參數說明:

    • elastic_load:是否使用彈性匯入方式。取值:truefalse(預設值)。

    • elastic_load_configs:彈性匯入方式支援配置的參數。參數需使用方括弧([ ])括起來,且多個參數之間以豎線(|)分隔,支援配置的參數如下表所示:

      參數

      是否必填

      說明

      adb.load.resource.group.name

      執行彈性匯入任務的Job資源群組名稱。

      adb.load.job.max.acu

      單個彈性匯入任務最多使用的資源。單位為ACU,最小值為5 ACU。預設值為叢集Shard個數+1。

      執行如下語句可查詢叢集Shard個數:

      SELECT count(1) FROM information_schema.kepler_meta_shards;

      spark.driver.resourceSpec

      Spark driver的資源規格。預設值為small。取值範圍,請參見Spark應用配置參數說明的型號列。

      spark.executor.resourceSpec

      Spark executor的資源規格。預設值為large。取值範圍,請參見Spark應用配置參數說明的型號列。

      spark.adb.executorDiskSize

      Spark executor的磁碟容量,取值範圍為(0,100],單位為GiB,預設值為10 Gi。更多資訊,請參見指定Driver和Executor資源

  7. (可選)查看已提交的匯入任務是否為彈性匯入任務。

    SELECT job_name, (job_type = 3) AS is_elastic_load FROM INFORMATION_SCHEMA.kepler_meta_async_jobs WHERE job_name = "2023081818010602101701907303151******";

    返回結果如下:

    +---------------------------------------+------------------+
    | job_name                              | is_elastic_load  |
    +---------------------------------------+------------------+
    | 2023081517195203101701907203151****** |       1          |
    +---------------------------------------+------------------+

    is_elastic_load的傳回值為1,表示已提交的匯入任務是彈性匯入任務;若為0,則表示已提交的匯入任務是常規匯入任務。

Tunnel Arrow API方式

步驟一:開啟Arrow API

開啟方法

您可以通過SET命令或Hint在叢集層級和查詢層級開啟Arrow API:

  • 叢集層級開啟Arrow API:

    SET ADB_CONFIG <config_name>= <value>;
  • 查詢層級開啟Arrow API:

    /*<config_name>= <value>*/ SELECT * FROM table;

Arrow API相關配置參數

參數(config_name)

說明

ODPS_TUNNEL_ARROW_ENABLED

是否開啟Arrow API。取值:

  • true:是。

  • false(預設值):否。

ODPS_TUNNEL_SPLIT_BY_SIZE_ENABLED

是否開啟動態Split切分。取值:

  • true:是。

  • false(預設值):否。

步驟二:訪問並匯入MaxCompute資料

  1. 進入SQL編輯器。

    1. 登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,然後單擊目的地組群ID。

    2. 在左側導覽列,單擊作業開發 > SQL開發

  2. 建立外部資料庫。樣本如下:

    CREATE EXTERNAL DATABASE adb_external_db;
  3. 建立外表。本文樣本為test_adb

    CREATE EXTERNAL TABLE IF NOT EXISTS adb_external_db.test_adb (
        id int,
        name varchar(1023),
        age int,
        dt string
        ) ENGINE='ODPS'
    TABLE_PROPERTIES='{
    "accessid":"yourAccessKeyID",
    "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun.com/api",
    "accesskey":"yourAccessKeySecret",
    "partition_column":"dt",
    "project_name":"odps_project",
    "table_name":"odps_nopart_import_test"
    }';
    說明
    • AnalyticDB for MySQL外表和MaxCompute中表的欄位名稱、欄位數量、欄位順序需要一致,欄位類型需要相容。

    • 外表的參數說明,請參見CREATE EXTERNAL TABLE

  4. 查詢資料。

    SELECT * FROM adb_external_db.test_adb;

    返回結果如下:

    +------+-------+------+---------+
    | id   | name  | age  |   dt    |
    +------+-------+------+---------+
    |    1 | james |   10 |  202207 |
    |    2 | bond  |   20 |  202207 |
    |    3 | jack  |   30 |  202207 |
    |    4 | lucy  |   40 |  202207 |
    +------+-------+------+---------+
  5. 執行以下步驟將MaxCompute資料匯入至AnalyticDB for MySQL

    1. AnalyticDB for MySQL中建立資料庫,樣本如下:

      CREATE DATABASE adb_demo; 
    2. AnalyticDB for MySQL中建立表用於儲存從MaxCompute中匯入的資料,樣本如下:

      說明

      新表和步驟3中建立的外表的欄位順序和欄位數量需要一致,欄位類型相容。

      CREATE TABLE IF NOT EXISTS adb_demo.adb_import_test(
          id int,
          name string,
          age int,
          dt string
          PRIMARY KEY(id,dt)
      )
      DISTRIBUTED BY HASH(id)  
      PARTITION BY VALUE('dt'); 
    3. 向表中寫入資料,樣本如下:

      • 方式一:執行INSERT INTO匯入資料,當主鍵重複時會自動忽略當前寫入資料,不做更新,作用等同於INSERT IGNORE INTO,詳情請參見INSERT INTO。樣本如下:

        INSERT INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb;

        如果需要將特定分區的資料匯入adb_demo.adb_import_test,可以執行:

        INSERT INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb 
        WHERE dt = '202207'; 
      • 方式二:執行INSERT OVERWRITE INTO匯入資料,會覆蓋表中原有的資料。樣本如下:

        INSERT OVERWRITE INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb;
      • 方式三:非同步執行INSERT OVERWRITE INTO匯入資料。通常使用SUBMIT JOB提交非同步任務,由後台調度,可以在寫入任務前增加Hint(/*+ direct_batch_load=true*/)加速寫入任務。詳情請參見非同步寫入。樣本如下:

        SUBMIT job 
        INSERT OVERWRITE INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb;

        返回結果如下:

        +---------------------------------------+
        | job_id                                |
        +---------------------------------------+
        | 2020112122202917203100908203303****** |
        +---------------------------------------+

        關於非同步提交任務詳情,請參見非同步提交匯入任務

數倉版

  1. 串連目標AnalyticDB for MySQL叢集。詳細操作步驟,請參見串連叢集

  2. 建立目標資料庫。

    CREATE database test_adb;
  3. 建立MaxCompute外表。本文以odps_nopart_import_test_external_table為例。

    CREATE TABLE IF NOT EXISTS odps_nopart_import_test_external_table
    (
        id int,
        name string,
        age int,
        dt string
    )
     ENGINE='ODPS'
     TABLE_PROPERTIES='{
     "endpoint":"http://service.cn.maxcompute.aliyun-inc.com/api",
     "accessid":"yourAccessKeyID",
     "accesskey":"yourAccessKeySecret",
     "partition_column":"dt",
     "project_name":"odps_project1",
     "table_name":"odps_nopart_import_test"
     }';                 

    參數

    說明

    ENGINE=’ODPS’

    外表的儲存引擎。讀寫MaxCompute資料時,取值為ODPS。

    endpoint

    MaxCompute的EndPoint(網域名稱節點)

    說明

    目前僅支援AnalyticDB for MySQL通過MaxCompute的VPC網路Endpoint訪問MaxCompute。

    查詢各地區VPC網路的Endpoint,請參見VPC Endpoint

    accessid

    阿里雲帳號或者具備MaxCompute存取權限的RAM使用者的AccessKey ID。

    如何擷取AccessKey ID和AccessKey Secret,請參見帳號與許可權

    accesskey

    阿里雲帳號或者具備MaxCompute存取權限的RAM使用者的AccessKey Secret。

    如何擷取AccessKey ID和AccessKey Secret,請參見帳號與許可權

    partition_column

    本文使用的樣本是建立分區表的樣本,所以需要配置partition_column。如果MaxCompute的表是非分區表,那麼AnalyticDB for MySQL中也需要建立非分區表,此時無需配置partition_column

    project_name

    MaxCompute中的工作空間名稱。

    table_name

    MaxCompute中的資料來源表名。

  4. test_adb資料庫中建立表adb_nopart_import_test,用於儲存從MaxCompute中匯入的資料。

    CREATE TABLE IF NOT EXISTS adb_nopart_import_test
    (   id int,
        name string,
        age int,
        dt string,
        PRIMARY KEY(id,dt)
    )
    DISTRIBUTED BY HASH(id)
    PARTITION BY VALUE('dt') LIFECYCLE 30;
  5. 匯入資料。

    • 方式一:執行INSERT INTO匯入資料,當主鍵重複時會自動忽略當前寫入資料,不做更新,作用等同於INSERT IGNORE INTO,詳情請參見INSERT INTO。樣本如下:

      INSERT INTO adb_nopart_import_test
      SELECT * FROM odps_nopart_import_test_external_table; 

      通過SELECT查詢寫入表中的資料,樣本如下:

      SELECT * FROM adb_nopart_import_test;

      返回結果如下:

      +------+-------+------+---------+
      | id   | name  | age  |   dt    |
      +------+-------+------+---------+
      |    1 | james |   10 |  202207 |
      |    2 | bond  |   20 |  202207 |
      |    3 | jack  |   30 |  202207 |
      |    4 | lucy  |   40 |  202207 |
      +------+-------+------+---------+

      如果需要將特定分區的資料匯入adb_nopart_import_test,可以執行:

      INSERT INTO adb_nopart_import_test
      SELECT * FROM odps_nopart_import_test_external_table
      WHERE dt = '202207';
    • 方式二:執行INSERT OVERWRITE匯入資料,會覆蓋表中原有的資料。樣本如下:

      INSERT OVERWRITE adb_nopart_import_test
      SELECT * FROM odps_nopart_import_test_external_table;
    • 方式三:非同步執行INSERT OVERWRITE匯入資料。通常使用SUBMIT JOB提交非同步任務,由後台調度,可以在寫入任務前增加Hint加速寫入任務。詳情請參見非同步寫入。樣本如下:

      SUBMIT JOB 
      INSERT OVERWRITE adb_nopart_import_test 
      SELECT * FROM odps_nopart_import_test_external_table;  

      返回結果如下:

      +---------------------------------------+
      | job_id                                |
      +---------------------------------------+
      | 2020112122202917203100908203303****** |

      關於非同步提交任務詳情請參見非同步提交匯入任務