AnalyticDB for MySQL では、外部テーブルを使用して MaxCompute からデータを読み取り、インポートできます。この方法により、クラスターリソースの使用率が最大化され、データインポートのパフォーマンスが向上します。このトピックでは、外部テーブルを使用して MaxCompute データを AnalyticDB for MySQL にインポートする方法について説明します。
機能
AnalyticDB for MySQL 製品シリーズ | アクセス方法 | AnalyticDB for MySQL カーネルバージョン | データアクセス効率 |
Enterprise Edition、Basic Edition、および Data Lakehouse Edition | Tunnel Record API | 制限なし | 小規模なデータアクセスに適しています。データアクセスとインポートは低速です。 |
Tunnel Arrow API | 3.2.2.3 以降 | 列単位でデータを読み取ることで、データアクセスとインポート時間を短縮し、より高速なデータ転送を実現します。 | |
データウェアハウス版 | Tunnel Record API | 制限なし | パブリックな Data Transmission Service リソースグループを使用します。このリソースはリージョン内のすべてのプロジェクトで共有されるため、データアクセスとインポートが低速になります。 |
前提条件
MaxCompute プロジェクトと AnalyticDB for MySQL クラスターは同じリージョンに存在します。詳細については、「クラスターの作成」をご参照ください。
AnalyticDB for MySQL クラスターの VPC CIDR ブロックが MaxCompute プロジェクトのホワイトリストに追加されていること。
説明AnalyticDB for MySQL コンソールにログインし、[クラスター情報] ページで VPC ID を見つけます。VPC コンソールにログインし、VPC ID を使用して [VPC] ページで CIDR ブロックを見つけます。MaxCompute ホワイトリストの設定に関する詳細については、「IP ホワイトリストの管理」をご参照ください。
AnalyticDB for MySQLEnterprise Edition、Basic Edition、および Data Lakehouse Edition クラスターの場合:
Elastic Network Interface (ENI) アクセスが有効になっていること。
重要AnalyticDB for MySQL コンソールにログインします。左側のナビゲーションウィンドウで、 を選択します。[ネットワーク情報] セクションで、ENI アクセスのスイッチをオンにします。
ENI アクセスを有効または無効にすると、約 2 分間データベース接続が中断されます。この期間中は、データの読み取りや書き込みはできません。この操作を実行する前に、潜在的な影響を慎重に評価することをお勧めします。
Tunnel Arrow API を使用して MaxCompute データにアクセスしてインポートするには、クラスターが V3.2.2.3 以降である必要があります。
説明AnalyticDB for MySQL クラスターのマイナーバージョンを表示および更新するには、AnalyticDB for MySQL コンソールにログインし、クラスター情報 ページの 構成情報 セクションに移動します。
データ準備
このセクションでは、MaxCompute プロジェクト odps_project とサンプルテーブル odps_nopart_import_test を例として使用します:
CREATE TABLE IF NOT EXISTS odps_nopart_import_test (
id int,
name string,
age int)
partitioned by (dt string);次の文を実行して、odps_nopart_import_test テーブルにパーティションを追加します:
ALTER TABLE odps_nopart_import_test
ADD
PARTITION (dt='202207');次のサンプル文を実行して、パーティションにデータを追加します:
INSERT INTO odps_project.odps_nopart_import_test
PARTITION (dt='202207')
VALUES (1,'james',10),(2,'bond',20),(3,'jack',30),(4,'lucy',40);手順
Enterprise Edition、Basic Edition、および Data Lakehouse Edition
デフォルトでは、AnalyticDB for MySQL クラスターは Tunnel Record API を使用して MaxCompute からデータにアクセスし、インポートします。Tunnel Arrow API を使用するには、まず Arrow API 機能を有効にする必要があります。この機能が有効になると、AnalyticDB for MySQL クラスターは Tunnel Arrow API を使用してデータをインポートします。
Tunnel Record API
データインポート方法は、通常のインポート (デフォルト) とエラスティックインポートの 2 種類があります。通常のインポート方法では、計算ノードからソースデータを読み取り、ストレージノードにインデックスを構築するため、コンピューティングリソースとストレージリソースの両方を消費します。エラスティックインポート方法では、Serverless Spark ジョブ内でソースデータを読み取り、インデックスを構築するため、ジョブリソースグループのリソースを消費します。エラスティックインポート方法は、Milvus バージョン 3.1.10.0 以降を実行し、ジョブリソースグループを持つクラスターでのみサポートされます。通常のインポート方法と比較して、エラスティックインポート方法は消費するリソースが少なくなります。これにより、リアルタイムのデータ読み取りおよび書き込みへの影響が軽減され、リソースの隔離とデータインポートの効率が向上します。詳細については、「データインポート方法」をご参照ください。
通常のインポート
SQL エディターに移動します。
AnalyticDB for MySQL コンソールにログインします。コンソールの左上隅でリージョンを選択します。左側のナビゲーションウィンドウで、クラスターリスト をクリックします。管理するクラスターを見つけて、クラスター ID をクリックします。
左側のナビゲーションウィンドウで、 を選択します。
外部データベースを作成します。次の文は一例です:
CREATE EXTERNAL DATABASE adb_external_db;外部テーブルを作成します。このトピックでは、
test_adbを例として使用します。CREATE EXTERNAL TABLE IF NOT EXISTS adb_external_db.test_adb ( id int, name varchar(1023), age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "accessid":"yourAccessKeyID", "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun.com/api", "accesskey":"yourAccessKeySecret", "partition_column":"dt", "project_name":"odps_project", "table_name":"odps_nopart_import_test" }';説明AnalyticDB for MySQL の外部テーブルと MaxCompute のテーブルは、同じフィールド名、フィールド数、およびフィールド順序を持つ必要があります。フィールドのデータの型は互換性がある必要があります。
外部テーブルのパラメーターの詳細については、「CREATE EXTERNAL TABLE」をご参照ください。
データをクエリします。
SELECT * FROM adb_external_db.test_adb;次の結果が返されます:
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+次のステップを実行して、MaxCompute から AnalyticDB for MySQL にデータをインポートします。
AnalyticDB for MySQL にデータベースを作成します。次の文は一例です:
CREATE DATABASE adb_demo;AnalyticDB for MySQL にテーブルを作成して、MaxCompute からインポートされたデータを格納します。次の文は一例です:
説明新しいテーブルとステップ 3 で作成した外部テーブルは、同じフィールド順序とフィールド数を持つ必要があります。フィールドのデータの型は互換性がある必要があります。
CREATE TABLE IF NOT EXISTS adb_demo.adb_import_test( id int, name string, age int, dt string, PRIMARY KEY(id,dt) ) DISTRIBUTED BY HASH(id) PARTITION BY VALUE('dt');テーブルにデータを書き込みます。次のサンプル文を使用します:
方法 1: `INSERT INTO` 文を実行してデータをインポートします。プライマリキーが重複している場合、データは自動的に無視され、更新されません。この操作は
INSERT IGNORE INTOと同等です。詳細については、「INSERT INTO」をご参照ください。次の文は一例です:INSERT INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;adb_demo.adb_import_testに特定のパーティションからデータをインポートするには、次の文を実行します:INSERT INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb WHERE dt = '202207';方法 2: `INSERT OVERWRITE INTO` 文を実行してデータをインポートします。この文は、テーブル内の既存のデータを上書きします。次の文は一例です:
INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;方法 3: `INSERT OVERWRITE INTO` 文を非同期で実行してデータをインポートします。
SUBMIT JOBコマンドを使用して非同期タスクを送信できます。タスクはバックエンドによってスケジュールされます。書き込みタスクの前にヒント (/*+ direct_batch_load=true*/) を追加して、タスクを高速化できます。詳細については、「非同期書き込み」をご参照ください。次の文は一例です:SUBMIT job INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;次の結果が返されます:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2020112122202917203100908203303****** | +---------------------------------------+詳細については、「非同期でのインポートタスクの送信」をご参照ください。
エラスティックインポート
SQL エディターに移動します。
AnalyticDB for MySQL コンソールにログインします。コンソールの左上隅でリージョンを選択します。左側のナビゲーションウィンドウで、クラスターリスト をクリックします。管理するクラスターを見つけて、クラスター ID をクリックします。
左側のナビゲーションウィンドウで、 を選択します。
データベースを作成します。データベースが既に存在する場合は、このステップをスキップできます。次の文は一例です:
CREATE DATABASE adb_demo;外部テーブルを作成します。
説明AnalyticDB for MySQL の外部テーブルの名前は、MaxCompute プロジェクトの名前と同じである必要があります。そうでない場合、外部テーブルの作成は失敗します。
AnalyticDB for MySQL の外部テーブルと MaxCompute のテーブルは、同じフィールド名、フィールド数、およびフィールド順序を持つ必要があります。フィールドのデータの型は互換性がある必要があります。
エラスティックインポートは、
CREATE TABLE文を使用した外部テーブルの作成のみをサポートします。
CREATE TABLE IF NOT EXISTS test_adb ( id int, name string, age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api", "accessid":"yourAccessKeyID", "accesskey":"yourAccessKeySecret", "partition_column":"dt", "project_name":"odps_project", "table_name":"odps_nopart_import_test" }';外部テーブルに設定できるパラメーターの詳細については、「パラメーターの説明」をご参照ください。
データをクエリします。
SELECT * FROM adb_demo.test_adb;次の結果が返されます:
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+AnalyticDB for MySQL にテーブルを作成して、MaxCompute からインポートされたデータを格納します。次の文は一例です:
説明内部テーブルとステップ 3 で作成した外部テーブルは、同じフィールド名、フィールド数、フィールド順序、およびデータの型を持つ必要があります。
CREATE TABLE IF NOT EXISTS adb_import_test ( id int, name string, age int, dt string, PRIMARY KEY(id,dt) ) DISTRIBUTED BY HASH(id) PARTITION BY VALUE('dt') LIFECYCLE 30;データをインポートします。
重要エラスティックインポートは、
INSERT OVERWRITE INTO文を使用したデータインポートのみをサポートします。方法 1: INSERT OVERWRITE INTO 文を実行して、データをエラスティックにインポートします。この文は、テーブル内の既存のデータを上書きします。次の文は一例です:
/*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group]*/ INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
方法 2: INSERT OVERWRITE INTO 文を非同期で実行して、データをエラスティックにインポートします。
SUBMIT JOBコマンドを使用して非同期タスクを送信できます。タスクはバックエンドによってスケジュールされます。/*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group]*/ SUBMIT JOB INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;重要エラスティックインポートタスクを非同期で送信する場合、優先度付きキューを設定することはできません。
次の結果が返されます:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2023081517192220291720310090151****** | +---------------------------------------+
SUBMIT JOBコマンドを使用して非同期ジョブを送信した後、返された結果はジョブが正常に送信されたことのみを示します。返された job_id を使用して、非同期ジョブを終了したり、そのステータスをクエリしてジョブが正常に実行されたかどうかを確認したりできます。詳細については、「非同期でのインポートタスクの送信」をご参照ください。ヒントパラメーター:
elastic_load: エラスティックインポートを使用するかどうかを指定します。有効な値: true および false。デフォルト値: false。
elastic_load_configs: エラスティックインポート機能の設定パラメーター。パラメーターを角括弧 ([ ]) で囲み、複数のパラメーターを縦棒 (|) で区切る必要があります。次の表にパラメーターを示します。
パラメーター
必須
説明
adb.load.resource.group.name
はい
エラスティックインポートジョブを実行するジョブリソースグループの名前。
adb.load.job.max.acu
いいえ
エラスティックインポートジョブの最大リソース量。単位: AnalyticDB 計算ユニット (ACU)。最小値: 5 ACU。デフォルト値: シャード数 + 1。
次の文を実行して、クラスター内のシャード数をクエリします:
SELECT count(1) FROM information_schema.kepler_meta_shards;spark.driver.resourceSpec
いいえ
Spark ドライバーのリソースタイプ。 デフォルト値: small。有効な値については、Conf 設定パラメーターのトピックの「Spark アプリケーション設定パラメーター」の表の Type 列をご参照ください。
spark.executor.resourceSpec
いいえ
Spark エグゼキュータのリソースタイプ。 デフォルト値: large。有効な値については、Conf 設定パラメーターのトピックの「Spark アプリケーション設定パラメーター」の表の Type 列をご参照ください。
spark.adb.executorDiskSize
いいえ
Spark エグゼキュータのディスク容量。有効な値: (0,100]。単位: GiB。デフォルト値: 10 GiB。詳細については、Conf 設定パラメーターのトピックの「ドライバーとエグゼキュータのリソースを指定する」セクションをご参照ください。
(オプション) 送信されたインポートタスクがエラスティックインポートタスクであるかどうかを確認します。
SELECT job_name, (job_type = 3) AS is_elastic_load FROM INFORMATION_SCHEMA.kepler_meta_async_jobs WHERE job_name = "2023081818010602101701907303151******";結果は次のようになります:
+---------------------------------------+------------------+ | job_name | is_elastic_load | +---------------------------------------+------------------+ | 2023081517195203101701907203151****** | 1 | +---------------------------------------+------------------+is_elastic_loadの戻り値は、エラスティックインポートタスクの場合は 1、通常のインポートタスクの場合は 0 です。
Tunnel Arrow API
ステップ 1: Arrow API の有効化
方法
Arrow API は、`SET` コマンドを使用してクラスターレベルで有効にするか、ヒントを使用してクエリレベルで有効にすることができます:
クラスターレベルで Arrow API を有効にする:
SET ADB_CONFIG <config_name>= <value>;クエリレベルで Arrow API を有効にする:
/*<config_name>= <value>*/ SELECT * FROM table;
Arrow API 設定パラメーター
パラメーター (config_name) | 説明 |
ODPS_TUNNEL_ARROW_ENABLED | Arrow API を有効にするかどうかを指定します。有効な値:
|
ODPS_TUNNEL_SPLIT_BY_SIZE_ENABLED | 動的分割を有効にするかどうかを指定します。有効な値:
|
ステップ 2: MaxCompute データへのアクセスとインポート
SQL エディターに移動します。
AnalyticDB for MySQL コンソールにログインします。コンソールの左上隅でリージョンを選択します。左側のナビゲーションウィンドウで、クラスターリスト をクリックします。管理するクラスターを見つけて、クラスター ID をクリックします。
左側のナビゲーションウィンドウで、 を選択します。
外部データベースを作成します。次の文は一例です:
CREATE EXTERNAL DATABASE adb_external_db;外部テーブルを作成します。このトピックでは、
test_adbを例として使用します。CREATE EXTERNAL TABLE IF NOT EXISTS adb_external_db.test_adb ( id int, name varchar(1023), age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "accessid":"yourAccessKeyID", "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun.com/api", "accesskey":"yourAccessKeySecret", "partition_column":"dt", "project_name":"odps_project", "table_name":"odps_nopart_import_test" }';説明AnalyticDB for MySQL の外部テーブルと MaxCompute のテーブルは、同じフィールド名、フィールド数、およびフィールド順序を持つ必要があります。フィールドのデータの型は互換性がある必要があります。
外部テーブルのパラメーターの詳細については、「CREATE EXTERNAL TABLE」をご参照ください。
データをクエリします。
SELECT * FROM adb_external_db.test_adb;次の結果が返されます:
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+次のステップを実行して、MaxCompute から AnalyticDB for MySQL にデータをインポートします。
AnalyticDB for MySQL にデータベースを作成します。次の文は一例です:
CREATE DATABASE adb_demo;AnalyticDB for MySQL にテーブルを作成して、MaxCompute からインポートされたデータを格納します。次の文は一例です:
説明新しいテーブルとステップ 3 で作成した外部テーブルは、同じフィールド順序とフィールド数を持つ必要があります。フィールドのデータの型は互換性がある必要があります。
CREATE TABLE IF NOT EXISTS adb_demo.adb_import_test( id int, name string, age int, dt string PRIMARY KEY(id,dt) ) DISTRIBUTED BY HASH(id) PARTITION BY VALUE('dt');テーブルにデータを書き込みます。次のサンプル文を使用します:
方法 1: `INSERT INTO` 文を実行してデータをインポートします。この操作は
INSERT IGNORE INTOと同等です。プライマリキーが重複している場合、新しいデータは無視され、既存のレコードは更新されません。詳細については、「INSERT INTO」をご参照ください。次の文は一例です:INSERT INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;adb_demo.adb_import_testに特定のパーティションからデータをインポートするには、次の文を実行します:INSERT INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb WHERE dt = '202207';方法 2: `INSERT OVERWRITE INTO` 文を実行してデータをインポートします。この文は、テーブル内の既存のデータを上書きします。次の文は一例です:
INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;方法 3: `INSERT OVERWRITE INTO` 文を使用して非同期でデータをインポートします。
SUBMIT JOBコマンドを使用して非同期タスクを送信できます。タスクはバックエンドによってスケジュールされます。書き込みタスクの前にヒント (/*+ direct_batch_load=true*/) を追加して、タスクを高速化できます。詳細については、「非同期書き込み」をご参照ください。次の文は一例です:SUBMIT job INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;次の結果が返されます:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2020112122202917203100908203303****** | +---------------------------------------+詳細については、「非同期でのインポートタスクの送信」をご参照ください。
データウェアハウス版
AnalyticDB for MySQL クラスターに接続します。詳細については、「AnalyticDB for MySQL クラスターへの接続」をご参照ください。
ターゲットデータベースを作成します。
CREATE database test_adb;MaxCompute 外部テーブルを作成します。このトピックでは、
odps_nopart_import_test_external_tableを例として使用します。CREATE TABLE IF NOT EXISTS odps_nopart_import_test_external_table ( id int, name string, age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "endpoint":"http://service.cn.maxcompute.aliyun-inc.com/api", "accessid":"yourAccessKeyID", "accesskey":"yourAccessKeySecret", "partition_column":"dt", "project_name":"odps_project1", "table_name":"odps_nopart_import_test" }';パラメーター
説明
ENGINE=’ODPS’外部テーブルのストレージエンジン。このパラメーターを ODPS に設定して、MaxCompute からデータを読み取るか、MaxCompute にデータを書き込みます。
endpointMaxCompute の [エンドポイント (ドメイン名)]。
説明AnalyticDB for MySQL から MaxCompute には、MaxCompute VPC エンドポイント経由でのみアクセスできます。
さまざまなリージョンの VPC エンドポイントをクエリするには、「VPC エンドポイント」をご参照ください。
accessidMaxCompute にアクセスする権限を持つ Alibaba Cloud アカウントまたは Resource Access Management (RAM) ユーザーの AccessKey ID。
AccessKey ID と AccessKey シークレットの取得方法の詳細については、「アカウントと権限」をご参照ください。
accesskeyMaxCompute にアクセスする権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレット。
AccessKey ID と AccessKey シークレットの取得方法の詳細については、「アカウントと権限」をご参照ください。
partition_columnこの例では、パーティションテーブルを作成し、
partition_columnパラメーターが必要です。MaxCompute のソーステーブルがパーティション化されていない場合は、AnalyticDB for MySQL に非パーティションテーブルを作成し、partition_columnパラメーターを省略します。project_nameMaxCompute のワークスペースの名前。
table_nameMaxCompute のソーステーブルの名前。
test_adbデータベースに、MaxCompute からインポートされたデータを格納するためのadb_nopart_import_testテーブルを作成します。CREATE TABLE IF NOT EXISTS adb_nopart_import_test ( id int, name string, age int, dt string, PRIMARY KEY(id,dt) ) DISTRIBUTED BY HASH(id) PARTITION BY VALUE('dt') LIFECYCLE 30;データをインポートします。
方法 1:
INSERT INTO文を実行してデータをインポートします。プライマリキーの競合が発生した場合、システムは現在のデータを無視し、更新を実行しません。この操作はINSERT IGNORE INTOと同等です。詳細については、「INSERT INTO」をご参照ください。次の文は一例です:INSERT INTO adb_nopart_import_test SELECT * FROM odps_nopart_import_test_external_table;次の例は、SELECT 文を使用してテーブルに書き込まれたデータをクエリする方法を示しています:
SELECT * FROM adb_nopart_import_test;次の結果が返されます:
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+特定のパーティションから
adb_nopart_import_testにデータをインポートするには、次のコマンドを実行します:INSERT INTO adb_nopart_import_test SELECT * FROM odps_nopart_import_test_external_table WHERE dt = '202207';方法 2:
INSERT OVERWRITE文を実行してデータをインポートします。この操作は、テーブル内の既存のデータを上書きします。次の文は一例です:INSERT OVERWRITE adb_nopart_import_test SELECT * FROM odps_nopart_import_test_external_table;方法 3:
INSERT OVERWRITE文を非同期で実行してデータをインポートします。SUBMIT JOBコマンドを使用して、バックグラウンドでスケジュールされる非同期タスクを送信できます。書き込みタスクの前にヒントを追加して高速化できます。詳細については、「非同期書き込み」をご参照ください。次の文は一例です:SUBMIT JOB INSERT OVERWRITE adb_nopart_import_test SELECT * FROM odps_nopart_import_test_external_table;次の結果が返されます:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2020112122202917203100908203303****** |非同期インポートタスクの送信方法の詳細については、「非同期インポートタスクの送信」をご参照ください。