Data Transmission Service (DTS)、DataWorks、Flink Change Data Capture (CDC)、またはカタログを使用して、MySQL データソースから ApsaraDB for SelectDB インスタンスにデータを移行できます。 MySQL データソースは、自己管理 MySQL データベース、ApsaraDB RDS for MySQL インスタンス、または PolarDB for MySQL クラスタです。 移行するデータ量とビジネスシナリオに基づいて、適切なソリューションを選択してデータを移行できます。 このトピックでは、MySQL データソースから ApsaraDB for SelectDB インスタンスにデータを移行するために使用されるソリューションについて説明し、比較します。
移行ソリューションの比較
DTS、DataWorks、Flink CDC、またはカタログを使用して、MySQL データソースから ApsaraDB for SelectDB インスタンスにデータを移行できます。 移行できるデータは、移行ソリューションによって異なります。 ビジネス要件に基づいて適切なソリューションを選択できます。
ソリューション | 既存データの移行 | 増分データ同期 | スキーマ移行 | データベース移行 | DDL 操作の増分同期 | データ検証 |
DTS | ✔️ | ✔️ | ✔️ | ✔️ | ✔️ | ✔️ |
DataWorks | ✔️ | ✔️ | ✔️ | ✔️ | ✔️ | ❌ |
Flink CDC | ✔️ | ✔️ | ✔️ | ✔️ | ✔️ | ❌ |
カタログ | ✔️ | ❌ | ❌ | ❌ | ❌ | ❌ |
以下のセクションでは、これらのソリューションを使用してデータを移行する方法について説明します。 詳細については、関連するリファレンスをご参照ください。
前提条件
MySQL データベースは SelectDB インスタンスに接続されています。
MySQL データベースは、SelectDB インスタンスと同じ virtual private cloud (VPC) に存在します。 MySQL データベースが別の VPC に存在する場合は、MySQL データベースを SelectDB インスタンスに接続する必要があります。 詳細については、「ApsaraDB for SelectDB インスタンスとデータソース間の接続の確立に失敗した場合はどうすればよいですか?」をご参照ください。
MySQL データベースの IP アドレスは、SelectDB インスタンスの IP アドレスホワイトリストに追加されています。 詳細については、「IP アドレスホワイトリストを設定する」をご参照ください。
SelectDB インスタンスが存在する VPC 内の IP アドレスは、MySQL データベースでホワイトリストメカニズムがサポートされている場合、MySQL データベースの IP アドレスホワイトリストに追加されます。
SelectDB インスタンスが属する VPC 内の SelectDB インスタンスの IP アドレスを取得するには、「ApsaraDB SelectDB インスタンスが属する VPC 内の IP アドレスを表示するにはどうすればよいですか?」に記載されている操作を実行できます。
SelectDB インスタンスのパブリック IP アドレスを取得するには、ping コマンドを実行して SelectDB インスタンスのパブリックエンドポイントにアクセスし、インスタンスの IP アドレスを取得できます。
DTS を使用してデータを移行する
DTS を使用すると、既存データを移行し、MySQL データソースから ApsaraDB for SelectDB インスタンスに増分データを同期できます。 DTS は、スキーマ移行、DDL 操作同期、およびデータ検証もサポートしています。 この例では、ApsaraDB RDS for MySQL インスタンスを使用して、MySQL データソースから ApsaraDB for SelectDB インスタンスにデータを移行する方法について説明します。 詳細については、「DTS を使用してデータをインポートする」をご参照ください。
手順
ApsaraDB for SelectDB コンソール にログインします。
上部のナビゲーションバーで、管理するインスタンスが存在するリージョンを選択します。
左側のナビゲーションウィンドウで、[インスタンス] をクリックします。 [インスタンス] ページで、インスタンスを見つけて ID をクリックし、[インスタンスの詳細] ページに移動します。
インスタンスの詳細ページの左側のナビゲーションウィンドウで、[データパイプライン] をクリックします。表示されるページで、[データ同期] タブをクリックします。
説明ほとんどの場合、DTS でのデータ同期には、既存データの移行と増分データのリアルタイム同期が含まれます。 DTS でのデータ移行は、一般的に既存の履歴データを移行するために使用されます。
[データ同期タスクの作成] をクリックします。[データ同期タスクの作成] ページで、ソース データベースとターゲット データベースに関する情報を構成します。
[接続テストと続行] をクリックします。
移行するオブジェクトと詳細設定を構成します。
同期するオブジェクトとしてデータベースを選択した場合、DTS は新しいテーブルを同期しません。 データ同期中にソースデータベースにテーブルを作成する必要がある場合は、同期するオブジェクトとしてテーブルを選択してください。 データ同期タスクで同期するオブジェクトを変更するには、[データ同期] タブでデータ同期タスクを見つけて、[アクション] 列の [その他] > [オブジェクトの再選択] を選択します。
同期するオブジェクトとしてテーブルを選択し、テーブルの名前変更や列の名前変更などのテーブルの編集が必要な場合は、1 つのデータ同期タスクで最大 1,000 個のテーブルを同期できます。 1,000 個を超えるテーブルを同期するタスクを実行すると、リクエストエラーが発生します。 この場合、複数のタスクを設定してテーブルを同期するか、データベース全体を同期するタスクを設定することをお勧めします。
宛先インスタンスに同期するオブジェクトの名前を変更するには、選択中のオブジェクト セクションでオブジェクトを右クリックします。 詳細については、「オブジェクト名マッピング」トピックの「単一オブジェクトの名前をマッピングする」セクションをご参照ください。
複数のオブジェクトの名前を一度に変更するには、選択中のオブジェクト セクションの右上隅にある 一括編集 をクリックします。 詳細については、「オブジェクト名マッピング」トピックの「一度に複数のオブジェクト名をマッピングする」セクションをご参照ください。
特定のデータベースまたはテーブルで実行される SQL 操作を選択するには、選択中のオブジェクト セクションでオブジェクトを右クリックします。 表示されるダイアログボックスで、同期する SQL 操作を選択します。 詳細については、「ApsaraDB RDS for MySQL インスタンスから ApsaraDB for SelectDB インスタンスにデータを同期する」をご参照ください。
WHERE 条件を指定してデータをフィルタリングするには、選択中のオブジェクト セクションでテーブルを右クリックします。 表示されるダイアログボックスで、条件を指定します。 詳細については、「フィルタ条件を指定する」をご参照ください。
オブジェクト名マッピング機能を使用してオブジェクトの名前を変更すると、そのオブジェクトに依存する他のオブジェクトが同期に失敗する可能性があります。
オプション。 ページの下部にある 次:データベースおよびテーブルのフィールド設定 をクリックします。 表示されるダイアログボックスで、ApsaraDB for SelectDB インスタンスに同期されるテーブルの プライマリキー列の追加、配布キー、および エンジンの選択 パラメーターを設定します。
同期タイプ パラメーターで スキーマ同期 を選択した場合にのみ、この手順を実行できます。 パラメーターを変更するには、定義ステータス パラメーターを すべて に設定します。
ドロップダウンリストから プライマリキー列の追加 パラメーターに複数の列を選択できます。 プライマリキー列の追加 パラメーターに指定された 1 つ以上の列を、配布キー パラメーターに選択できます。 エンジンの選択 パラメーターには、[一意] のみを選択できます。
タスク設定を保存し、事前チェックを実行します。 成功率が [100%] になるまで待ちます。 次に、[次へ: インスタンスの購入] をクリックします。
[インスタンスの購入] ページで、データ同期インスタンスの [請求方法] および [インスタンスクラス] パラメーターを設定し、チェックボックスをオンにして [data Transmission Service (従量課金制) サービス規約] を読んで同意し、次に [購入して開始] をクリックしてデータ同期タスクを開始します。 タスクの進行状況はタスクリストで確認できます。
セクション | パラメーター | 説明 |
タスク情報 | タスク名 | DTS タスクの名前。 DTS は自動的にタスク名を生成します。 タスクを簡単に識別できる説明的な名前を指定することをお勧めします。 一意のタスク名を指定する必要はありません。 |
ソースデータベース | データベースタイプ | ソースデータベースの種類。 MySQL を選択します。 |
アクセス方法 | ソースデータベースのアクセス方法。 Alibaba Cloud インスタンス を選択します。 | |
インスタンスのリージョン | ソース ApsaraDB RDS for MySQL インスタンスが存在するリージョン。 | |
RDS インスタンス ID | ソース ApsaraDB RDS for MySQL インスタンスの ID。 例: rm-2z3m****。 | |
データベースアカウント | ソース ApsaraDB RDS for MySQL インスタンスのデータベースアカウント。 アカウントに必要な権限の詳細については、「ApsaraDB RDS for MySQL インスタンスから ApsaraDB for SelectDB インスタンスにデータを同期する」トピックの「データベースアカウントに必要な権限」セクションをご参照ください。 | |
データベースのパスワード | データベースへのアクセスに使用されるパスワード。 | |
暗号化 | データベースへの接続を暗号化するかどうかを指定します。 ビジネス要件に基づいて、[非暗号化] または [SSL 暗号化] を選択できます。 このパラメーターを [SSL 暗号化] に設定する場合は、DTS タスクを設定する前に、ApsaraDB RDS for MySQL インスタンスの SSL 暗号化を有効にする必要があります。 詳細については、「クラウド証明書を使用して SSL 暗号化を有効にする」をご参照ください。 | |
宛先データベース | データベースタイプ | 宛先データベースの種類。 [selectdb] を選択します。 |
アクセス方法 | 宛先データベースのアクセス方法。 Alibaba Cloud インスタンス を選択します。 | |
インスタンスのリージョン | 宛先 ApsaraDB for SelectDB インスタンスが存在するリージョン。 | |
インスタンス ID | 宛先 ApsaraDB for SelectDB インスタンスの ID。 | |
データベースアカウント | 宛先 ApsaraDB for SelectDB インスタンスのデータベースアカウント。 アカウントに必要な権限の詳細については、「ApsaraDB RDS for MySQL インスタンスから ApsaraDB for SelectDB インスタンスにデータを同期する」トピックの「データベースアカウントに必要な権限」セクションをご参照ください。 | |
データベースのパスワード | データベースへのアクセスに使用されるパスワード。 |
パラメーター | 説明 |
同期タイプ | 同期タイプ。 デフォルトでは、[増分データ同期] が選択されています。 [スキーマ同期] と [完全データ同期] も選択する必要があります。 事前チェックが完了すると、DTS は選択したオブジェクトの既存データをソースデータベースから宛先クラスタに同期します。 既存データは、後続の増分同期の基礎となります。 重要 MySQL データベースから ApsaraDB for SelectDB インスタンスにデータを同期する場合、データ型が変換されます。 スキーマ同期 を選択しない場合は、事前に対応するスキーマを持つ Unique key モデルを使用するテーブルを宛先 ApsaraDB for SelectDB インスタンスに作成する必要があります。 詳細については、「ApsaraDB RDS for MySQL インスタンスから ApsaraDB for SelectDB インスタンスにデータを同期する」および「データモデル」をご参照ください。 |
ソースオブジェクト | [ソースオブジェクト] セクションから 1 つ以上のオブジェクトを選択し、 重要 |
選択中のオブジェクト | 説明 |
Flink CDC を使用してデータを移行する
Flink は、MySQL データベースから ApsaraDB for SelectDB インスタンスにデータを移行するために使用できる Flink SQL、Flink CDC、および DataStream を提供します。 Flink CDC を使用して、既存データ、増分データ、およびスキーマを移行し、DDL 操作を同期できます。 この例では、Flink CDC を使用して、アップストリーム MySQL データベースから ApsaraDB for SelectDB インスタンスにデータを同期する方法について説明します。 詳細については、「Flink を使用してデータをインポートする」をご参照ください。
手順
環境を準備する
Flink 環境を構築します。 この例では、バージョン 1.16 の Flink スタンドアロンクラスタがデプロイされています。
flink-1.16.3-bin-scala_2.12.tgz パッケージをダウンロードし、パッケージを解凍します。 このバージョンの有効期限が切れている場合は、別のバージョンをダウンロードしてください。 バージョンの詳細については、Apache Flink をご参照ください。 サンプルコード:
wget https://archive.apache.org/dist/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz tar -zxvf flink-1.16.3-bin-scala_2.12.tgzflink-sql-connector-mysql-cdc-2.4.2 および flink-doris-connector-1.16-1.5.2 パッケージを FLINK_HOME/lib ディレクトリにダウンロードします。 サンプルコード:
cd flink-1.16.3 cd lib/ wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar wget https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.16/1.5.2/flink-doris-connector-1.16-1.5.2.jarFlink スタンドアロンクラスタを起動します。 サンプルコード:
bin/start-cluster.shApsaraDB for SelectDB インスタンスを作成します。 詳細については、「インスタンスを作成する」をご参照ください。
MySQL プロトコルを介して ApsaraDB for SelectDB インスタンスに接続します。 詳細については、「インスタンスに接続する」をご参照ください。
テストデータベースとテストテーブルを作成します。
次のステートメントを実行して、テストデータベースを作成します。
CREATE DATABASE test_db;次のステートメントを実行して、テストテーブルを作成します。
USE test_db; CREATE TABLE employees ( emp_no int NOT NULL, birth_date date, first_name varchar(20), last_name varchar(20), gender char(2), hire_date date ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
Flink CDC を使用してデータを同期する
次のサンプルコードは、Flink CDC の構文を示しています。
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.5.2.jar \
mysql-sync-database \
--database test_db \
--including-tables "tbl1|test.*" \
--mysql-conf hostname=127.0.0.1 \
--mysql-conf username=root \
--mysql-conf password=123456 \
--mysql-conf database-name=mysql_db \
--sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
--sink-conf username=admin \
--sink-conf password=****パラメーター | 説明 |
execution.checkpointing.interval | チェックポイントの間隔。データ同期の頻度に影響します。 このパラメーターを 10 秒に設定することをお勧めします。 |
parallelism.default | Flink ジョブの並列度。 並列度を適切に増やすことで、データ同期を高速化できます。 |
database | ApsaraDB for SelectDB インスタンスでデータが同期されるデータベースの名前。 |
including-tables | データの同期元となる MySQL テーブル。 複数のテーブル名を区切るには、縦棒 (|) を使用します。 正規表現もサポートされています。 例: |
excluding-tables | 除外するテーブル。 このパラメーターは、including-tables パラメーターと同じ方法で指定できます。 |
mysql-conf | MySQL CDC ソースの設定項目。 設定項目の詳細については、「MySQL CDC コネクタ」をご参照ください。 |
sink-conf | Doris シンクの設定項目。 詳細については、「Flink を使用してデータをインポートする」トピックの「Doris シンクの設定項目」セクションをご参照ください。 |
table-conf | ApsaraDB for SelectDB テーブルの設定項目。 設定項目は、ApsaraDB for SelectDB テーブルの作成時にプロパティに含まれています。 |
データを同期するには、$FLINK_HOME/lib ディレクトリに flink-sql-connector-mysql-cdc-${version}.jar や flink-sql-connector-oracle-cdc-${version}.jar などの Flink CDC の依存関係をインストールする必要があります。
データベースから完全データを同期する場合、Flink のバージョンは 1.15 以降である必要があります。 さまざまなバージョンの Flink Doris コネクタのダウンロード方法の詳細については、org/apache/doris をご参照ください。
カタログを使用してデータを移行する
ApsaraDB for SelectDB は、フェデレーテッドクエリを実行することで MySQL データソースにアクセスするためのカタログ機能を提供します。 これにより、MySQL データソースから ApsaraDB for SelectDB インスタンスに既存データを簡単かつ迅速に移行できます。 次の例は、カタログを使用して、アップストリーム MySQL データベースから ApsaraDB for SelectDB インスタンスにデータを同期する方法を示しています。 詳細については、「JDBC データソース」をご参照ください。
手順
SelectDB インスタンスに接続します。 詳細については、「インスタンスに接続する」をご参照ください。
説明Data Management (DMS) を使用して ApsaraDB for SelectDB インスタンスにログインすると、
SWITCHコマンドは無効になります。 MySQL クライアントを使用して ApsaraDB for SelectDB インスタンスに接続することをお勧めします。MySQL データソースの Java Database Connectivity (JDBC) カタログを作成します。
CREATE CATALOG jdbc_mysql PROPERTIES (
"type"="jdbc",
"user"="root",
"password"="123456",
"jdbc_url" = "jdbc:mysql://127.0.0.1:3306/demo",
"driver_url" = "mysql-connector-java-8.0.25.jar",
"driver_class" = "com.mysql.cj.jdbc.Driver",
"checksum" = "fdf55dcef04b09f2eaf42b75e61ccc9a"
)パラメーター
パラメーター | 必須 | デフォルト値 | 説明 |
user | はい | デフォルト値なし | MySQL データベースへの接続に使用するアカウント。 |
password | はい | デフォルト値なし | MySQL データベースへの接続に使用するアカウントのパスワード。 |
jdbc_url | はい | デフォルト値なし | MySQL データベースへの接続に使用する JDBC URL。 |
driver_url | はい | デフォルト値なし | JDBC ドライバーの JAR パッケージの名前。 |
driver_class | はい | デフォルト値なし | JDBC ドライバークラスの名前。 |
lower_case_table_names | いいえ | "false" | 外部 JDBC データソースのデータベースとテーブルの名前を小文字で同期するかどうかを指定します。 |
only_specified_database | いいえ | "false" | 特定のデータベースからのみデータを移行するかどうかを指定します。 |
include_database_list | いいえ | "" | データの移行元となるデータベースの名前。 このパラメーターは、 |
exclude_database_list | いいえ | "" | データの移行元としないデータベースの名前。 このパラメーターは、 |
ApsaraDB for SelectDB インスタンスにテーブルを作成し、ETL (抽出、変換、ロード) 構文の
INSERT INTO SELECTステートメントを実行してデータを同期します。 詳細については、「INSERT INTO ステートメントを使用してデータをインポートする」をご参照ください。
# テーブルを作成します。
CREATE TABLE selectdb_table ...
# データを同期します。
INSERT INTO selectdb_table SELECT * FROM mysql_catalog.mysql_database.mysql_table;DataWorks を使用してデータを移行する
ApsaraDB for SelectDB では、DataWorks のデータ統合機能を使用して既存データをインポートできます。 このセクションでは、DataWorks を使用して MySQL データベースから ApsaraDB for SelectDB インスタンスにデータを同期する方法について説明します。 詳細については、「DataWorks を使用してデータをインポートする」をご参照ください。
BITMAP、HyperLogLog (HLL)、または QUANTILE_STATE データ型のフィールドに書き込むことはできません。
データソースを追加する
データ同期タスクを設定する前に、MySQL データソースと ApsaraDB for SelectDB データソースを DataWorks に追加する必要があります。
MySQL データソースを追加します。 詳細については、「MySQL データソース」をご参照ください。
ApsaraDB for SelectDB データソースを追加します。 詳細については、「データソースを追加および管理する」をご参照ください。 次の表に、ApsaraDB for SelectDB データソースを追加するために使用されるパラメーターを示します。
パラメーター
説明
[データソース名]
データソースの名前。
[ホストアドレス/ IP アドレス]
jdbc:mysql://<ip>:<port>/<dbname>形式の JDBC URL。ApsaraDB for SelectDB インスタンスの VPC エンドポイントまたはパブリックエンドポイントと MySQL ポートを取得するには、次の操作を実行します。 ApsaraDB for SelectDB コンソールにログインし、情報を表示するインスタンスの [インスタンスの詳細] ページに移動します。 [基本情報] ページの [ネットワーク情報] セクションで、[VPC エンドポイント] または [パブリックエンドポイント] パラメーターと [mysql ポート] パラメーターの値を表示します。
例:
jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030/test_db[HTTP 接続アドレス]
HTTP 経由で ApsaraDB for SelectDB インスタンスにアクセスするために使用される IP アドレスとポート番号。 値は
<ip>:<port>形式です。ApsaraDB for SelectDB インスタンスの VPC エンドポイントまたはパブリックエンドポイントと HTTP ポートを取得するには、次の操作を実行します。 ApsaraDB for SelectDB コンソールにログインし、情報を表示するインスタンスの [インスタンスの詳細] ページに移動します。 [基本情報] ページの [ネットワーク情報] セクションで、[VPC エンドポイント] または [パブリックエンドポイント] パラメーターと [HTTP ポート] パラメーターの値を表示します。
例:
selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080。[ユーザー名]
ApsaraDB for SelectDB インスタンスのオーナーアカウントのユーザー名。
[パスワード]
ApsaraDB for SelectDB インスタンスのオーナーアカウントのパスワード。
データ同期タスクを設定する
コードレス ユーザーインターフェース (UI) またはコードエディターを使用して、データ同期タスクを設定できます。 詳細については、以下のトピックをご参照ください。
アイコンをクリックして、[選択済みオブジェクト] セクションにオブジェクトを追加します。 同期するオブジェクトとして、列、テーブル、またはデータベースを選択できます。