すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:AnalyticDB for PostgreSQL から Alibaba Cloud Message Queue for Kafka へのデータ同期

最終更新日:Dec 27, 2025

Data Transmission Service (DTS) は、AnalyticDB for PostgreSQL インスタンスから Alibaba Cloud Message Queue for Kafka へのデータ同期をサポートしています。

説明

この機能は招待プレビューです。この機能を使用するには、チケットを送信してください。

適用範囲

  • 同期されたデータを受信するために、ターゲットの Alibaba Cloud Message Queue for Kafka インスタンスに Topic が作成されている必要があります。詳細については、「手順1:Topic の作成」をご参照ください。

  • ターゲットの Alibaba Cloud Message Queue for Kafka インスタンスのストレージ領域は、ソースの AnalyticDB for PostgreSQL インスタンスが使用するストレージ領域よりも大きい必要があります。

注意事項

ソースデータベースの制限事項

タイプ

制限事項

同期前

  • 帯域幅要件:同期速度を確保するため、ソースデータベースサーバーのアウトバウンド帯域幅が 100 Mb/s 以上であることを確認してください。

  • カーネルバージョン:ソースの AnalyticDB for PostgreSQL インスタンスのカーネルバージョンは 7.2.1.4 以降である必要があります。

  • パラメーター設定

    • 論理レプリケーション:論理レプリケーションを有効にします。wal_level パラメーターを logical に設定します。

    • 高可用性設定:ソースインスタンスが High-availability Edition の場合、プライマリ/スタンバイのスイッチオーバー中に同期が中断されるのを防ぐため、hot_standbyhot_standby_feedbacksync_replication_slots パラメーターを on に設定します。

  • アカウント権限:同期アカウントには、同期対象オブジェクトに対する読み取り権限と REPLICATION 権限が必要です。この権限を付与するには、コマンド ALTER USER your_user WITH REPLICATION; を実行します。

  • 長時間トランザクションの影響:増分同期中に、ソースデータベースでコミットされていない長時間トランザクションがあると、先行書き込みログ (WAL) が蓄積され、ディスク領域を使い果たす可能性があります。

  • オブジェクトの制限

    • 命名規則:同期するデータベースの名前にハイフン (-) を含めることはできません (例:dts-testdata)。

    • プライマリキーまたは一意制約:同期するテーブルには、プライマリキーまたは一意制約が必要です。制約のあるフィールドの値は一意でなければなりません。そうでない場合、ターゲットデータベースでデータが重複する可能性があります。

    • パーティションテーブル:パーティションテーブルの構造は同期できません。パーティションテーブルが同期されると、そのパーティション情報は失われます。デフォルトでは、テーブルはターゲットデータベースに非パーティションテーブルとして作成されます。

    • タスクあたりのデータベース数:1 つの同期タスクで同期できるデータベースは 1 つだけです。複数のデータベースを同期するには、データベースごとに個別の同期タスクを作成してください。

    • タスク設定の規模:列名のマッピングなど、1 つのタスクで 5,000 を超えるテーブルを編集する場合、タスクをバッチで複数作成するか、データベース全体の同期を設定してください。これにより、タスク送信時のリクエストエラーを回避できます。

    • サポートされていないオブジェクトタイプ:DTS は、スキーマをまたいで継承するテーブル、一時テーブル、内部システムトリガー、一部の関数 (PROCEDURE および FUNCTION の C 言語関数と内部関数)、および拡張機能 (EXTENSION) の同期をサポートしていません。

    • サポートされているオブジェクトタイプ:DTS は、プライマリキー、一意制約、チェック制約、および一部のカスタムデータ型 (COMPOSITE、ENUM、RANGE) の同期をサポートしています。

同期中

  • DDL 操作の制限:スキーマ同期および完全同期フェーズ中は、データベースまたはテーブルの構造を変更する DDL 操作を実行しないでください。実行した場合、データ同期タスクは失敗します。

    説明

    完全同期フェーズ中、DTS はソースデータベースにクエリを実行します。これによりメタデータロックが作成され、ソースデータベースでの DDL 操作がブロックされる可能性があります。

  • DDL 同期のサポート:DTS は現在、ソースデータベースからの DDL 操作の同期をサポートしていません。DTS タスクの開始後にソースで作成された新しいテーブルとその後のデータ変更は、ターゲットに同期されません。必要に応じて、新しい同期タスクを作成してください。

  • 接続情報の変更:タスクの実行中に、AnalyticDB for PostgreSQL インスタンスのエンドポイントまたはゾーンを変更しないでください。変更した場合、同期タスクは失敗します。

ターゲットデータベースの制限事項

タイプ

制限事項

同期前

  • パフォーマンスへの影響評価:完全同期はソースデータベースとターゲットデータベースの両方で読み取りおよび書き込みリソースを消費し、データベースの負荷を増加させる可能性があります。CPU 負荷が 30% 未満の場合など、オフピーク時に同期タスクを実行してください。

  • 単一メッセージのサイズ:Kafka は単一メッセージのサイズを 10 MB に制限しています。ソースデータベースの 1 行のデータが変換後にこの制限を超えると、タスクは中断されます。タスクを設定する際に、ラージオブジェクトを含む列を除外してください。タスクがすでに実行中の場合は、同期オブジェクトを変更します。テーブルを削除してから再度追加し、ラージオブジェクトを含む列を除外して同期されないようにします。

同期中

  • ターゲットクラスターのスケーリング:タスクの実行中にターゲットの Kafka クラスターがスケールアウトまたはスケールインされた場合 (ブローカーノードの追加や削除など)、変更を有効にするには DTS 同期タスクを再起動する必要があります。

  • 外部からのデータ書き込み:データ整合性を確保するため、同期中に DTS タスクの外部からターゲットの Kafka クラスターにデータを書き込まないでください。データの不整合やタスクの失敗を引き起こす可能性があります。

  • ストレージ領域の増加:完全同期中、DTS は同時書き込み操作を実行するため、ターゲットでデータ断片化が発生する可能性があります。同期後、ターゲットで使用されるストレージ領域がソースよりも大きくなる場合があります。

  • タスク再起動の動作:完全同期と増分同期の両方を含むタスクでは、タスクを再起動すると完全同期フェーズが再実行される場合があります。

  • インスタンスの障害

    • 回復メカニズム:DTS ヘルプデスクは 8 時間以内にインスタンスの回復を試みます。回復中、インスタンスが再起動されたり、DTS タスクパラメーターが調整されたりする場合があります。ご利用のデータベースパラメーターは変更されません。

    • データ上書きのリスク:ビジネスをターゲットデータベースに切り替えた後、不要になった同期タスクは速やかに停止またはリリースしてください。これにより、自動回復によるターゲットデータベースの偶発的なデータ上書きを防ぎます。

サポートされている同期 SQL

INSERTUPDATEDELETE

サポートされている同期オブジェクト

  • 基本オブジェクトSCHEMATABLE

    説明

    PRIMARY KEYUNIQUE KEYDATATYPE (組み込みデータ型)、DEFAULT CONSTRAINT を含みます。

  • その他のオブジェクトVIEWINDEXPROCEDUREFUNCTIONRULESEQUENCEAGGREGATEOPERATORDOMAIN

課金

同期タイプ

タスク設定料金

スキーマ同期と完全データ同期

無料。

増分同期

課金対象です。詳細については、「課金の概要」をご参照ください。

操作手順

  1. ターゲットリージョンのデータ同期タスクリストページに移動します。次のいずれかの方法を使用できます。

    DTS コンソールから

    1. DTS コンソールにログインします。

    2. 左側のナビゲーションウィンドウで、データ同期 をクリックします。

    3. ページの左上隅で、同期インスタンスが配置されているリージョンを選択します。

    DMS コンソールから

    説明

    実際の操作は、DMS コンソールのモードとレイアウトによって異なる場合があります。詳細については、「シンプルモードコンソール」および「DMS コンソールのレイアウトとスタイルをカスタマイズする」をご参照ください。

    1. DMS コンソールにログインします。

    2. 上部のメニューバーから、[データ + AI] > [データ転送 (DTS)] > [データ同期] を選択します。

    3. データ同期タスク の右側で、同期インスタンスのリージョンを選択します。

  2. タスクの作成 をクリックして、タスク設定ページに移動します。

  3. ソースデータベースとターゲットデータベースを設定します。次の表にパラメーターを示します。

    カテゴリ

    設定

    説明

    なし

    タスク名

    DTS は自動的にタスク名を生成します。識別しやすいように、わかりやすい名前を指定することを推奨します。名前は一意である必要はありません。

    移行元データベース

    既存の接続情報の選択

    • システムに登録されているデータベースインスタンス (新規作成または保存されたもの) を使用するには、ドロップダウンリストから目的のデータベースインスタンスを選択します。以下のデータベース情報は自動的に設定されます。

      説明

      DMS コンソールでは、この設定項目は DMS データベースインスタンスの選択 です。

    • データベースインスタンスをシステムに登録していない場合、または登録済みのインスタンスを使用する必要がない場合は、以下のデータベース情報を手動で設定します。

    データベースタイプ

    AnalyticDB for PostgreSQL を選択します。

    アクセス方法

    Alibaba Cloud インスタンス を選択します。

    インスタンスのリージョン

    ソースの AnalyticDB for PostgreSQL インスタンスが存在するリージョンを選択します。

    Alibaba Cloud アカウント間でデータを複製

    この例では、同じ Alibaba Cloud アカウント内での同期を示します。× を選択します。

    インスタンス ID

    ソースの AnalyticDB for PostgreSQL インスタンスのインスタンス ID を選択します。

    データベース名

    ソースの AnalyticDB for PostgreSQL インスタンスで同期するデータを含むデータベースの名前を入力します。

    データベースアカウント

    ソースの AnalyticDB for PostgreSQL インスタンスのデータベースアカウントを入力します。アカウントには、同期対象オブジェクトに対する読み取り権限が必要です。

    データベースのパスワード

    データベースアカウントに対応するパスワードを入力します。

    移行先データベース

    既存の接続情報の選択

    • システムに登録されているデータベースインスタンス (新規作成または保存されたもの) を使用するには、ドロップダウンリストから目的のデータベースインスタンスを選択します。以下のデータベース情報は自動的に設定されます。

      説明

      DMS コンソールでは、この設定項目は DMS データベースインスタンスの選択 です。

    • データベースインスタンスをシステムに登録していない場合、または登録済みのインスタンスを使用する必要がない場合は、以下のデータベース情報を手動で設定します。

    データベースタイプ

    Kafka を選択します。

    アクセス方法

    Alibaba Cloud インスタンス を選択します。

    インスタンスのリージョン

    ターゲットの Alibaba Cloud Message Queue for Kafka インスタンスが存在するリージョンを選択します。

    Kafka インスタンス ID

    ターゲットの Alibaba Cloud Message Queue for Kafka インスタンスの ID を選択します。

    暗号化

    ビジネスおよびセキュリティ要件に基づき、非暗号化 または SCRAM-SHA-256 を選択します。

    トピック

    ドロップダウンリストから、データを受信する Topic を選択します。

    Kafka スキーマレジストリの使用

    Kafka スキーマレジストリは、Avro スキーマを保存および取得するための RESTful インターフェイスを提供するメタデータサービスレイヤーです。

    • ×:Kafka スキーマレジストリを使用しません。

    • :Kafka スキーマレジストリを使用します。Avro スキーマが登録されている Kafka スキーマレジストリの URL または IP アドレスを入力する必要があります。

  4. 設定完了後、ページ下部の 接続をテストして続行 をクリックします。

    説明
    • DTS サーバーからのアクセスを許可するために、DTS サービスの IP アドレスブロックが、自動または手動でソースデータベースとターゲットデータベースのセキュリティ設定に追加されていることを確認してください。詳細については、「DTS サーバーの IP アドレスホワイトリストを追加する」をご参照ください。

    • ソースまたはターゲットデータベースが自己管理データベースの場合 (アクセス方法Alibaba Cloud インスタンス ではない場合)、DTS サーバーの CIDR ブロック ダイアログボックスで 接続テスト をクリックする必要もあります。

  5. タスクオブジェクトを設定します。

    1. オブジェクト設定 ページで、同期するオブジェクトを設定できます。

      設定

      説明

      同期タイプ

      デフォルトでは [増分同期] が選択されています。[スキーマ同期][完全データ同期] も選択する必要があります。事前チェックが完了すると、DTS はソースインスタンスの完全データでターゲットクラスターの同期オブジェクトを初期化します。このデータは、後続の増分同期のベースラインとして機能します。

      説明

      ターゲットの Kafka インスタンスの アクセス方法Alibaba Cloud インスタンス の場合、スキーマ同期 はサポートされていません。

      競合するテーブルの処理モード

      • エラーの事前チェックと報告:ターゲットデータベースに同名のテーブルがあるかを確認します。同名のテーブルが見つかった場合、事前チェック中にエラーが報告され、データ同期タスクは開始されません。それ以外の場合、事前チェックは成功します。

        説明

        ターゲットデータベースで同名のテーブルを削除または名前変更できない場合は、オブジェクト名マッピング機能を使用してターゲットデータベースのテーブル名を変更できます。詳細については、「オブジェクト名のマッピング」をご参照ください。

      • エラーを無視して続行:ターゲットデータベースでの名前の衝突チェックをスキップします。

        警告

        エラーを無視して続行 を選択すると、データの不整合が発生し、ビジネスにリスクをもたらす可能性があります。例:

        • テーブルスキーマが一致し、ターゲットデータベースのレコードがソースデータベースのレコードと同じプライマリキーまたは一意キーの値を持つ場合:

          • 完全データ同期中、DTS はターゲットクラスターのレコードを保持します。ソースデータベースの対応するレコードは同期されません。

          • 増分同期中、ソースデータベースのレコードがターゲットデータベースのレコードを上書きします。

        • テーブルスキーマが一致しない場合、データ初期化が失敗する可能性があります。これにより、一部の列のみが同期されたり、同期が完全に失敗したりする可能性があります。注意して進めてください。

      Kafka のデータ形式

      Kafka インスタンスへの同期に必要なデータストレージ形式を選択します。

      • [DTS Avro] を選択した場合、DTS Avro スキーマ定義に従ってデータを解析する必要があります。詳細については、「DTS Avro スキーマ定義」および「DTS Avro デシリアライズの例」をご参照ください。

      • [Canal JSON] を選択した場合、パラメーターの説明と例については「Canal JSON」をご参照ください。

      Kafka 圧縮形式

      Kafka メッセージに必要な圧縮形式を選択します。

      • [LZ4] (デフォルト):圧縮率は低いですが、圧縮速度は速いです。

      • [GZIP]:圧縮率は高いですが、圧縮速度は遅いです。

        説明

        この形式はより多くの CPU リソースを消費します。

      • [Snappy]:圧縮率と圧縮速度は中程度です。

      Kafka パーティションへのデータ転送ポリシー

      必要に応じてポリシーを選択します。

      メッセージ肯定応答メカニズム

      必要に応じてメッセージ確認メカニズムを選択します。

      DDL 情報を格納するトピック

      ドロップダウンリストから、DDL 情報を格納する Topic を選択します。

      説明

      Topic を選択しない場合、DDL 情報はデフォルトでデータを受信する Topic に格納されます。

      移行先インスタンスでのオブジェクト名の大文字化

      ターゲットインスタンスのデータベース、テーブル、列オブジェクトの名前の大文字/小文字ポリシーを設定できます。デフォルトでは [DTS デフォルトポリシー] が選択されています。ソースまたはターゲットデータベースのデフォルトポリシーを使用することも選択できます。詳細については、「ターゲットオブジェクト名の大文字/小文字ポリシー」をご参照ください。

      ソースオブジェクト

      ソースオブジェクト ボックスでオブジェクトをクリックし、向右 をクリックして 選択中のオブジェクト ボックスに移動します。

      説明

      テーブルレベルで同期するオブジェクトを選択できます。

      選択中のオブジェクト

      この例では追加の設定は不要です。マッピング機能を使用して、ターゲットの Kafka インスタンスで Topic 名、Topic のパーティション数、またはソーステーブルの列名を設定できます。

      ターゲット Kafka インスタンスのオブジェクト情報の設定方法 (クリックして展開)

      1. 選択中のオブジェクト セクションで、テーブルレベルのターゲット Topic 名にマウスポインターを合わせます。

      2. ターゲット Topic 名の横に表示される 編集 をクリックします。

      3. 表示される テーブルの編集 ダイアログボックスで、マッピング情報を設定します。

        説明
        • データベースレベルでは、パラメーターが少ない スキーマの編集 ダイアログボックスが表示されます。テーブルレベルでは、テーブルの編集 ダイアログボックスが表示されます。

        • データベース全体以外の粒度でオブジェクトを同期する場合、スキーマの編集 ダイアログボックスで 移行先データベース名 (対象トピックの名前) または パーティション数 を変更することはできません。

        設定

        説明

        対象トピックの名前

        ソーステーブルが同期されるターゲット Topic の名前。デフォルトでは、ソースデータベースとターゲットデータベースの設定 ステップの 移行先データベース セクションで選択した トピック です。

        重要
        • ターゲットデータベースが ApsaraMQ for Kafka インスタンスの場合、入力する Topic 名はターゲットの Kafka インスタンスに存在する必要があります。存在しない場合、データ同期タスクは失敗します。ターゲットデータベースが自己管理の Kafka クラスターで、同期タスクにスキーマ同期が含まれる場合、DTS は入力された Topic をターゲットデータベースに作成しようとします。

        • 対象トピックの名前 を変更すると、データは指定した Topic に書き込まれます。

        フィルタリング条件

        詳細については、「フィルター条件の設定」をご参照ください。

        パーティション数

        データが書き込まれるターゲット Topic のパーティション数。

        ターゲット Topic に書き込まれるデータの列名を編集します。

      説明
      • データベースまたはテーブルレベルで同期する SQL 操作を選択するには、選択中のオブジェクト セクションでオブジェクトを右クリックし、表示されるダイアログボックスで目的の SQL 操作を選択します。

      • オブジェクト名マッピング機能を使用する場合、マッピングされたオブジェクトに依存する他のオブジェクトの同期が失敗する可能性があります。

    2. 詳細設定へ をクリックします。

      設定

      説明

      タスクのスケジュールに使用する専用クラスターの選択

      デフォルトでは、DTS は共有クラスターでタスクをスケジュールするため、選択する必要はありません。タスクの安定性を高めるために、専用クラスターを購入して DTS 同期タスクを実行できます。詳細については、「DTS 専用クラスターとは」をご参照ください。

      [データ転送の暗号化を有効化]

      暗号化された転送を有効にすると、DTS の同期パフォーマンスに影響する可能性があります。本番環境およびセキュリティ要件に基づいて、この機能を有効にするかどうかを選択します。

      • × (デフォルト)

      失敗した接続の再試行時間

      同期タスクの開始後にソースまたはターゲットデータベースへの接続が失敗した場合、DTS はエラーを報告し、直ちに接続のリトライを開始します。デフォルトのリトライ時間は 720 分です。リトライ時間は 10 分から 1,440 分の間でカスタマイズできます。30 分以上の設定を推奨します。指定されたリトライ時間内に DTS が再接続した場合、タスクは自動的に再開されます。それ以外の場合、タスクは失敗します。

      説明
      • 同じソースまたはターゲットを持つ複数の DTS インスタンス (DTS インスタンス A と DTS インスタンス B など) がある場合、A のネットワークリトライ時間が 30 分、B が 60 分に設定されていると、短い方の 30 分が適用されます。

      • DTS は接続リトライ中のタスク実行時間に対して課金するため、ビジネスニーズに基づいてリトライ時間をカスタマイズするか、ソースおよびターゲットデータベースインスタンスがリリースされた後、できるだけ早く DTS インスタンスをリリースすることを推奨します。

      移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。

      同期タスクの開始後、ソースまたはターゲットデータベースで接続以外の問題 (DDL または DML 実行エラーなど) が発生した場合、DTS はエラーを報告し、直ちに継続的なリトライ操作を開始します。デフォルトのリトライ時間は 10 分です。リトライ時間は 1 分から 1,440 分の間でカスタマイズすることもできます。10 分以上の設定を推奨します。設定されたリトライ時間内に関連操作が成功した場合、同期タスクは自動的に再開されます。それ以外の場合、タスクは失敗します。

      重要

      移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。 の値は、失敗した接続の再試行時間 の値より小さくする必要があります。

      完全同期レートを制限するかどうか

      完全データ同期フェーズ中、DTS はソースおよびターゲットデータベースの読み取りおよび書き込みリソースを消費し、それらの負荷を増加させる可能性があります。1 秒あたりのソースデータベースのクエリ率 QPS1 秒あたりの完全移行の行数 RPS1 秒あたりの完全移行データ量 (MB) BPS を設定することで、完全データ同期タスクのレート制限を設定し、ターゲットデータベースへの圧力を軽減できます。

      説明

      増分同期率を制限するかどうか

      1 秒あたりの増分同期の行数 RPS1 秒あたりの増分同期データ量 (MB) BPS を設定することで、増分同期タスクのレート制限を設定し、ターゲットデータベースへの圧力を軽減することもできます。

      環境タグ

      必要に応じて環境タグを選択してインスタンスを識別できます。この例では選択は不要です。

      シーンラベル

      必要に応じてシナリオタグを選択してインスタンスのユースケースを識別できます。シナリオタグはタスクの正常な動作に影響しません。この例では選択は不要です。

      ETL 機能の設定

      抽出・変換・書き出し (ETL) 機能を有効にするかどうかを指定します。詳細については、「ETL とは」をご参照ください。有効な値:

      監視アラート

      アラートを設定するかどうかを指定します。同期が失敗した場合や遅延が指定されたしきい値を超えた場合に、アラート連絡先に通知が送信されます。

  6. タスクを保存して事前チェックを実行します。

    • API 操作でこのインスタンスを設定するためのパラメーターを表示するには、次:タスク設定の保存と事前チェック ボタンにマウスポインターを合わせ、ツールチップの OpenAPI パラメーターのプレビュー をクリックします。

    • API パラメーターの表示が完了したら、ページ下部の 次:タスク設定の保存と事前チェック をクリックします。

    説明
    • 同期タスクが開始される前に、事前チェックが実行されます。タスクは事前チェックに合格した後にのみ開始できます。

    • 事前チェックが失敗した場合は、失敗した項目の横にある 詳細を表示 をクリックし、プロンプトに従って問題を解決してから、事前チェックを再実行します。

    • 事前チェック中に警告が生成された場合:

      • チェック項目が失敗し、無視できない場合は、項目の横にある 詳細を表示 をクリックします。その後、指示に従って問題を修正し、事前チェックを再実行します。

      • 無視できるチェック項目については、アラートの詳細を確認無視OK再度事前チェックを実行 の順にクリックして警告をスキップし、事前チェックを再実行します。チェック項目の警告を無視すると、データの不整合などの問題が発生し、ビジネスにリスクをもたらす可能性があります。

  7. インスタンスを購入します。

    1. 成功率 が 100% になったら、次:インスタンスの購入 をクリックします。

    2. 購入 ページで、データ同期インスタンスの課金方法とリンク仕様を選択します。詳細については、次の表をご参照ください。

      カテゴリ

      パラメーター

      説明

      新しいインスタンスクラス

      課金方法

      • サブスクリプション:インスタンス作成時にお支払いいただきます。長期的なニーズに適しており、従量課金よりも手頃で、サブスクリプション期間が長いほど割引率が高くなります。

      • 従量課金:時間単位で課金されます。短期的なニーズに適しています。使用後すぐにインスタンスをリリースしてコストを節約できます。

      リソースグループ設定

      インスタンスが属するリソースグループ。デフォルトは [デフォルトリソースグループ] です。詳細については、「Resource Management とは」をご参照ください。

      リンク仕様

      DTS は、異なるパフォーマンスレベルの同期仕様を提供します。リンク仕様は同期レートに影響します。ビジネスシナリオに基づいて選択できます。詳細については、「データ同期リンク仕様」をご参照ください。

      サブスクリプション期間

      サブスクリプションモードでは、サブスクリプションインスタンスの期間と数量を選択します。月単位のサブスクリプションでは 1〜9 ヶ月、年単位のサブスクリプションでは 1、2、3、5 年から選択できます。

      説明

      このオプションは、課金方法が サブスクリプション の場合にのみ表示されます。

    3. 設定が完了したら、Data Transmission Service (従量課金) 利用規約 を読み、同意のチェックを入れます。

    4. 購入して起動 をクリックし、表示される OK ダイアログボックスで [OK] をクリックします。

      データ同期ページでタスクの進捗状況を確認できます。

よくある質問