すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:PolarDB for MySQLクラスターのデータをApache KafkaインスタンスのMessage Queueに同期する

最終更新日:Oct 31, 2024

このトピックでは、data Transmission Service (DTS) を使用して、PolarDB for MySQLクラスターからApsaraMQ for Kafkaインスタンスにデータを同期する方法について説明します。 これにより、メッセージの管理機能が向上します。

前提条件

制限事項

カテゴリ

説明

ソースデータベースの制限

  • 同期するテーブルには、PRIMARY KEYまたはUNIQUE制約が必要であり、すべてのフィールドが一意である必要があります。 そうでない場合、宛先データベースは重複するデータレコードを含み得る。

  • 同期するオブジェクトとしてテーブルを選択し、ターゲットデータベースのテーブルや列の名前の変更など、テーブルを変更する必要がある場合は、1つのデータ同期タスクで最大1,000のテーブルを同期できます。 タスクを実行して1,000を超えるテーブルを同期すると、リクエストエラーが発生します。 この場合、複数のタスクを構成してテーブルを同期するか、タスクを構成してデータベース全体を同期することをお勧めします。

  • 増分データを同期する必要がある場合は、バイナリログ機能を有効にし、loose_polar_log_binパラメーターをonに設定する必要があります。 それ以外の場合、事前チェック中にエラーメッセージが返され、データ同期タスクを開始できません。 詳細については、「バイナリログの有効化」および「パラメーターの変更」をご参照ください。

    説明
    • PolarDB for MySQLクラスターのバイナリログ機能を有効にすると、バイナリログによって占有されているストレージ容量に対して課金されます。

    • 増分データ同期タスクの場合、ソースデータベースのバイナリログは少なくとも24時間保持されます。 完全および増分データ同期タスクの場合、ソースデータベースのバイナリログは少なくとも7日間保持されます。 そうしないと、DTSはバイナリログの取得に失敗し、タスクが失敗する可能性があります。 例外的な状況では、データの不整合または損失が発生します。 完全なデータ同期が完了したら、保持期間を24時間以上に設定できます。 上記の要件に基づいて、バイナリログの保持期間を設定してください。 そうしないと、DTSのサービスレベル契約 (SLA) に記載されているサービスの信頼性またはパフォーマンスが保証されない場合があります。

  • スキーマ同期および完全データ同期中は、DDL文を実行してデータベースまたはテーブルのスキーマを変更しないでください。 それ以外の場合、データ同期タスクは失敗します。

その他の制限

  • DTSは、ソースPolarDB for MySQLクラスターの読み取り専用ノードを同期しません。

  • DTSは、ソースPolarDB for MySQLクラスターからObject Storage Service (OSS) 外部テーブルを同期しません。

  • データを同期する前に、ソースデータベースとターゲットデータベースのパフォーマンスに対するデータ同期の影響を評価します。 オフピーク時にデータを同期することを推奨します。 最初の完全データ同期中、DTSはソースデータベースとターゲットデータベースの読み取りおよび書き込みリソースを使用します。 これにより、データベースサーバーの負荷が増加する可能性があります。

  • 初期の完全データ同期中に、同時INSERT操作により、ターゲットデータベースのテーブルが断片化されます。 最初の完全データ同期が完了した後、ターゲットデータベースの使用表領域のサイズは、ソースデータベースのサイズよりも大きくなります。

  • データ同期中にソーステーブルでDDL操作を実行するためにpt-online-schema-changeなどのツールを使用しないことをお勧めします。 それ以外の場合、データ同期タスクは失敗します。

  • データ同期中に他のソースからのデータがターゲットデータベースに書き込まれない場合は、data Management (DMS) を使用して、ソーステーブルに対してオンラインDDL操作を実行できます。 詳細については、「ロックフリーDDL操作の実行」をご参照ください。

  • データ同期中に他のソースからのデータがターゲットデータベースに書き込まれると、ソースデータベースとターゲットデータベース間のデータの不一致が発生します。 たとえば、他のソースからのデータがターゲットデータベースに書き込まれているときにDMSを使用してオンラインDDLステートメントを実行すると、ターゲットデータベースでデータが失われる可能性があります。

  • データ同期中に、ターゲットKafkaデータベースがスケーリングされている場合は、データベースを再起動する必要があります。

特別なケース

DTSは、CREATE DATABASE IF NOT EXISTS 'test' ステートメントをソースデータベースで実行し、バイナリログファイルの位置を前に移動します。

単一レコードのサイズ制限

Kafkaに書き込むことができる1つのレコードの最大サイズは10 MBです。 したがって、ソースデータの行のサイズが10 MBを超えると、DTSがKafkaにレコードを書き込むことができないため、関連するDTSタスクが中断されます。 このシナリオでは、大きなフィールドを含むテーブル全体を同期するのではなく、テーブルの一部のフィールドのみを同期することをお勧めします。 DTSタスクを設定するときは、これらの大きなフィールドのレコードを除外する必要があります。 大きなフィールドを含むテーブルがタスクのオブジェクトに含まれている場合は、テーブルを削除し、オブジェクトにテーブルを再度追加してから、大きなフィールドを除外するようにフィルター条件を設定する必要があります。

サポートしている同期トポロジ

  • 一方向の 1 対 1 の同期

  • 一方向の 1 対多の同期

  • 一方向の多対 1 の同期

  • 一方向カスケード同期

DTSでサポートされている同期トポロジの詳細については、「同期トポロジ」をご参照ください。

同期可能なSQL操作

操作タイプ

SQL文

DML

挿入、更新、および削除

DDL

  • CREATE TABLE、ALTER TABLE、DROP TABLE、RENAME TABLE、およびTRUNCATE TABLE

  • ビュー、ALTER VIEW、およびDROP VIEWの作成

  • CREATE PROCEDURE、ALTER PROCEDURE、およびDROP PROCEDURE

  • CREATE FUNCTION、DROP FUNCTION、CREATE TRIGGER、およびDROP TRIGGER

  • CREATE INDEXとDROP INDEX

データベースアカウントに必要な権限

データベース

必要な権限

PolarDB for MySQLクラスターのソース

同期するオブジェクトの読み取り権限

手順

  1. [データ同期タスク] ページに移動します。

    1. データ管理 (DMS) コンソール にログインします。

    2. 上部のナビゲーションバーで、DTSをクリックします。

    3. 左側のナビゲーションウィンドウで、DTS (DTS) > データ同期を選択します。

    説明
  2. データ同期タスクの右側で、データ同期インスタンスが存在するリージョンを選択します。

    説明

    新しいDTSコンソールを使用する場合は、上部のナビゲーションバーでデータ同期インスタンスが存在するリージョンを選択する必要があります。

  3. [タスクの作成] をクリックします。 [データ同期タスクの作成] ページで、ソースデータベースとターゲットデータベースを設定します。 次の表にパラメーターを示します。

    警告

    ソースデータベースとターゲットデータベースを設定した後、ページに表示される制限を読むことを推奨します。 そうしないと、タスクが失敗したり、データの不一致が発生します。

    セクション

    パラメーター

    説明

    非該当

    タスク名

    DTSタスクの名前。 タスク名は自動生成されます。 タスクを簡単に識別できるように、わかりやすい名前を指定することをお勧めします。 一意のタスク名を指定する必要はありません。

    ソースデータベース

    既存のDMSデータベースインスタンスの選択

    使用するデータベースインスタンス。 ビジネス要件に基づいて、既存のインスタンスを使用するかどうかを選択できます。

    • 既存のインスタンスを選択すると、DTSはデータベースのパラメーターを自動的に入力します。

    • 既存のインスタンスを選択しない場合は、次のデータベース情報を設定する必要があります。

    データベースタイプ

    移行元ディスクのタイプを設定します。 [PolarDB for MySQL] を選択します。

    アクセス方法

    ソースデータベースのアクセス方法。 [Alibaba Cloudインスタンス] を選択します。

    インスタンスリージョン

    ソースPolarDB for MySQLクラスターが存在するリージョン。

    Alibaba Cloudアカウント全体でのデータの複製

    Alibaba Cloudアカウント間でデータを同期するかどうかを指定します。 この例では、[いいえ] が選択されています。

    PolarDBクラスターID

    ソースPolarDB for MySQLクラスターのID。

    データベースアカウント

    ソースPolarDB for MySQLクラスターのデータベースアカウント。 アカウントに必要な権限の詳細については、「データベースアカウントに必要な権限」をご参照ください。

    データベースパスワード

    データベースインスタンスへのアクセスに使用されるパスワード。

    暗号化

    データベースへの接続を暗号化するかどうかを指定します。 ビジネス要件に基づいて、[非暗号化] または [SSL暗号化] を選択できます。 このパラメーターをSSL暗号化に設定する場合、DTSタスクを設定する前に、ApsaraDB RDS for MySQLインスタンスのSSL暗号化を有効にする必要があります。 詳細については、「クラウド証明書を使用したSSL暗号化の有効化」をご参照ください。

    宛先データベース

    既存のDMSデータベースインスタンスの選択

    使用するデータベースインスタンス。 ビジネス要件に基づいて、既存のインスタンスを使用するかどうかを選択できます。

    • 既存のインスタンスを選択すると、DTSはデータベースのパラメーターを自動的に入力します。

    • 既存のインスタンスを選択しない場合は、次のデータベース情報を設定する必要があります。

    データベースタイプ

    ターゲットデータベースのタイプ。 Kafkaを選択します。

    アクセス方法

    ターゲットデータベースのアクセス方法。 Express Connect、VPN Gateway、またはSmart Access Gatewayを選択します。

    説明

    インスタンスタイプとしてApsaraMQ for Kafkaを選択することはできません。 Apache KafkaのMessage Queueを自己管理Kafkaインスタンスとして使用して、データ同期を設定できます。

    インスタンスリージョン

    ターゲットApsaraMQ for Kafkaインスタンスが存在するリージョン。

    接続済みVPC

    ApsaraMQ for Kafkaインスタンスが属する仮想プライベートクラウド (VPC) のID。 VPC IDを取得するには、次の操作を実行します。Message Queue for Apache Kafkaコンソールにログインし、Message Queue for Apache Kafkaインスタンスのインスタンス詳細ページに移動します。 [設定情報] セクションで、VPC IDを表示します。

    IPアドレス

    ApsaraMQ for KafkaインスタンスのDefault Endpointパラメーターに含まれるIPアドレスを入力します。

    説明

    IPアドレスを取得するには、次の操作を実行します。Message Queue for Apache Kafkaコンソールにログインし、ApsaraMQ for Kafkaインスタンスの [インスタンスの詳細] ページに移動します。 [エンドポイント情報] セクションで、[デフォルトのエンドポイント] パラメーターからIPアドレスを取得します。

    ポート番号

    宛先ApsaraMQ for Kafkaインスタンスのサービスポート番号。 デフォルトのポート番号は9092です。

    データベースアカウント

    移行先ApsaraMQ for Kafkaインスタンスのデータベースアカウント。

    説明

    ApsaraMQ for KafkaインスタンスのインスタンスタイプがVPCインスタンスの場合、データベースアカウントまたはデータベースパスワードを指定する必要はありません。

    データベースパスワード

    データベースインスタンスへのアクセスに使用されるパスワード。

    Kafkaバージョン

    Apache Kafkaインスタンスの宛先Message Queueのバージョン。

    暗号化

    接続を暗号化するかどうかを指定します。 ビジネスとセキュリティの要件に基づいて、[非暗号化] または [SCRAM-SHA 256] を選択します。

    トピック

    同期データの受信に使用されるトピック。 ドロップダウンリストからトピックを選択します。

    DDL情報を保存するトピック

    DDL情報の格納に使用されるトピック。 ドロップダウンリストからトピックを選択します。 このパラメーターを指定しない場合、DDL情報はtopicパラメーターで指定されたトピックに格納されます。

    Kafkaスキーマレジストリの使用

    メタデータのサービングレイヤーを提供するKafka Schema Registryを使用するかどうかを指定します。 Avroスキーマを保存および取得するためのRESTful APIを提供します。 有効な値:

    • いいえ: Kafka Schema Registryを使用しません。

    • はい: Kafka Schema Registryを使用します。 この場合、AvroスキーマのKafka Schema Registryに登録されているURLまたはIPアドレスを入力する必要があります。

  4. 自己管理データベースにホワイトリストが設定されている場合は、DTSサーバーのCIDRブロックをホワイトリストに追加します。 次に、[テスト接続と続行] をクリックします。

    説明

    DTSサーバーのCIDRブロックの詳細については、「DTSサーバーのCIDRブロックの追加」をご参照ください。

  5. 同期するオブジェクトと詳細設定を設定します。

    パラメーター

    説明

    同期タイプ

    同期タイプ。 デフォルトでは、増分データ同期が選択されています。 [スキーマ同期] および [完全データ同期] も選択する必要があります。 事前チェックが完了すると、DTSは選択したオブジェクトの履歴データをソースデータベースからターゲットクラスターに同期します。 履歴データは、その後の増分同期の基礎となる。

    競合テーブルの処理モード

    • エラーの事前チェックと報告: ターゲットデータベースに、ソースデータベースのテーブルと同じ名前のテーブルが含まれているかどうかを確認します。 ソースデータベースとターゲットデータベースに同じテーブル名のテーブルが含まれていない場合は、事前チェックに合格します。 それ以外の場合、事前チェック中にエラーが返され、データ同期タスクを開始できません。

      説明

      ソースデータベースとターゲットデータベースに同じ名前のテーブルが含まれていて、ターゲットデータベース内のテーブルを削除または名前変更できない場合は、オブジェクト名マッピング機能を使用して、ターゲットデータベースに同期されるテーブルの名前を変更できます。 詳細については、「マップオブジェクト名」をご参照ください。

    • エラーを無視して続行: ソースデータベースとターゲットデータベースの同じテーブル名の事前チェックをスキップします。

      警告

      エラーを無視して続行 を選択すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。

      • ソースデータベースとターゲットデータベースが同じスキーマを持ち、ターゲットデータベースのデータレコードがソースデータベースのデータレコードと同じ主キー値または一意キー値を持つ場合:

        • 完全データ同期中、DTSはデータレコードをターゲットデータベースに同期しません。 ターゲットデータベースの既存のデータレコードが保持されます。

        • 増分データ同期中、DTSはデータレコードをターゲットデータベースに同期します。 ターゲットデータベースの既存のデータレコードが上書きされます。

      • ソースデータベースとターゲットデータベースのスキーマが異なる場合、データの初期化に失敗する可能性があります。 この場合、一部の列のみが同期されるか、データ同期タスクが失敗します。 作業は慎重に行ってください。

    Kafkaのデータ形式

    Message Queue for Apache Kafkaインスタンスにデータが格納される形式。

    • DTS Avroを選択した場合、データはDTS Avroのスキーマ定義に基づいて解析されます。 詳細については、『GitHub』をご参照ください。

    • Canal Jsonを選択した場合、データはCanal JSON形式で保存されます。 関連するパラメーターと例の詳細については、「Kafkaクラスターのデータ形式」トピックの「Canal JSON」セクションをご参照ください。

    Kafkaパーティションへの出荷データのポリシー

    ビジネス要件に基づいて、Kafkaパーティションに同期されるデータの同期ポリシーを選択します。 詳細については、「データをKafkaパーティションに移行するためのポリシーの指定」をご参照ください。

    宛先インスタンスでのオブジェクト名の大文字化

    ターゲットインスタンスのデータベース名、テーブル名、および列名の大文字化。 デフォルトでは、DTSデフォルトポリシーが選択されています。 他のオプションを選択して、オブジェクト名の大文字化をソースまたはターゲットデータベースの大文字化と一致させることができます。 詳細については、「ターゲットインスタンスのオブジェクト名の大文字化の指定」をご参照ください。

    ソースオブジェクト

    ソースオブジェクト セクションから1つ以上のオブジェクトを選択し、向右アイコンをクリックして 選択中のオブジェクト セクションにオブジェクトを追加します。

    説明

    同期するオブジェクトとしてテーブルのみを選択できます。

    [選択済みオブジェクト]

    • 同期先のインスタンスに同期するオブジェクトの名前を変更するには、選択中のオブジェクト セクションでオブジェクトを右クリックします。 詳細については、「オブジェクト名のマップ」トピックの「単一オブジェクトの名前のマップ」セクションをご参照ください。

    • 一度に複数のオブジェクトの名前を変更するには、選択中のオブジェクト セクションの右上隅にある 一括編集 をクリックします。 詳細については、「オブジェクト名のマップ」トピックの「一度に複数のオブジェクト名をマップする」セクションをご参照ください。

    説明
    • 特定のデータベースまたはテーブルで実行されるSQL操作を選択するには、次の手順を実行します。[選択されたオブジェクト] セクションで、オブジェクトを右クリックします。 表示されるダイアログボックスで、同期するSQL操作を選択します。 同期できるSQL操作の詳細については、「同期できるSQL操作」をご参照ください。

    • データをフィルタリングするWHERE条件を指定するには、[選択済みオブジェクト] セクションでオブジェクトを右クリックします。 表示されるダイアログボックスで、条件を指定します。 条件の指定方法の詳細については、「フィルター条件の指定」をご参照ください。

  6. 次へ:詳細設定クリックして詳細設定を設定します。

    パラメーター

    説明

    Set Alerts

    データ同期タスクのアラートを設定するかどうかを指定します。 タスクが失敗するか、同期レイテンシが指定されたしきい値を超えると、アラート送信先は通知を受け取ります。 有効な値:

    失敗した接続のリトライ時間範囲の指定

    失敗した接続のリトライ時間範囲。 データ同期タスクの開始後にソースデータベースまたはターゲットデータベースの接続に失敗した場合、DTSはその時間範囲内ですぐに接続を再試行します。 有効な値: 10 ~ 1440 単位は分です。 デフォルト値: 720 このパラメーターを30より大きい値に設定することを推奨します。 DTSが指定された時間範囲内にソースデータベースとターゲットデータベースに再接続すると、DTSはデータ同期タスクを再開します。 それ以外の場合、データ同期タスクは失敗します。

    説明
    • ソースまたはターゲットデータベースが同じである複数のデータ同期タスクに対して異なるリトライ時間範囲を指定した場合、最も短いリトライ時間範囲が優先されます。

    • DTSが接続を再試行すると、DTSインスタンスに対して課金されます。 業務要件に基づいて再試行時間範囲を指定することを推奨します。 ソースインスタンスとターゲットインスタンスがリリースされた後、できるだけ早くDTSインスタンスをリリースすることもできます。

    ETLの設定

    抽出、変換、および読み込み (ETL) 機能を有効にするかどうかを指定します。 詳細については、「ETLとは何ですか?」をご参照ください。 有効な値:

    順方向および逆方向タスクのハートビートテーブル sql を削除

    DTSインスタンスの実行中に、ハートビートテーブルのSQL操作をソースデータベースに書き込むかどうかを指定します。 有効な値:

    • Yes: ハートビートテーブルにSQL操作を書き込みません。 この場合、DTSインスタンスのレイテンシが表示され得る。

    • No: ハートビートテーブルにSQL操作を書き込みます。 この場合、ソースデータベースの物理バックアップやクローニングなどの機能が影響を受ける可能性があります。

  7. タスク設定を保存し、事前チェックを実行します。

    • 関連するAPI操作を呼び出してDTSタスクを設定するときに指定するパラメーターを表示するには、ポインターを 次:タスク設定の保存と事前チェック に移動し、OpenAPI パラメーターのプレビュー をクリックします。

    • パラメーターを表示または表示する必要がない場合は、ページ下部の 次:タスク設定の保存と事前チェック をクリックします。

    説明
    • データ同期タスクを開始する前に、DTSは事前チェックを実行します。 データ同期タスクは、タスクが事前チェックに合格した後にのみ開始できます。

    • データ同期タスクが事前チェックに失敗した場合は、失敗した各項目の横にある [詳細の表示] をクリックします。 チェック結果に基づいて原因を分析した後、問題のトラブルシューティングを行います。 次に、プレチェックを再実行します。

    • 事前チェック中にアイテムに対してアラートがトリガーされた場合:

      • アラートアイテムを無視できない場合は、失敗したアイテムの横にある [詳細の表示] をクリックして、問題のトラブルシューティングを行います。 次に、もう一度プレチェックを実行します。

      • アラート項目を無視できる場合は、[アラート詳細の確認] をクリックします。 [詳細の表示] ダイアログボックスで、[無視] をクリックします。 表示されたメッセージボックスで、[OK] をクリックします。 次に、[再度事前チェック] をクリックして、事前チェックを再度実行します。 アラート項目を無視すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。

  8. 成功率100% になるまで待ちます。 次に、[次へ: インスタンスの購入] をクリックします。

  9. [インスタンスの購入] ページで、データ同期インスタンスの課金方法とインスタンスクラスのパラメーターを設定します。 下表に、各パラメーターを説明します。

    セクション

    パラメーター

    説明

    新しいインスタンスクラス

    インスタンスクラス

    DTSは、同期速度のパフォーマンスが異なる複数のインスタンスクラスを提供します。 ビジネスシナリオに基づいてインスタンスクラスを選択できます。 詳細については、「データ同期インスタンスのインスタンスクラス」をご参照ください。

  10. データ伝送サービス (従量課金) サービス規約を読んで選択します。

  11. [購入して開始] をクリックします。 表示されるダイアログボックスで、OK をクリックします。

    タスクリストでタスクの進行状況を確認できます。