すべてのプロダクト
Search
ドキュメントセンター

ApsaraDB for SelectDB:MySQL データソースからのデータ移行

最終更新日:Mar 01, 2025

Data Transmission Service (DTS)、DataWorks、Flink Change Data Capture (CDC)、またはカタログを使用して、MySQL データソースから ApsaraDB for SelectDB インスタンスにデータを移行できます。 MySQL データソースは、自己管理 MySQL データベース、ApsaraDB RDS for MySQL インスタンス、または PolarDB for MySQL クラスタです。 移行するデータ量とビジネスシナリオに基づいて、適切なソリューションを選択してデータを移行できます。 このトピックでは、MySQL データソースから ApsaraDB for SelectDB インスタンスにデータを移行するために使用されるソリューションについて説明し、比較します。

移行ソリューションの比較

DTS、DataWorks、Flink CDC、またはカタログを使用して、MySQL データソースから ApsaraDB for SelectDB インスタンスにデータを移行できます。 移行できるデータは、移行ソリューションによって異なります。 ビジネス要件に基づいて適切なソリューションを選択できます。

ソリューション

既存データの移行

増分データ同期

スキーマ移行

データベース移行

DDL 操作の増分同期

データ検証

DTS

✔️

✔️

✔️

✔️

✔️

✔️

DataWorks

✔️

✔️

✔️

✔️

✔️

Flink CDC

✔️

✔️

✔️

✔️

✔️

カタログ

✔️

以下のセクションでは、これらのソリューションを使用してデータを移行する方法について説明します。 詳細については、関連するリファレンスをご参照ください。

前提条件

  • MySQL データベースは SelectDB インスタンスに接続されています。

DTS を使用してデータを移行する

DTS を使用すると、既存データを移行し、MySQL データソースから ApsaraDB for SelectDB インスタンスに増分データを同期できます。 DTS は、スキーマ移行、DDL 操作同期、およびデータ検証もサポートしています。 この例では、ApsaraDB RDS for MySQL インスタンスを使用して、MySQL データソースから ApsaraDB for SelectDB インスタンスにデータを移行する方法について説明します。 詳細については、「DTS を使用してデータをインポートする」をご参照ください。

手順

  1. ApsaraDB for SelectDB コンソール にログインします。

  2. 上部のナビゲーションバーで、管理するインスタンスが存在するリージョンを選択します。

  3. 左側のナビゲーションウィンドウで、[インスタンス] をクリックします。 [インスタンス] ページで、インスタンスを見つけて ID をクリックし、[インスタンスの詳細] ページに移動します。

  4. インスタンスの詳細ページの左側のナビゲーションウィンドウで、[データパイプライン] をクリックします。表示されるページで、[データ同期] タブをクリックします。

    説明

    ほとんどの場合、DTS でのデータ同期には、既存データの移行と増分データのリアルタイム同期が含まれます。 DTS でのデータ移行は、一般的に既存の履歴データを移行するために使用されます。

  5. [データ同期タスクの作成] をクリックします。[データ同期タスクの作成] ページで、ソース データベースとターゲット データベースに関する情報を構成します。

  6. セクション

    パラメーター

    説明

    タスク情報

    タスク名

    DTS タスクの名前。 DTS は自動的にタスク名を生成します。 タスクを簡単に識別できる説明的な名前を指定することをお勧めします。 一意のタスク名を指定する必要はありません。

    ソースデータベース

    データベースタイプ

    ソースデータベースの種類。 MySQL を選択します。

    アクセス方法

    ソースデータベースのアクセス方法。 Alibaba Cloud インスタンス を選択します。

    インスタンスのリージョン

    ソース ApsaraDB RDS for MySQL インスタンスが存在するリージョン。

    RDS インスタンス ID

    ソース ApsaraDB RDS for MySQL インスタンスの ID。 例: rm-2z3m****。

    データベースアカウント

    ソース ApsaraDB RDS for MySQL インスタンスのデータベースアカウント。 アカウントに必要な権限の詳細については、「ApsaraDB RDS for MySQL インスタンスから ApsaraDB for SelectDB インスタンスにデータを同期する」トピックの「データベースアカウントに必要な権限」セクションをご参照ください。

    データベースのパスワード

    データベースへのアクセスに使用されるパスワード。

    暗号化

    データベースへの接続を暗号化するかどうかを指定します。 ビジネス要件に基づいて、[非暗号化] または [SSL 暗号化] を選択できます。 このパラメーターを [SSL 暗号化] に設定する場合は、DTS タスクを設定する前に、ApsaraDB RDS for MySQL インスタンスの SSL 暗号化を有効にする必要があります。 詳細については、「クラウド証明書を使用して SSL 暗号化を有効にする」をご参照ください。

    宛先データベース

    データベースタイプ

    宛先データベースの種類。 [selectdb] を選択します。

    アクセス方法

    宛先データベースのアクセス方法。 Alibaba Cloud インスタンス を選択します。

    インスタンスのリージョン

    宛先 ApsaraDB for SelectDB インスタンスが存在するリージョン。

    インスタンス ID

    宛先 ApsaraDB for SelectDB インスタンスの ID。

    データベースアカウント

    宛先 ApsaraDB for SelectDB インスタンスのデータベースアカウント。 アカウントに必要な権限の詳細については、「ApsaraDB RDS for MySQL インスタンスから ApsaraDB for SelectDB インスタンスにデータを同期する」トピックの「データベースアカウントに必要な権限」セクションをご参照ください。

    データベースのパスワード

    データベースへのアクセスに使用されるパスワード。

  7. [接続テストと続行] をクリックします。

  8. 移行するオブジェクトと詳細設定を構成します。

  9. パラメーター

    説明

    同期タイプ

    同期タイプ。 デフォルトでは、[増分データ同期] が選択されています。 [スキーマ同期][完全データ同期] も選択する必要があります。 事前チェックが完了すると、DTS は選択したオブジェクトの既存データをソースデータベースから宛先クラスタに同期します。 既存データは、後続の増分同期の基礎となります。

    重要

    MySQL データベースから ApsaraDB for SelectDB インスタンスにデータを同期する場合、データ型が変換されます。 スキーマ同期 を選択しない場合は、事前に対応するスキーマを持つ Unique key モデルを使用するテーブルを宛先 ApsaraDB for SelectDB インスタンスに作成する必要があります。 詳細については、「ApsaraDB RDS for MySQL インスタンスから ApsaraDB for SelectDB インスタンスにデータを同期する」および「データモデル」をご参照ください。

    ソースオブジェクト

    [ソースオブジェクト] セクションから 1 つ以上のオブジェクトを選択し、向右 アイコンをクリックして、[選択済みオブジェクト] セクションにオブジェクトを追加します。 同期するオブジェクトとして、列、テーブル、またはデータベースを選択できます。

    重要
    1. 同期するオブジェクトとしてデータベースを選択した場合、DTS は新しいテーブルを同期しません。 データ同期中にソースデータベースにテーブルを作成する必要がある場合は、同期するオブジェクトとしてテーブルを選択してください。 データ同期タスクで同期するオブジェクトを変更するには、[データ同期] タブでデータ同期タスクを見つけて、[アクション] 列の [その他] > [オブジェクトの再選択] を選択します。

    2. 同期するオブジェクトとしてテーブルを選択し、テーブルの名前変更や列の名前変更などのテーブルの編集が必要な場合は、1 つのデータ同期タスクで最大 1,000 個のテーブルを同期できます。 1,000 個を超えるテーブルを同期するタスクを実行すると、リクエストエラーが発生します。 この場合、複数のタスクを設定してテーブルを同期するか、データベース全体を同期するタスクを設定することをお勧めします。

    選択中のオブジェクト

    • 宛先インスタンスに同期するオブジェクトの名前を変更するには、選択中のオブジェクト セクションでオブジェクトを右クリックします。 詳細については、「オブジェクト名マッピング」トピックの「単一オブジェクトの名前をマッピングする」セクションをご参照ください。

    • 複数のオブジェクトの名前を一度に変更するには、選択中のオブジェクト セクションの右上隅にある 一括編集 をクリックします。 詳細については、「オブジェクト名マッピング」トピックの「一度に複数のオブジェクト名をマッピングする」セクションをご参照ください。

    説明
    1. 特定のデータベースまたはテーブルで実行される SQL 操作を選択するには、選択中のオブジェクト セクションでオブジェクトを右クリックします。 表示されるダイアログボックスで、同期する SQL 操作を選択します。 詳細については、「ApsaraDB RDS for MySQL インスタンスから ApsaraDB for SelectDB インスタンスにデータを同期する」をご参照ください。

    2. WHERE 条件を指定してデータをフィルタリングするには、選択中のオブジェクト セクションでテーブルを右クリックします。 表示されるダイアログボックスで、条件を指定します。 詳細については、「フィルタ条件を指定する」をご参照ください。

    3. オブジェクト名マッピング機能を使用してオブジェクトの名前を変更すると、そのオブジェクトに依存する他のオブジェクトが同期に失敗する可能性があります。

  10. オプション。 ページの下部にある 次:データベースおよびテーブルのフィールド設定 をクリックします。 表示されるダイアログボックスで、ApsaraDB for SelectDB インスタンスに同期されるテーブルの プライマリキー列の追加配布キー、および エンジンの選択 パラメーターを設定します。

  11. 説明
    1. 同期タイプ パラメーターで スキーマ同期 を選択した場合にのみ、この手順を実行できます。 パラメーターを変更するには、定義ステータス パラメーターを すべて に設定します。

    2. ドロップダウンリストから プライマリキー列の追加 パラメーターに複数の列を選択できます。 プライマリキー列の追加 パラメーターに指定された 1 つ以上の列を、配布キー パラメーターに選択できます。 エンジンの選択 パラメーターには、[一意] のみを選択できます。

  12. タスク設定を保存し、事前チェックを実行します。 成功率が [100%] になるまで待ちます。 次に、[次へ: インスタンスの購入] をクリックします。

  13. [インスタンスの購入] ページで、データ同期インスタンスの [請求方法] および [インスタンスクラス] パラメーターを設定し、チェックボックスをオンにして [data Transmission Service (従量課金制) サービス規約] を読んで同意し、次に [購入して開始] をクリックしてデータ同期タスクを開始します。 タスクの進行状況はタスクリストで確認できます。

Flink CDC を使用してデータを移行する

Flink は、MySQL データベースから ApsaraDB for SelectDB インスタンスにデータを移行するために使用できる Flink SQL、Flink CDC、および DataStream を提供します。 Flink CDC を使用して、既存データ、増分データ、およびスキーマを移行し、DDL 操作を同期できます。 この例では、Flink CDC を使用して、アップストリーム MySQL データベースから ApsaraDB for SelectDB インスタンスにデータを同期する方法について説明します。 詳細については、「Flink を使用してデータをインポートする」をご参照ください。

手順

環境を準備する

Flink 環境を構築します。 この例では、バージョン 1.16 の Flink スタンドアロンクラスタがデプロイされています。

  1. flink-1.16.3-bin-scala_2.12.tgz パッケージをダウンロードし、パッケージを解凍します。 このバージョンの有効期限が切れている場合は、別のバージョンをダウンロードしてください。 バージョンの詳細については、Apache Flink をご参照ください。 サンプルコード:

    wget https://archive.apache.org/dist/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz
    tar -zxvf flink-1.16.3-bin-scala_2.12.tgz
  2. flink-sql-connector-mysql-cdc-2.4.2 および flink-doris-connector-1.16-1.5.2 パッケージを FLINK_HOME/lib ディレクトリにダウンロードします。 サンプルコード:

    cd flink-1.16.3
    cd lib/
    wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar
    wget https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.16/1.5.2/flink-doris-connector-1.16-1.5.2.jar
  3. Flink スタンドアロンクラスタを起動します。 サンプルコード:

    bin/start-cluster.sh
  4. ApsaraDB for SelectDB インスタンスを作成します。 詳細については、「インスタンスを作成する」をご参照ください。

  5. MySQL プロトコルを介して ApsaraDB for SelectDB インスタンスに接続します。 詳細については、「インスタンスに接続する」をご参照ください。

  6. テストデータベースとテストテーブルを作成します。

    1. 次のステートメントを実行して、テストデータベースを作成します。

      CREATE DATABASE test_db;
    2. 次のステートメントを実行して、テストテーブルを作成します。

      USE test_db;
      CREATE TABLE employees (
          emp_no       int NOT NULL,
          birth_date   date,
          first_name   varchar(20),
          last_name    varchar(20),
          gender       char(2),
          hire_date    date
      )
      UNIQUE KEY(`emp_no`)
      DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;

Flink CDC を使用してデータを同期する

次のサンプルコードは、Flink CDC の構文を示しています。

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    mysql-sync-database \
    --database test_db \
    --including-tables "tbl1|test.*" \
    --mysql-conf hostname=127.0.0.1 \
    --mysql-conf username=root \
    --mysql-conf password=123456 \
    --mysql-conf database-name=mysql_db \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

パラメーター

説明

execution.checkpointing.interval

チェックポイントの間隔。データ同期の頻度に影響します。 このパラメーターを 10 秒に設定することをお勧めします。

parallelism.default

Flink ジョブの並列度。 並列度を適切に増やすことで、データ同期を高速化できます。

database

ApsaraDB for SelectDB インスタンスでデータが同期されるデータベースの名前。

including-tables

データの同期元となる MySQL テーブル。 複数のテーブル名を区切るには、縦棒 (|) を使用します。 正規表現もサポートされています。 例: --including-tables table1|tbl.*。これは、table1 と、名前が tbl で始まるすべてのテーブルからデータが同期されることを指定します。

excluding-tables

除外するテーブル。 このパラメーターは、including-tables パラメーターと同じ方法で指定できます。

mysql-conf

MySQL CDC ソースの設定項目。 設定項目の詳細については、「MySQL CDC コネクタ」をご参照ください。 hostnameusernamepassword、および database-name パラメーターは必須です。

sink-conf

Doris シンクの設定項目。 詳細については、「Flink を使用してデータをインポートする」トピックの「Doris シンクの設定項目」セクションをご参照ください。

table-conf

ApsaraDB for SelectDB テーブルの設定項目。 設定項目は、ApsaraDB for SelectDB テーブルの作成時にプロパティに含まれています。

説明
  1. データを同期するには、$FLINK_HOME/lib ディレクトリに flink-sql-connector-mysql-cdc-${version}.jar や flink-sql-connector-oracle-cdc-${version}.jar などの Flink CDC の依存関係をインストールする必要があります。

  2. データベースから完全データを同期する場合、Flink のバージョンは 1.15 以降である必要があります。 さまざまなバージョンの Flink Doris コネクタのダウンロード方法の詳細については、org/apache/doris をご参照ください。

カタログを使用してデータを移行する

ApsaraDB for SelectDB は、フェデレーテッドクエリを実行することで MySQL データソースにアクセスするためのカタログ機能を提供します。 これにより、MySQL データソースから ApsaraDB for SelectDB インスタンスに既存データを簡単かつ迅速に移行できます。 次の例は、カタログを使用して、アップストリーム MySQL データベースから ApsaraDB for SelectDB インスタンスにデータを同期する方法を示しています。 詳細については、「JDBC データソース」をご参照ください。

手順

  1. SelectDB インスタンスに接続します。 詳細については、「インスタンスに接続する」をご参照ください。

    説明

    Data Management (DMS) を使用して ApsaraDB for SelectDB インスタンスにログインすると、SWITCH コマンドは無効になります。 MySQL クライアントを使用して ApsaraDB for SelectDB インスタンスに接続することをお勧めします。

  2. MySQL データソースの Java Database Connectivity (JDBC) カタログを作成します。

CREATE CATALOG jdbc_mysql PROPERTIES (
    "type"="jdbc",
    "user"="root",
    "password"="123456",
    "jdbc_url" = "jdbc:mysql://127.0.0.1:3306/demo",
    "driver_url" = "mysql-connector-java-8.0.25.jar",
    "driver_class" = "com.mysql.cj.jdbc.Driver",
    "checksum" = "fdf55dcef04b09f2eaf42b75e61ccc9a"
)

パラメーター

パラメーター

必須

デフォルト値

説明

user

はい

デフォルト値なし

MySQL データベースへの接続に使用するアカウント。

password

はい

デフォルト値なし

MySQL データベースへの接続に使用するアカウントのパスワード。

jdbc_url

はい

デフォルト値なし

MySQL データベースへの接続に使用する JDBC URL。

driver_url

はい

デフォルト値なし

JDBC ドライバーの JAR パッケージの名前。

driver_class

はい

デフォルト値なし

JDBC ドライバークラスの名前。

lower_case_table_names

いいえ

"false"

外部 JDBC データソースのデータベースとテーブルの名前を小文字で同期するかどうかを指定します。

only_specified_database

いいえ

"false"

特定のデータベースからのみデータを移行するかどうかを指定します。

include_database_list

いいえ

""

データの移行元となるデータベースの名前。 このパラメーターは、only_specified_database パラメーターを true に設定した場合にのみ有効になります。 複数のデータベース名はコンマ (,) で区切ります。 データベース名は大文字と小文字が区別されます。

exclude_database_list

いいえ

""

データの移行元としないデータベースの名前。 このパラメーターは、only_specified_database パラメーターを true に設定した場合にのみ有効になります。 複数のデータベース名はコンマ (,) で区切ります。 データベース名は大文字と小文字が区別されます。

  1. ApsaraDB for SelectDB インスタンスにテーブルを作成し、ETL (抽出、変換、ロード) 構文の INSERT INTO SELECT ステートメントを実行してデータを同期します。 詳細については、「INSERT INTO ステートメントを使用してデータをインポートする」をご参照ください。

# テーブルを作成します。
CREATE TABLE selectdb_table ...

# データを同期します。
INSERT INTO selectdb_table SELECT * FROM mysql_catalog.mysql_database.mysql_table;

DataWorks を使用してデータを移行する

ApsaraDB for SelectDB では、DataWorks のデータ統合機能を使用して既存データをインポートできます。 このセクションでは、DataWorks を使用して MySQL データベースから ApsaraDB for SelectDB インスタンスにデータを同期する方法について説明します。 詳細については、「DataWorks を使用してデータをインポートする」をご参照ください。

重要

BITMAP、HyperLogLog (HLL)、または QUANTILE_STATE データ型のフィールドに書き込むことはできません。

データソースを追加する

データ同期タスクを設定する前に、MySQL データソースと ApsaraDB for SelectDB データソースを DataWorks に追加する必要があります。

  1. MySQL データソースを追加します。 詳細については、「MySQL データソース」をご参照ください。

  2. ApsaraDB for SelectDB データソースを追加します。 詳細については、「データソースを追加および管理する」をご参照ください。 次の表に、ApsaraDB for SelectDB データソースを追加するために使用されるパラメーターを示します。

    パラメーター

    説明

    [データソース名]

    データソースの名前。

    [ホストアドレス/ IP アドレス]

    jdbc:mysql://<ip>:<port>/<dbname> 形式の JDBC URL。

    ApsaraDB for SelectDB インスタンスの VPC エンドポイントまたはパブリックエンドポイントと MySQL ポートを取得するには、次の操作を実行します。 ApsaraDB for SelectDB コンソールにログインし、情報を表示するインスタンスの [インスタンスの詳細] ページに移動します。 [基本情報] ページの [ネットワーク情報] セクションで、[VPC エンドポイント] または [パブリックエンドポイント] パラメーターと [mysql ポート] パラメーターの値を表示します。

    例: jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030/test_db

    [HTTP 接続アドレス]

    HTTP 経由で ApsaraDB for SelectDB インスタンスにアクセスするために使用される IP アドレスとポート番号。 値は <ip>:<port> 形式です。

    ApsaraDB for SelectDB インスタンスの VPC エンドポイントまたはパブリックエンドポイントと HTTP ポートを取得するには、次の操作を実行します。 ApsaraDB for SelectDB コンソールにログインし、情報を表示するインスタンスの [インスタンスの詳細] ページに移動します。 [基本情報] ページの [ネットワーク情報] セクションで、[VPC エンドポイント] または [パブリックエンドポイント] パラメーターと [HTTP ポート] パラメーターの値を表示します。

    例: selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080

    [ユーザー名]

    ApsaraDB for SelectDB インスタンスのオーナーアカウントのユーザー名。

    [パスワード]

    ApsaraDB for SelectDB インスタンスのオーナーアカウントのパスワード。

データ同期タスクを設定する

コードレス ユーザーインターフェース (UI) またはコードエディターを使用して、データ同期タスクを設定できます。 詳細については、以下のトピックをご参照ください。