すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:PolarDB for MySQL クラスターから DataHub プロジェクトへのデータ同期

最終更新日:Mar 29, 2026

Data Transmission Service (DTS) は、PolarDB for MySQL クラスターからの増分データ変更をリアルタイムで DataHub プロジェクトにストリーミングします。データが DataHub に到着すると、Realtime Compute for Apache Flink などの後続の分析サービスへと連携できます。

前提条件

開始する前に、以下の条件を満たしていることを確認してください。

制限事項

DTS は、ソースからターゲットへ外部キーを同期しません。ソースでのカスケード操作および削除操作は、ターゲットへレプリケートされません。

ソースデータベース

制限事項詳細
プライマリキーまたは一意制約が必要プライマリキーまたは一意制約のないテーブルは、送信先で重複レコードが発生する可能性があります。該当するテーブルにこれらの制約がない場合は、Exactly-Once 書き込み機能を有効にしてください。詳細については、「プライマリキーおよび一意制約のないテーブルの同期」をご参照ください。
タスクあたり 1,000 テーブルの上限(リネーム時)同期オブジェクトとしてテーブルを選択し、送信先でテーブル名またはカラム名のリネームが必要な場合、1 つのタスクでサポートされるテーブル数は最大 1,000 件です。この上限を超えるとリクエストエラーが発生します。複数のタスクにワークロードを分割するか、データベースレベルで同期を行ってください。
増分同期にはバイナリロギングおよび loose_polar_log_bin の設定が必要バイナリロギングを有効化し、loose_polar_log_binon に設定する必要があります。設定されていない場合、事前チェックに失敗し、タスクを開始できません。詳細については、「バイナリロギングの有効化」および「パラメーターの変更」をご参照ください。バイナリロギングを有効化すると、ストレージ料金が発生します。
バイナリログの保存期間増分同期のみを行う場合はバイナリログを少なくとも 24 時間、完全データ+増分同期を行う場合は少なくとも 7 日間 保持してください。保存期間が不十分な場合、タスクが失敗する可能性があり、例外的なケースではデータ損失が発生するおそれがあります。完全データ同期が完了した後は、保存期間を 24 時間以上に設定できます。
スキーマまたは完全データ同期中の DDL 禁止スキーマ同期または完全データ同期の実行中は、DDL 文を実行しないでください。実行すると、タスクが失敗します。

その他の制限事項

制限事項詳細
文字列サイズ制限:2 MBターゲットの DataHub プロジェクト内では、1 つの文字列が 2 MB を超えてはなりません。
同期対象のオブジェクトタイプ同期対象として選択できるのは、テーブルおよびデータベースのみです。
読み取り専用ノードは除外DTS は、ソースの PolarDB for MySQL クラスターの読み取り専用ノードを同期しません。
OSS 外部テーブルは除外DTS は、ソースクラスターから Object Storage Service (OSS) 外部テーブルを同期しません。
pt-online-schema-change同期中に pt-online-schema-change などのツールを用いて DDL を実行すると、タスクが失敗します。
同期中のオンライン DDL(単一ソースのみ)同期中に他のソースが送信先に書き込まない場合、ソーステーブルでロックなしDDLを実行するために、Data Management (DMS) を使用できます。詳細については、「ロックなしDDL操作の実行」をご参照ください。
データ損失 のリスク(同時書き込み時)DMS を用いたオンライン DDL 実行中に、他のソースがターゲットへ書き込むと、データ損失 が発生する可能性があります。
タスク復旧の SLAタスクが失敗した場合、DTS サポートによる復旧試行は 8 時間以内に行われます。復旧中にはタスクの再起動およびタスクパラメーターの変更が行われる場合がありますが、データベースパラメーターは変更されません。

特別なケース:DTS は、バイナリログファイルの位置を進めるために、ソースデータベース上で定期的に CREATE DATABASE IF NOT EXISTS `test` を実行します。

課金

同期タイプ料金
スキーマ同期および完全データ同期無料
増分データ同期課金概要有料。詳細については、「」をご参照ください。

サポートされる同期トポロジ

  • 単方向・一対一同期

  • 単方向・一対多同期

  • 単方向・多対一同期

  • 単方向・カスケード同期

全トポロジのリファレンスについては、「同期トポロジ」をご参照ください。

同期可能な SQL 操作

INSERT、UPDATE、DELETE。

必要な権限

ソースの PolarDB for MySQL クラスターのデータベースアカウントには、同期対象オブジェクトに対する最低限の読み取り権限が必要です。

データ同期タスクの作成

以下の手順は、新しい DTS コンソールを基準としています。DTS コンソールと Data Management (DMS) コンソール内の DTS モジュールの表示内容に相違がある場合は、DMS コンソールが優先されます。
  1. データ同期タスクページへ移動します。

    1. Data Management (DMS) コンソール にログインします。

    2. 上部のナビゲーションバーで、Data + AI をクリックします。

    3. 左側のナビゲーションウィンドウで、DTS (DTS) > データ同期 を選択します。

    手順は、DMS コンソールのモードおよびレイアウトによって異なる場合があります。「シンプルモード」および「DMS コンソールのレイアウトとスタイルのカスタマイズ」を参照してください。または、直接「データ同期タスクページ」に移動します。
  2. データ同期インスタンスが配置されているリージョンを選択します。

    新しい DTS コンソールでは、上部のナビゲーションバーからリージョンを選択します。
  3. タスクの作成 をクリックします。ウィザードで、ソースおよびターゲットのデータベースを設定します。

    ソースデータベース

    パラメーター説明
    既存のDMSデータベースインスタンスの選択既存のデータベースインスタンスを選択すると、フィールドが自動的に埋められます。手動で設定する場合は、空白のままにしてください。
    データベースタイプPolarDB for MySQL を選択します。
    アクセス方法Alibaba Cloud インスタンス を選択します。
    インスタンスリージョンソースの PolarDB for MySQL クラスターが配置されているリージョンです。
    Alibaba Cloudアカウント全体でのデータの複製同一アカウント内での同期の場合は、いいえ を選択します。
    PolarDB クラスター IDソースの PolarDB for MySQL クラスターの ID です。
    データベースアカウントソースクラスターのデータベースアカウントです。詳細については、「必要な権限」をご参照ください。
    データベースパスワードデータベースアカウントのパスワードです。

    宛先データベース

    パラメーター説明
    既存のDMSデータベースインスタンスの選択既存のデータベースインスタンスを選択すると、フィールドが自動的に埋められます。手動で設定する場合は、空白のままにしてください。
    データベースタイプDataHub を選択します。
    アクセス方法Alibaba Cloud インスタンス を選択します。
    インスタンスリージョンターゲットの DataHub プロジェクトが配置されているリージョンです。
    プロジェクト同期データを受信する DataHub プロジェクトです。
  4. [接続をテストして続行] をクリックします。 DTS は、サーバーの CIDR ブロックを Alibaba Cloud データベースインスタンス (ApsaraDB RDS for MySQL、ApsaraDB for MongoDB など) のホワイトリストと、Elastic Compute Service (ECS) でホストされているデータベースのセキュリティグループルールに自動的に追加します。 複数の ECS インスタンスにデプロイされたデータベースの場合、DTS の CIDR ブロックを各インスタンスのセキュリティグループルールに手動で追加してください。 データセンターまたはサードパーティのクラウド上の自己管理データベースの場合、CIDR ブロックをデータベースのホワイトリストに手動で追加してください。 詳細については、「DTS サーバーの CIDR ブロックを追加する」をご参照ください。

    警告

    DTS の CIDR ブロックをホワイトリストまたはセキュリティグループに追加すると、セキュリティ上の露出が生じます。続行する前に、強力な認証情報の使用、公開ポートの制限、API 呼び出しの認証、ホワイトリストまたは ECS セキュリティグループルールの定期的な確認および不正な CIDR ブロックの禁止、可能であれば Express Connect、VPN Gateway、または Smart Access Gateway を介した接続など、保護措置を講じてください。

  5. 同期対象オブジェクトおよび同期設定を構成します。

    パラメーター説明
    同期タイプ増分データ同期 がデフォルトで選択されています。また、スキーマ同期 のみを選択することもできます。完全データ同期 は、この送信先タイプでは利用できません。スキーマ同期中、DTS は選択されたテーブルのスキーマをソースからターゲットの DataHub プロジェクトへコピーします。
    競合テーブルの処理モード事前チェックとエラー報告(デフォルト):ターゲットに同名のテーブルが既に存在する場合、事前チェックが失敗します。ターゲットのテーブルを削除または名前変更せずに名前競合を解決するには、オブジェクト名マッピング機能を使用してください。詳細については、「オブジェクト名のマッピング」をご参照ください。エラーを無視して続行:名前競合のチェックをスキップします。
    警告

    このオプションはデータの不整合を引き起こす可能性があります。完全データ同期中、主キーまたは一意キーの値が一致する既存のターゲットレコードは保持され(上書きされません)。増分同期中は、それらが上書きされます。スキーマの不一致により、部分的な同期または タスクの失敗 が発生する可能性があります。

    追加カラムの命名規則はい または いいえ を選択して、DTS がターゲットトピックに追加するカラムに対して新しい命名規則を使用するかどうかを制御します。このオプションを設定する前に、命名競合がないか確認してください。競合が発生すると、タスクの失敗 または データ損失追加列の命名ルール が発生する可能性があります。詳細については、「」をご参照ください。
    宛先インスタンスでのオブジェクト名の大文字化ターゲットにおけるデータベース名、テーブル名、カラム名の大文字小文字の扱いを制御します。デフォルト: DTS のデフォルトポリシー送信先インスタンスにおけるオブジェクト名の英大文字・小文字の指定。詳細については、「」をご参照ください。
    ソースオブジェクト同期するテーブルまたはデータベースを選択し、矢印アイコンをクリックして 選択済みオブジェクト に移動します。
    選択済みオブジェクトオブジェクトを右クリックして、その名前を変更します(単一のオブジェクト)。複数のオブジェクトの名前を一度に変更するには、[バッチ編集] をクリックします。「オブジェクト名のマッピング」をご参照ください。
  6. 次へ:高度な設定 をクリックします。

    パラメーター説明
    モニタリングとアラートいいえ:アラート機能を無効にします。はい:アラート機能を有効にします。アラートのしきい値と通知先の連絡先を設定してください。詳細については、「モニタリングとアラートの設定」をご参照ください。
    接続失敗時のリトライ時間タスク開始後に DTS が接続失敗をリトライする時間を指定します。範囲:10~1,440 分。デフォルト:720 分。この値は少なくとも 30 分以上に設定してください。DTS がリトライ期間内に再接続できた場合、タスクは再開されます。それ以外の場合はタスクが失敗します。複数のタスクで同じソースまたは送信先を共有している場合、最も短いリトライ時間がすべてのタスクに適用されます。DTS はリトライ中もインスタンスに対して課金されますので、ソースまたは送信先が廃止された場合は速やかにインスタンスをリリースしてください。
    ETL の設定いいえ(デフォルト)またははい。有効にした場合、コードエディタに抽出・変換・書き出し(ETL)用のドメイン特化言語(DSL)文を入力します。詳細については、「ETL の設定」をご参照ください。
  7. (任意)選択済みオブジェクト セクションで、トピック名を右クリックしてテーブルまたはデータベースの名前を変更するか、パーティション用のシャードキーを設定します。

  8. 次へ:タスク設定の保存と事前チェック をクリックします。このタスクの OpenAPI パラメーターをプレビューするには、ボタンにカーソルを合わせて、OpenAPI パラメーターのプレビュー をクリックしてください。

    DTS はタスク開始前に事前チェックを実行します。タスクは、事前チェックに合格した場合にのみ開始されます。事前チェックに失敗した場合は、各失敗項目の横にある 詳細の表示 をクリックし、根本原因を修正したうえで、再チェック をクリックしてください。無視可能な警告項目については、警告の詳細の確認 > 無視 > OK > 再チェック の順にクリックしてください。警告を無視すると、データの不整合が発生する可能性があります。
  9. 成功率100 % になるまで待機し、次へ:インスタンスの購入 をクリックします。

  10. 購入ページで、インスタンスを構成します。

    パラメーター説明
    課金方法サブスクリプション:固定期間の前払い — 長期利用にコスト効率的です。従量課金:時間単位での課金 — 短期利用に適しています。不要になった場合は、インスタンスをリリースして課金を停止してください。
    リソースグループ設定インスタンスのリソースグループです。デフォルト:デフォルトリソースグループResource Management とは
    インスタンスクラスデータ同期インスタンスのインスタンスクラス同期スループットのクラスです。詳細については、「」をご参照ください。
    サブスクリプション期間サブスクリプション課金の場合のみ利用可能です。選択肢:1~9 か月、1 年、2 年、3 年、5 年。
  11. Data Transmission Service(従量課金)サービス利用規約 を読み、チェックボックスを選択します。

  12. 購入して開始 をクリックし、確認ダイアログで OK をクリックします。

タスクはタスク一覧に表示されます。そこで進行状況を追跡できます。

DataHub トピックのスキーマ

DTS が増分データを DataHub トピックに書き込む際、元のデータフィールドに加えて、変更メタデータを格納するためのシステムカラムが追加されます。

以下の図は、トピックスキーマの例を示しています。この例では、idnameaddress が元のデータフィールドです。以前の命名規則では、DTS は元のフィールドを含むすべてのフィールドに dts_ プレフィックスを付与します。新しい命名規則では、元のデータフィールドはプレフィックスなしでそのままの名前が維持されます。

Topic定义

下記の表では、各追加カラムについて説明します。

従来のカラム名新しいカラム名説明
dts_record_idnew_dts_sync_dts_record_idString増分ログエントリの固有 ID です。通常は自動インクリメントされますが、ディザスタリカバリ時のロールバック後はインクリメントされない場合があり、一部の ID が重複する可能性があります。UPDATE 操作では、更新前および更新後の両方のログエントリが同じ dts_record_id を共有します。
dts_operation_flagnew_dts_sync_dts_operation_flagString操作タイプです。値: I = INSERT、D = DELETE、U = UPDATE、F = 完全データ同期。
dts_instance_idnew_dts_sync_dts_instance_idStringデータベースのサーバー ID です。
dts_db_namenew_dts_sync_dts_db_nameStringデータベース名です。
dts_table_namenew_dts_sync_dts_table_nameStringテーブル名です。
dts_utc_timestampnew_dts_sync_dts_utc_timestampString操作の協定世界時 (UTC) タイムスタンプ(ログファイルのタイムスタンプでもあります)。
dts_before_flagnew_dts_sync_dts_before_flagString行の値が更新前の値であるかどうかを示します。Y = はい、N = いいえ。INSERT: N。UPDATE(更新前エントリ): Y。UPDATE(更新後エントリ): N。DELETE: Y
dts_after_flagnew_dts_sync_dts_after_flagString行の値が更新後の値であるかどうかを示します。Y = はい、N = いいえ。INSERT: Y。UPDATE(更新前エントリ): N。UPDATE(更新後エントリ): Y。DELETE: N

操作タイプ別のフラグ値

dts_before_flag および dts_after_flag カラムは、特定のログエントリがどのバージョンの行を表しているかをエンコードします。

INSERT

INSERT エントリは、新しく挿入された値(更新後の値)を記録します。

dts_before_flagdts_after_flag
NY
INSERT操作

UPDATE

DTS は、1 件の UPDATE ごとに 2 つのログエントリ(更新前状態と更新後状態)を生成します。両エントリは、同じ dts_record_iddts_operation_flag、および dts_utc_timestamp の値を共有します。

エントリdts_before_flagdts_after_flag
更新前(エントリ 1)YN
更新後(エントリ 2)NY
UPDATE操作

DELETE

DELETE エントリは、削除された値(更新前の値)を記録します。

dts_before_flagdts_after_flag
YN
DELETE操作

次のステップ

同期タスクが実行中になった後は、Realtime Compute for Apache Flink を使用して、DataHub プロジェクトに流入するデータを分析します。詳細については、「Alibaba Cloud Realtime Compute for Apache Flink とは?」をご参照ください。