すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:PolarDB-X 1.0 インスタンスから ApsaraMQ for Kafka インスタンスへのデータ移行

最終更新日:Mar 29, 2026

Data Transmission Service (DTS) を使用して、PolarDB-X 1.0 インスタンスから ApsaraMQ for Kafka インスタンスにデータを移行します。DTS はスキーマ移行、完全データ移行、増分データ移行をサポートしているため、サービスのダウンタイムを最小限に抑えてデータを移行できます。

前提条件

開始する前に、以下を確認してください。

  • PolarDB-X 1.0 インスタンスがあること。詳細については、「PolarDB-X 1.0 インスタンスの作成」をご参照ください。

  • 移行先の ApsaraMQ for Kafka インスタンスに、移行データを受信するためのトピックが作成されていること。詳細については、「概要」をご参照ください。

  • 移行先の ApsaraMQ for Kafka インスタンスで利用可能なストレージ容量が、移行元の PolarDB-X 1.0 インスタンスの総データサイズよりも大きいこと。

制限事項

ソースデータベースの要件

ハード制約 — 満たされない場合、移行は失敗します:

  • 移行対象のテーブルには PRIMARY KEY または UNIQUE 制約が必要であり、すべてのフィールドが一意である必要があります。そうでない場合、移行先に重複レコードが含まれる可能性があります。

  • 読み取り専用の PolarDB-X 1.0 インスタンスからの移行はサポートされていません。

  • 移行オブジェクトとしてテーブルを選択し、テーブル名または列名を変更する必要がある場合、1 つのタスクでサポートされるテーブルは最大 1,000 です。1,000 を超えるテーブルの場合は、移行を複数のタスクに分割するか、データベース全体を移行してください。

増分データ移行に必要な設定:

  • バイナリログが有効になっており、binlog_row_imagefull に設定されている必要があります。これが設定されていない場合、事前チェックが失敗し、タスクを開始できません。

  • バイナリログの保持期間:保持期間が不十分な場合、DTS がバイナリログを読み取れず、タスクの失敗やデータの不整合を引き起こす可能性があります。完全移行が完了した後、保持期間を 24 時間以上に短縮できます。

    • 増分移行のみ:バイナリログを 24 時間以上保持します。

    • 完全 + 増分移行:バイナリログを少なくとも 7 日間保持します。

移行中の操作制限:

  • ソースインスタンスのアップグレードやダウングレード、頻繁に更新されるテーブルの変更、シャードキーの変更、ソースオブジェクトに対する DDL 操作を実行しないでください。これらの操作はタスクの失敗を引き起こします。

  • 完全および増分移行中、DTS はセッションレベルで外部キー制約チェックとカスケード操作を一時的に無効にします。この期間中のカスケード更新または削除は、データの不整合を引き起こす可能性があります。

  • 移行中に PolarDB-X 1.0 インスタンスのネットワークタイプが変更された場合は、DTS 移行タスクのネットワーク接続設定を適宜更新してください。

  • 完全移行のみ(増分移行なし)の場合:移行中にソースにデータを書き込まないでください。これにより不整合が発生します。データの一貫性を保つには、[スキーマ移行][完全データ移行]、および[増分データ移行]を同時に選択してください。

その他の制限事項

  • 移行前に、ソースおよびターゲットデータベースのパフォーマンスへの影響を評価してください。移行タスクはオフピーク時に実行してください。完全データ移行は両方のデータベースの読み取りおよび書き込みリソースを使用するため、サーバーの負荷が増加する可能性があります。

  • 完全データ移行では同時 INSERT 操作が使用されるため、移行先でテーブルの断片化が発生します。完全移行後、移行先のテーブルスペースはソースよりも大きくなります。

  • DTS は失敗したタスクを最大 7 日間再開しようとします。ワークロードを移行先に切り替える前に、失敗したタスクを停止またはリリースしてください。または、REVOKE 文を実行して、移行先にアクセスする DTS アカウントから書き込み権限を取り消してください。ワークロードを切り替えた後に失敗したタスクが再開されると、ソースデータが移行先データを上書きする可能性があります。

注意事項

  • DTS は、バイナリログの位置を進めるために、ソースデータベースの ` dts_health_check.ha_health_check ` テーブルを定期的に更新します。

  • 移行中に移行先の ApsaraMQ for Kafka インスタンスがアップグレードまたはダウングレードされた場合は、インスタンスを再起動して移行を再開してください。

課金

移行タイプインスタンス構成料金インターネットトラフィック料金
スキーマ移行および完全なデータ移行無料無料
増分データ移行有料です。詳細については、「課金概要」をご参照ください。

移行タイプ

移行タイプ説明
スキーマ移行DTS はオブジェクトスキーマをソースから移行先に移行します。
完全データ移行DTS は既存のすべてのデータをソースから移行先に移行します。
増分データ移行完全データ移行が完了した後、DTS はソースからの増分変更を継続的に移行します。これにより、移行先が同期され、アプリケーションを中断することなく切り替えることができます。

増分移行の SQL 操作

操作タイプSQL ステートメント
DMLINSERT、UPDATE、DELETE

必要な権限

データベーススキーマ移行完全データ移行増分データ移行
PolarDB-X 1.0SELECTSELECT移行対象オブジェクトに対する REPLICATION SLAVE、REPLICATION CLIENT、および SELECT。詳細については、「データ同期アカウントに必要な権限」セクションをご参照ください。
ApsaraMQ for Kafka読み取りと書き込み読み取りと書き込み読み取りと書き込み

データ型マッピング

詳細については、「スキーマ同期のデータ型マッピング」をご参照ください。

移行タスクの作成

ステップ 1:データ移行タスクページへの移動

  1. Data Management (DMS) コンソールにログインします。

  2. 上部のナビゲーションバーで、DTS をクリックします。

  3. 左側のナビゲーションウィンドウで、DTS (DTS) > データ移行 を選択します。

説明

コンソール操作は、DMS モードおよびレイアウトによって異なる場合があります。詳細については、「シンプルモード」および「DMS コンソールのレイアウトとスタイルのカスタマイズ」をご参照ください。また、「データ移行タスクページ」に直接アクセスすることもできます。

ステップ 2:リージョンの選択

[データ移行タスク] の横にあるドロップダウンリストから、移行インスタンスが存在するリージョンを選択します。

説明

新しい DTS コンソールでは、左上隅でリージョンを選択します。

ステップ 3:ソースデータベースとターゲットデータベースの設定

[タスクの作成] をクリックし、以下のパラメーターを設定します。

警告

ソースインスタンスとターゲットインスタンスを選択した後、先に進む前にページ上部の [制限事項] セクションをお読みください。

ソースデータベース (PolarDB-X 1.0)

パラメーター説明
タスク名タスクの名前。DTS はデフォルトの名前を割り当てます。タスクを識別しやすくするために、わかりやすい名前を指定してください。名前は一意である必要はありません。
既存の DMS データベースインスタンスを選択使用する既存のデータベースインスタンス。選択した場合、DTS は他のパラメーターを自動的に入力します。選択しない場合は、パラメーターを手動で設定します。
データベースタイプ[PolarDB-X 1.0] を選択します。
アクセス方法[Alibaba Cloud インスタンス] を選択します。
インスタンスリージョンソースの PolarDB-X 1.0 インスタンスが存在するリージョン。
Alibaba Cloud アカウント間でのデータ複製Alibaba Cloud アカウント間でデータを移行するかどうか。同一アカウントでの移行の場合は [いいえ] を選択します。
インスタンス IDソースの PolarDB-X 1.0 インスタンスの ID。
データベースアカウントソースインスタンスのデータベースアカウント。移行先の ApsaraMQ for Kafka インスタンスで使用されるデータ形式に基づいて権限を付与します。
データベースパスワードデータベースアカウントのパスワード。

ターゲットデータベース (ApsaraMQ for Kafka)

パラメーター説明
既存の DMS データベースインスタンスを選択使用する既存のデータベースインスタンス。選択した場合、DTS は他のパラメーターを自動的に入力します。選択しない場合は、パラメーターを手動で設定します。
データベースタイプ[Kafka] を選択します。
アクセス方法[Express Connect、VPN Gateway、または Smart Access Gateway] を選択します。
インスタンスリージョン移行先の ApsaraMQ for Kafka インスタンスが存在するリージョン。
接続済み VPC移行先の ApsaraMQ for Kafka インスタンスの Virtual Private Cloud (VPC) ID。VPC ID を確認するには、ApsaraMQ for Kafka コンソールのインスタンス詳細ページに移動し、[インスタンス情報] タブの [設定情報] セクションを確認します。
IP アドレスまたはドメイン名移行先の ApsaraMQ for Kafka インスタンスの IP アドレス。IP アドレスを確認するには、インスタンス詳細ページに移動し、[インスタンス情報] タブの [エンドポイント情報] セクションを確認します。[デフォルトエンドポイント] フィールドから IP アドレスをコピーします。
ポート番号移行先インスタンスのサービスポート。デフォルト値:9092
データベースアカウント移行先の ApsaraMQ for Kafka インスタンスのデータベースアカウント。アクセス制御リスト (ACL) 機能が有効な場合にのみ必要です。詳細については、「SASL ユーザーへの権限付与」をご参照ください。
データベースパスワードデータベースアカウントのパスワード。ACL 機能が有効な場合にのみ必要です。
Kafka バージョン移行先の ApsaraMQ for Kafka インスタンスのバージョン。
暗号化接続を暗号化するかどうか。セキュリティ要件に基づいて [非暗号化] または [SCRAM-SHA-256] を選択します。
トピック移行データを受信するトピック。ドロップダウンリストから選択します。
DDL 情報を格納するトピックDDL 情報を格納するトピック。指定しない場合、DDL 情報は [トピック] パラメーターで設定されたトピックに格納されます。
Kafka Schema Registry を使用Avro スキーマを格納および取得するための RESTful インターフェイスを提供する Kafka Schema Registry を使用するかどうか。スキップする場合は [いいえ] を選択し、使用する場合は [はい] を選択して Kafka Schema Registry に登録されている URL または IP アドレスを入力します。

ステップ 4:接続テスト

ページ下部の [接続テストと次へ] をクリックします。

Alibaba Cloud データベースインスタンス (ApsaraDB RDS for MySQL や ApsaraDB for MongoDB など) の場合、DTS は自動的にそのサーバー CIDR ブロックをインスタンスの IP アドレスホワイトリストに追加します。 Elastic Compute Service (ECS) インスタンス上の自己管理データベースの場合、DTS は自動的にその CIDR ブロックを ECS セキュリティグループルールに追加します。 自己管理データベースが複数の ECS インスタンスにまたがる場合は、各インスタンスのセキュリティグループルールに CIDR ブロックを手動で追加してください。 データセンター内、またはサードパーティプロバイダーによってホストされている自己管理データベースの場合、DTS サーバーの CIDR ブロックをデータベースの IP アドレスホワイトリストに手動で追加してください。 詳細については、「DTS サーバーの CIDR ブロックをオンプレミスデータベースのセキュリティ設定に追加する」をご参照ください。

警告

DTS サーバーの CIDR ブロックを IP ホワイトリストやセキュリティグループルールに追加すると、セキュリティ上のリスクが生じます。DTS を使用する前に、アカウント認証情報の保護、公開ポートの制限、API 呼び出しの認証、ホワイトリストとセキュリティグループルールの定期的な監査、データベース接続に Express Connect、VPN Gateway、または Smart Access Gateway を使用するなど、予防措置を講じてください。

ステップ 5:オブジェクトの選択と移行設定

以下のパラメーターを設定します:

パラメーター説明
移行タイプ要件に基づいて移行タイプを選択します:- 完全移行のみ:[スキーマ移行][完全データ移行] を選択します。- ライブ同期を伴う完全移行:[スキーマ移行][完全データ移行][増分データ移行] を選択します。
説明

[増分データ移行] を選択しない場合、データの不整合を避けるために移行中にソースデータベースに書き込まないでください。

競合するテーブルの処理モード- 事前チェックとエラー報告:移行先でソーステーブルと同じ名前のテーブルをチェックします。一致が存在する場合、事前チェックは失敗します。名前の競合を処理するには、オブジェクト名マッピングを使用して移行先テーブルの名前を変更します。- エラーを無視して続行:重複するテーブル名の事前チェックをスキップします。
警告

スキーマが一致する場合、DTS は重複するプライマリキーを持つレコードをスキップします。スキーマが異なる場合、特定の列のみが移行されるか、タスクが失敗する可能性があります。

Kafka のデータ形式ApsaraMQ for Kafka に書き込まれるデータの形式。PolarDB-X 1.0 は [DTS Avro] のみをサポートし、Canal JSON はサポートされていません。DTS Avro データは DTS Avro スキーマ定義に従って解析されます。スキーマリファレンスについては、GitHub をご参照ください。
Kafka パーティションへのデータ転送ポリシーこのソースと移行先の組み合わせでは、カスタムパーティションルーティングはサポートされていません。
宛先インスタンスのオブジェクト名の大文字化宛先におけるデータベース名、テーブル名、および列名の大文字小文字のポリシーです。デフォルトは DTS デフォルトポリシー です。詳細については、「宛先インスタンスにおけるオブジェクト名の大文字小文字の指定」をご参照ください。
ソースオブジェクト[ソースオブジェクト] セクションからオブジェクトを選択し、矢印アイコンをクリックして [選択済みオブジェクト] に移動します。
説明

- テーブルを選択すると、テーブルのみが移行され、ビュー、トリガー、ストアドプロシージャは移行されません。- データベースを選択すると、以下のルールに基づいてデータが移行されます: - プライマリキーを持つテーブル:プライマリキー列が分散キーになります。 - プライマリキーを持たないテーブル:DTS は自動採番主キー列を自動生成しますが、これによりデータの不整合が発生する可能性があります。

選択済みオブジェクトオブジェクトを右クリックして名前を変更したり、WHERE フィルター条件を設定したりします。一度に複数のオブジェクトの名前を変更するには、[一括編集] をクリックします。
説明

オブジェクトの名前を変更すると、依存オブジェクトが移行に失敗する場合があります。フィルター条件については、「フィルター条件を設定する」をご参照ください。

ステップ 6:詳細設定

[次へ:詳細設定] をクリックし、以下を設定します:

パラメーター説明
タスクのスケジュールに使用する専用クラスターを選択オプション。デフォルトの共有クラスターを使用する場合は空白のままにします。詳細については、「DTS 専用クラスターとは
アラートの設定タスクの失敗または移行遅延が大きい場合にアラートを設定するかどうか。[いいえ] を選択するとスキップされます。または、[はい] を選択して、アラートのしきい値を設定し、アラート連絡先を指定します。詳細については、「モニタリングとアラートの設定」をご参照ください。
接続失敗時の再試行時間タスク開始後に接続が失敗した場合に DTS が再試行する時間。有効な値:10~1440 分。デフォルト:720 分。30 分以上に設定してください。この期間内に DTS が再接続するとタスクは再開されますが、それ以外の場合はタスクは失敗します。
説明

複数のタスクが同じソースまたはターゲットデータベースを共有し、異なる再試行時間を持つ場合、最後に設定された値が有効になります。DTS は再試行期間中もインスタンスに課金します。要件に基づいて再試行時間を設定し、不要になったインスタンスは速やかにリリースしてください。

ソースおよびターゲットデータベースで他の問題が発生した場合の再試行待機時間DTS が失敗した DDL 操作または DML 操作を再試行する時間。有効な値:1~1440 分。デフォルト:10 分。10 分より大きい値に設定してください。この値は、失敗した接続の再試行時間 の値より小さい必要があります。
ETL の設定ETL(抽出・変換・書き出し)を有効にするかどうかを指定します。[はい] を選択すると、データ処理文を入力できます。[いいえ] を選択すると、このステップをスキップできます。詳細については、「ETLとは何か」および「データ移行またはデータ同期タスクでETLを設定する」をご参照ください。

ステップ 7:事前チェックの実行

[次へ:タスク設定の保存と事前チェック] をクリックします。

説明

このタスク設定の API パラメーターを確認するには、[次へ:タスク設定の保存と事前チェック] にカーソルを合わせ、[OpenAPI パラメーターのプレビュー] をクリックします。

DTS はタスク開始前に事前チェックを実行します。タスクは事前チェックに合格した後にのみ続行できます。

  • 事前チェックが失敗した場合、失敗した各項目の横にある [詳細の表示] をクリックし、問題を修正して再度事前チェックを実行します。

  • 事前チェック中にアラートが表示された場合:

    • 無視できないアラートの場合:[詳細の表示] をクリックし、問題を修正して再度事前チェックを実行します。

    • 無視できるアラートの場合:[アラート詳細の確認] をクリックし、[無視] > [OK] > [再度事前チェック] をクリックします。アラートを無視すると、データの不整合が発生する可能性があります。

ステップ 8:移行インスタンスの購入

成功率100% に達するまで待ってから、[次へ:インスタンスの購入] をクリックします。

[インスタンスの購入] ページで、以下を設定します:

パラメーター説明
リソースグループ移行インスタンスのリソースグループ。デフォルト:デフォルトリソースグループResource Management とは
インスタンスクラスインスタンスクラスによって移行速度が決まります。データ量と時間要件に基づいて選択してください。詳細については、「データ移行インスタンスの仕様」をご参照ください。

ステップ 9:移行の開始

[Data Transmission Service (従量課金) 利用規約] を読み、同意してから、[購入して開始] をクリックします。

移行タスクが開始されます。タスクリストで進捗を監視します。

次のステップ

移行が完了した後、アプリケーションを移行先の ApsaraMQ for Kafka インスタンスに切り替える前に、ソースと移行先のデータ整合性を確認してください。タスクが再開された後にソースデータが移行先データを上書きするのを防ぐために、失敗した DTS タスクを停止またはリリースしてください。