すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:PolarDB-X 2.0 から Message Queue for Apache Kafka へのデータ移行

最終更新日:Mar 29, 2026

Data Transmission Service (DTS) を使用すると、PolarDB-X 2.0 インスタンスから Message Queue for Apache Kafka インスタンスへデータを移行できます。一度限りの完全移行を実行することも、移行中に継続的な増分変更キャプチャで Kafka を同期状態に保つことも可能です。

仕組み

DTS は、ソース PolarDB-X インスタンスのバイナリログ (binlog) を読み取り、コミットされた行レベルの INSERT、UPDATE、DELETE 操作をキャプチャします。完全移行と増分移行を組み合わせる場合、DTS はまずソースデータの整合性のあるスナップショットを取得し、その後、スナップショットが完了した時点からバイナリログベースの増分キャプチャに切り替えます。

MySQL はスケジュールに基づいてバイナリログをパージするため、ソースインスタンスは、スナップショットフェーズ全体とダウンタイムまたは中断をカバーするのに十分な期間、バイナリログを保持する必要があります。DTS がバイナリログを読み取る前にパージされた場合、タスクは失敗するか、データ損失が発生します。

DTS がサポートする機能

機能詳細
移行タイプスキーマ移行、完全なデータ移行、増分データ移行
増分 DML 操作INSERT、UPDATE、DELETE
Kafka データ形式DTS Avro、Canal Json
Kafka 認証非暗号化、SCRAM-SHA-256
パーティションルーティングKafka パーティションへのデータルーティングのための設定可能なポリシー
オブジェクト名マッピング宛先でのテーブルまたは列の名前変更
データフィルタリングWHERE 条件に基づく行フィルタリング
ETL移行中の抽出、変換、書き出し (ETL) 処理
DTS は外部キーを移行しません。ソースデータベースで定義されたカスケード操作と削除操作は Kafka にレプリケートされません。

前提条件

開始する前に、以下を確認してください。

  • MySQL 5.7 と互換性のある PolarDB-X 2.0 インスタンス

  • 移行されたデータを受信する宛先 Message Queue for Apache Kafka インスタンスに作成されたトピック。詳細については、「ステップ 1: トピックの作成」をご参照ください。

  • ソースと宛先の互換性のあるバージョン。詳細については、「データ移行シナリオの概要」をご参照ください。

  • ソースデータを保持するための宛先 Kafka インスタンスの十分なストレージスペース

ソースデータベースアカウントの権限:

移行タイプ必要な権限
スキーマ移行SELECT
完全なデータ移行SELECT
増分データ移行移行するオブジェクトに対する REPLICATION SLAVE、REPLICATION CLIENT、および SELECT

権限の付与についての手順については、「PolarDB-X のデータ同期ツール」をご参照ください。

バイナリログの構成 (増分データ移行に必要):

ソース PolarDB-X インスタンスでバイナリロギングを有効にし、次のパラメーターを設定します。

binlog_row_image=full

移行タイプに基づいて保持期間を設定します。

移行タイプ最小保持期間
増分データ移行のみ24 時間以上
完全なデータ移行 + 増分データ移行7 日以上
警告

DTS がバイナリログを読み取る前にパージされた場合、移行タスクは失敗します。大規模なソースデータベースの場合、完全なデータ移行 (スナップショット) フェーズは保持期間よりも長くかかることがあります。スナップショット中にバイナリログがパージされた場合、DTS は正しいバイナリログの位置から増分移行を再開できず、データの不整合またはデータ損失が発生します。完全なデータ移行が完了した後、保持期間を 24 時間以上に短縮できます。

制限事項

ソースデータベースの制限:

  • ソースデータベースをホストするサーバーには、十分なアウトバウンド帯域幅が必要です。帯域幅が不足すると、移行速度が低下します。

  • 移行するテーブルには、すべてのフィールドが一意であるプライマリキーまたは一意制約が必要です。これらの制約がない場合、宛先に重複レコードが含まれる可能性があります。

  • テーブルを移行オブジェクトとして選択し、宛先でテーブルまたは列の名前を変更する必要がある場合、単一の移行タスクは最大 1,000 テーブルをサポートします。1,000 を超えるテーブルの場合は、複数のタスクを構成するか、代わりにデータベース全体を移行してください。

  • スキーマ移行および完全なデータ移行中は、ソースデータベースで DDL 操作を実行しないでください。移行中の DDL 変更は、タスクの失敗を引き起こします。

  • 移行中に PolarDB-X インスタンスのネットワークタイプを変更する場合は、それに応じて移行タスクのネットワーク接続設定を更新してください。

  • 増分移行を行わない完全なデータ移行の場合:移行中にソースデータベースへの書き込みを行わないでください。この期間中の書き込みにより、ソースと送信先の間にデータの不整合が発生します。一貫性を確保するには、[スキーマ移行][完全なデータ移行]、および[増分データ移行]を同時に選択してください。

その他の制限:

  • 移行を開始する前に、パフォーマンスへの影響を評価してください。可能な場合は、オフピーク時間に移行してください。完全なデータ移行は、ソースと宛先の両方で読み取りおよび書き込みリソースを消費します。

  • 完全なデータ移行中、同時 INSERT 操作は宛先テーブルで断片化を引き起こします。移行後、宛先で使用される表領域はソースよりも大きくなります。

  • DTS は、失敗したタスクの再開を最大 7 日間試みます。ワークロードを宛先に切り替える前に、失敗したタスクを停止またはリリースしてください。または、REVOKE ステートメントを実行して、DTS アカウントから書き込み権限を取り消し、失敗したタスクが再開したときにソースデータが宛先データを上書きするのを防ぎます。

注意事項:

DTS は、バイナリログの位置を進めるために、スケジュールに基づいてソースデータベースの dts_health_check.ha_health_check テーブルを更新します。

課金

移行タイプインスタンス構成料金インターネットトラフィック料金
スキーマ移行と完全なデータ移行無料Alibaba Cloud からインターネット経由でデータが移行された場合にのみ課金されます。詳細については、「課金概要
増分データ移行課金済み。「課金概要

移行タスクの設定

ステップ 1: データ移行タスクページへの移動

  1. Data Management (DMS) コンソールにログインします。

  2. トップナビゲーションバーで、[DTS] をクリックします。

  3. 左側のナビゲーションウィンドウで、[DTS (DTS)][データ移行] を選択します。

コンソールのレイアウトと利用可能なオプションは、DMS モードに基づいて異なる場合があります。詳細については、「シンプルモード」および「DMS コンソールのレイアウトとスタイルをカスタマイズする」をご参照ください。新しい DTS コンソールのデータ移行タスクページに直接アクセスすることもできます。

ステップ 2: ソースデータベースと宛先データベースの構成

  1. [データ移行タスク] の横にあるドロップダウンリストから、移行インスタンスが存在するリージョンを選択します。

    新しい DTS コンソールでは、左上隅でリージョンを選択します。
  2. [タスクの作成] をクリックします。[タスクの作成] ウィザードで、ソースおよびターゲットデータベースを設定します。

    警告

    ソースデータベースおよびターゲットデータベースを設定した後は、続行する前にページ上部に表示される[制限事項]を確認してください。この手順をスキップすると、タスクの失敗またはデータの不整合が発生する可能性があります。

    ソースデータベースのパラメーター:

    パラメーター説明
    タスク名タスクの説明的な名前。DTS はデフォルト名を割り当てます。一意の名前は必須ではありません。
    既存の DMS データベースインスタンスを選択する既存のインスタンスを選択して DTS にパラメーターを自動的に入力させるか、手動で構成します。
    データベースタイプ[PolarDB-X 2.0] を選択
    アクセス方法[Alibaba Cloud インスタンス] を選択します。
    インスタンスリージョンソース PolarDB-X インスタンスが存在するリージョン。
    インスタンス IDソース PolarDB-X インスタンスの ID。
    データベースアカウントソースインスタンスのアカウント。権限テーブルについては、「前提条件」をご参照ください。
    データベースパスワードデータベースアカウントのパスワード。

    宛先データベースのパラメーター:

    パラメーター説明
    既存の DMS データベースインスタンスの選択DTS がパラメーターを自動的に設定するため、既存のインスタンスを選択するか、手動で設定してください。
    データベースタイプKafka を選択します。
    アクセス方法Express Connect、VPN Gateway、または Smart Access Gateway を選択します。DTS は Message Queue for Apache Kafka を直接アクセス可能な方法としてリスト表示しません。そのため、Message Queue for Apache Kafka インスタンスには、自己管理 Kafka クラスターとして接続してください。
    インスタンスリージョン送信先の Message Queue for Apache Kafka インスタンスが配置されているリージョンです。
    接続済み VPC設定情報インスタンス情報Kafka インスタンスに関連付けられた仮想プライベートクラウド (VPC) の ID です。VPC ID を取得するには:Message Queue for Apache Kafka コンソールにログインし、[インスタンスの詳細] ページを開き、[インスタンス情報] タブの [構成情報] セクションで VPC ID を確認します。
    IP アドレスまたはドメイン名デフォルトエンドポイント[エンドポイント情報]Kafka インスタンスの IP アドレスです。IP アドレスを取得するには:[インスタンスの詳細] ページで、[エンドポイント情報] セクションの [デフォルトエンドポイント] フィールドから IP アドレスを確認します。
    ポート番号Kafka インスタンスのサービスポートです。デフォルト値:9092。
    データベースアカウントKafka インスタンスのアカウントです。VPC タイプのインスタンスには不要です。
    データベースパスワードデータベースアカウントのパスワードです。VPC タイプのインスタンスには不要です。
    Kafka バージョン送信先 Kafka インスタンスのバージョンです。
    暗号化セキュリティ要件に応じて、非暗号化 または SCRAM-SHA-256 を選択します。
    トピック移行データを受信するトピックです。ドロップダウンリストから選択します。
    DDL 情報を格納するトピックDDL 情報を格納するトピックです。空欄のままにした場合、DDL 情報は [トピック] で指定されたトピックに格納されます。
    Kafka Schema Registry の使用Avro スキーマの保存および取得を可能にする RESTful API を提供する Kafka Schema Registry を使用するかどうかを指定します。いいえ を選択するとスキップされ、はい を選択した場合は、Avro スキーマ用に Schema Registry に登録された URL または IP アドレスを入力します。
  3. [接続性のテストと続行]」をクリックします。DTS は、自動的にその CIDR ブロックを Alibaba Cloud データベースインスタンス(ApsaraDB RDS for MySQL や ApsaraDB for MongoDB など)の IP アドレスホワイトリストおよび、自己管理データベースをホストする Elastic Compute Service (ECS) インスタンスのセキュリティグループルールに追加します。複数の ECS インスタンスで実行される自己管理データベースの場合、各インスタンスのセキュリティグループルールに DTS の CIDR ブロックを手動で追加します。オンプレミスデータベースまたはサードパーティクラウドデータベースの場合、データベースの IP アドレスホワイトリストに DTS の CIDR ブロックを手動で追加します。詳細については、「オンプレミスデータベースのセキュリティ設定への DTS サーバーの CIDR ブロックの追加」をご参照ください。

    警告

    DTS CIDR ブロックをホワイトリストまたはセキュリティグループルールに追加すると、潜在的なセキュリティリスクを導入します。DTS を使用する前に、予防措置を講じてください。アカウントとパスワードのセキュリティを強化し、公開ポートを制限し、API 呼び出しを認証し、IP ホワイトリストと ECS セキュリティグループルールを定期的に監査し、不正な CIDR ブロックを削除してください。より強力な隔離のために、Express Connect、VPN Gateway、または Smart Access Gateway を使用してデータベースを DTS に接続します。

ステップ 3: 移行オブジェクトと設定の構成

パラメーター説明
移行タイプ一度限りの移行を行う場合は、スキーマ移行フルデータ移行 を選択します。アプリケーションを中断せずに移行中に送信先を同期し続けるには、スキーマ移行フルデータ移行、および 増分データ移行 を選択します。増分データ移行 を選択しない場合、データの不整合を防ぐため、移行中はソースデータベースへの書き込みを行わないでください。
競合テーブルの処理モード事前チェックしてエラーを報告:ソースと送信先の両方に同名のテーブルが存在する場合、事前チェックが失敗します。競合を解消する前に、オブジェクト名マッピング を使用して送信先のテーブル名を変更してください。エラーを無視して続行:競合チェックをスキップします。スキーマが一致する場合、重複するプライマリキーを持つレコードは移行されません。スキーマが異なる場合、特定の列のみが移行されるか、タスクが失敗します。慎重に続行してください。
Kafka 内のデータフォーマットDTS Avro:DTS Avro スキーマ定義を使用してデータを解析します。詳細については、GitHub 上のスキーマ定義をご参照ください。Canal Json:データは Canal Json フォーマットで保存されます。詳細については、「Kafka クラスターのデータフォーマット」の「Canal Json」セクションをご参照ください。
Kafka パーティションへのデータ送信ポリシー要件に基づいてパーティションルーティングポリシーを選択します。詳細については、「Kafka パーティションへのデータ移行ポリシーの指定」をご参照ください。ソースが PolarDB-X 1.0 インスタンスの場合、このオプションは利用できません。
ソースオブジェクト1 つ以上のオブジェクトを選択し、Rightwards arrow アイコンをクリックして 選択済みオブジェクト に追加します。列、テーブル、またはスキーマを選択できます。テーブルまたは列を選択した場合、DTS はビュー、トリガー、ストアドプロシージャを移行しません。
選択済みオブジェクト送信先で単一のオブジェクトの名前を変更するには、そのオブジェクトを右クリックします。詳細については、「単一オブジェクトの名前マッピング」をご参照ください。複数のオブジェクトの名前を一度に変更するには、一括編集 をクリックします。詳細については、「複数のオブジェクト名を一度にマッピング」をご参照ください。オブジェクトの名前を変更すると、それに依存する他のオブジェクトが機能しなくなる可能性があります。行をフィルターするには、オブジェクトを右クリックして WHERE 条件を指定します。詳細については、「フィルター条件の設定」をご参照ください。テーブルに対して移行する特定の SQL 操作を選択するには、オブジェクトを右クリックして該当する操作を選択します。

ステップ 4: 詳細設定の構成

[次へ: 詳細設定] をクリックし、次のパラメーターを設定します。

パラメーター説明
アラートの設定タスクの失敗または移行遅延がしきい値を超えた場合に通知を受信するには、[はい]モニタリングとアラートの設定 を選択します。アラートのしきい値および連絡先を指定してください。詳細については、「」をご参照ください。アラート機能をスキップする場合は、[いいえ] を選択します。
宛先インスタンスにおけるオブジェクト名の大文字小文字の制御宛先におけるデータベース名、テーブル名、カラム名の大文字小文字の形式を制御します。デフォルト値は [DTS デフォルトポリシー] です。詳細については、「宛先インスタンスにおけるオブジェクト名の大文字小文字の指定」をご参照ください。
接続失敗時の再試行時間タスク開始後に DTS が接続失敗を再試行する期間です。有効な値:10~1440 分。デフォルト値:720 分。この値は 30 分より長く設定してください。この期間内に DTS が再接続できた場合、タスクは自動的に再開されます。それ以外の場合はタスクが失敗します。複数のタスクが同一のソースまたは宛先を共有する場合、その中で最も短い再試行時間が優先されます。再試行中の間も DTS インスタンスに対して課金が発生します。そのため、ご要件に応じて再試行ウィンドウを適切に設定し、ソースおよび宛先の廃止後は速やかに DTS インスタンスをリリースしてください。
ETL の設定抽出・変換・書き出し(ETL)処理を有効化し、データ処理文を入力するには、[はい]ETL とはデータ移行またはデータ同期タスクにおける ETL の設定 を選択します。詳細については、「」および「」をご参照ください。ETL をスキップする場合は、[いいえ] を選択します。
順方向および逆方向タスクのハートビートテーブルに対する SQL 操作の削除有無ハートビートテーブルへの書き込みを抑制するには、[はい] を選択します。この場合、DTS インスタンス上で移行遅延が表示されることがあります。ハートビートテーブルへの DTS の書き込みを許可するには、[いいえ] を選択します。ただし、物理バックアップやクローン作成などの一部のソースデータベース機能に影響が出る可能性があります。

ステップ 5: 事前チェックの実行とインスタンスの購入

  1. [次へ: タスク設定の保存と事前チェック] をクリックします。このタスク構成の API パラメーターをプレビューするには、[次へ: タスク設定の保存と事前チェック] にマウスを上に移動して、[OpenAPI パラメーターのプレビュー] をクリックします。

    DTS は、移行を開始する前に事前チェックを実行します。タスクは、事前チェックに合格した後にのみ開始されます。事前チェックが失敗した場合、失敗した各項目の横にある [詳細の表示] をクリックし、報告された問題を解決してから、[再度事前チェック] をクリックします。アラートがトリガーされ、無視できる場合は、[アラート詳細の確認] をクリックし、[詳細の表示] ダイアログで [無視] をクリックし、[OK] をクリックしてから、[再度事前チェック] をクリックします。アラートを無視すると、データの不整合が発生する可能性があります。
  2. 成功率が 100% に達するまで待ち、次に [Next: Purchase Instance] をクリックします。

  3. インスタンスの購入」ページで、インスタンスクラスを設定します。

    パラメーター説明
    リソースグループ移行インスタンスのリソースグループ。デフォルト: デフォルト リソースグループ。詳細については、「Resource Management とは何か?
    インスタンスクラスインスタンスクラスは移行速度を決定します。ワークロードに基づいて選択してください。詳細については、「データ移行インスタンスの仕様」をご参照ください。
  4. [Data Transmission Service (Pay-as-you-go) Service Terms]」チェックボックスを読み取り、選択してください。

  5. [Buy and Start] をクリックします。タスクがタスク リストに表示され、実行を開始します。

次のステップ

移行タスクが実行されたら、タスクリストでその進捗をモニターします。タスクが安定し、宛先 Kafka インスタンスがデータを正しく消費していることを確認したら、ダウンストリームアプリケーションを宛先トピックから読み取るように切り替えます。切り替える前に、失敗したタスクを停止またはリリースして、DTS が再開時に宛先データを上書きするのを防ぎます。