すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:PolarDB\-X 2\.0 から MaxCompute へのデータ同期

最終更新日:Mar 29, 2026

Data Transmission Service (DTS) を使用して、PolarDB-X 2.0 インスタンスから MaxCompute プロジェクトへデータを継続的にレプリケーションします。DTS はまず初期完全同期を実行し、その後ソースのバイナリログから増分変更をキャプチャして、MaxCompute 内の専用増分データテーブルに書き込みます。この増分データテーブルは、全量ベースラインテーブルとともに配置されます。

前提条件

開始する前に、以下の条件を満たしていることを確認してください。

  • MySQL 5.7 と互換の PolarDB-X 2.0 インスタンスです。「インスタンスの作成」および「データベースの作成」を参照してください。

  • MaxCompute の有効化。詳細については、「MaxCompute および DataWorks の有効化」をご参照ください。

  • MaxCompute プロジェクトの作成。詳細については、「MaxCompute プロジェクトの作成」をご参照ください。

  • ソース PolarDB-X インスタンス上のデータベースアカウント(以下の権限が必要):

    権限使用タイミング必要性の理由
    SELECT初期完全データ同期時同期対象オブジェクトの行を読み取ります。
    REPLICATION CLIENT常時バイナリログのステータス(SHOW MASTER STATUSSHOW BINARY LOGS)を照会します。
    REPLICATION SLAVE増分データ同期ソースインスタンスに接続し、バイナリログを読み取ります。

    これらの権限を付与する手順については、「PolarDB-X のデータ同期ツール」をご参照ください。

  • PolarDB-X 2.0 コンソールでバイナリログ記録が有効化され、binlog_row_imagefull に設定されています。「パラメーター設定」を参照してください。binlog_row_imagefull に設定されていない場合、事前チェックが失敗し、タスクを開始できません。

  • バイナリログの保存期間を以下のように設定します。

    同期範囲最低保存期間
    増分同期のみ24 時間
    全量+増分同期7 日間(全量同期完了後は 24 時間に短縮可能)

    DTS が保存期間が短すぎるためバイナリログを読み取れない場合、タスクは失敗し、データの不整合または損失が発生する可能性があります。上記要件に従って保存期間を設定してください。設定が適切でない場合、Data Transmission Service (DTS) のサービスレベル合意 (SLA) で定められたサービスの信頼性およびパフォーマンスを保証できません。

制限事項

外部キーは同期されません。 ソースデータベースにおける外部キー制約によってトリガーされる CASCADE および DELETE 操作は、MaxCompute にはレプリケーションされません。

ソースデータベースの制限

制約詳細回避策
プライマリキーまたは一意キーの必須PRIMARY KEY または UNIQUE 制約がないテーブルでは、宛先に重複レコードが生成される可能性があります。タスク開始前に、ソーステーブルにプライマリキーまたは一意キーを追加します。
オブジェクト名変更時のテーブル数制限宛先でテーブルまたは列の名前を変更する場合、1 つのタスクで同期可能なテーブル数は最大 5,000 個です。5,000 個を超えると、リクエストエラーでタスクが失敗します。複数のタスクに分割するか、テーブル単位での名前変更を行わず、データベース全体を同期します。
TABLEGROUP および Locality 属性の非対応TABLEGROUP または Locality 属性を使用する PolarDB-X インスタンスはサポートされていません。同期前に TABLEGROUP 定義および Locality 属性を削除します。
大文字を含むテーブル名テーブル名に大文字を含む場合、スキーマ同期のみがサポートされます。テーブル名を小文字で統一するか、スキーマ同期のみを実行します。
同期中の DDL 実行不可スキーマ同期または全量データ同期中に DDL ステートメントを実行すると、タスクが失敗します。初期同期フェーズが完了するまで、DDL 操作を一時停止します。

その他の制限

  • 同期対象として選択できるのは テーブルのみです。ビュー、トリガー、ストアドプロシージャは同期されません。

  • 初期全量データ同期中、DTS はソースおよび宛先の両方で読み取りおよび書き込みリソースを使用するため、サーバー負荷が増加します。ピーク時間帯を避けて同期を実行してください。

  • 全量データ同期では同時 INSERT 操作が使用されるため、宛先テーブルがフラグメント化する可能性があります。全量同期完了後の宛先テーブルスペースは、ソースよりも大きくなることがあります。

  • 同期中の DDL 操作に pt-online-schema-change を使用しないでください。これによりタスクが失敗する可能性があります。

  • 同期中は、DTS を通じてのみ宛先にデータを書き込んでください。他のツールによる書き込みはデータの不整合を引き起こす可能性があり、宛先で Data Management Service (DMS) を使用したオンライン DDL 操作はデータ損失を招く可能性があります。

  • MaxCompute はプライマリキー制約をサポートしていません。 ネットワークエラーが発生した場合、DTS が宛先に重複レコードを書き込む可能性があります。クエリ設計時に重複を処理できるようにしてください。

  • DTS タスクが失敗した場合、DTS テクニカルサポートが 8 時間以内に復旧を試みます。タスクは再起動され、タスクパラメーター(データベースパラメーターではない)が調整される場合があります。

DTS は定期的にソースデータベース内の dts_health_check.ha_health_check テーブルを更新し、バイナリログファイルの位置を進めます。

課金

同期タイプ料金
スキーマ同期および全量データ同期無料
増分データ同期課金済み。「課金概要

サポートされる SQL 操作

操作タイプSQL ステートメント
DMLINSERT、UPDATE、DELETE

同期タスクの作成

ステップ 1:データ同期タスクページへ移動

  1. Data Management (DMS) コンソール にログインします。

  2. 上部ナビゲーションバーで、Data + AI をクリックします。

  3. 左側ナビゲーションウィンドウで、DTS (DTS)データ同期 を選択します。

コンソールのレイアウトは異なる場合があります。詳細については、「シンプルモード」および「DMS コンソールのレイアウトとスタイルのカスタマイズ」をご参照ください。また、新しい DTS コンソールの「データ同期タスクページ」に直接アクセスすることもできます。

ステップ 2:ソースおよび宛先の設定

  1. データ同期タスク ページで、データ同期インスタンスが存在するリージョンを選択します。

    新しい DTS コンソールでは、代わりに上部ナビゲーションバーからリージョンを選択します。
  2. タスクの作成 をクリックします。ウィザードで以下のパラメーターを設定します。

    タスク名

    パラメーター説明
    タスク名タスクを識別するための説明的な名前を入力します。名前は一意である必要はありません。

    ソースデータベース

    パラメーター説明
    DMS データベースインスタンスの選択既存のインスタンスを選択すると、パラメーターが自動的に入力されます。空白のままにすると、手動で詳細を入力できます。
    データベースタイプPolarDB-X 2.0 を選択します。
    アクセス方法Alibaba Cloud インスタンス を選択します。
    インスタンスリージョンPolarDB-X インスタンスが存在するリージョンです。
    データベースアカウントSELECT、REPLICATION CLIENT、REPLICATION SLAVE 権限を持つデータベースアカウント(前提条件 をご参照ください)。
    データベースパスワードデータベースアカウントのパスワードです。

    宛先データベース

    パラメーター説明
    [DMS データベースインスタンスの選択]既存のインスタンスを選択してパラメーターを自動入力するか、空のままにして手動で詳細を入力します。
    [データベースタイプ][MaxCompute] を選択します。
    [アクセス方法][Alibaba Cloud インスタンス] を選択します。
    [インスタンスリージョン]送信先の MaxCompute プロジェクトが存在するリージョンです。
    [プロジェクト]MaxCompute プロジェクトの名前です。DataWorks コンソールの Workspaces ページで確認できます。
    [accessKeyId]MaxCompute への接続に使用するアカウントの AccessKey ID です。詳細は、「AccessKey ペアの取得」をご参照ください。
    [accessSecret]MaxCompute への接続に使用するアカウントの AccessKey Secret です。詳細は、「AccessKey ペアの取得」をご参照ください。
  3. [接続性のテストと続行]」をクリックします。DTS は、自動的にそのサーバーの CIDR ブロックを Alibaba Cloud データベースインスタンスのホワイトリストおよび Elastic Compute Service (ECS) でホストされるデータベースのセキュリティグループルールに追加します。データセンターまたはサードパーティクラウド上の自己管理データベースの場合、DTS サーバーの CIDR ブロックを手動で追加します。詳細については、「DTS サーバーの CIDR ブロックをオンプレミスデータベースのセキュリティ設定に追加する」をご参照ください。

    警告

    DTS サーバーの CIDR ブロックをホワイトリストまたはセキュリティグループに追加すると、セキュリティリスクが生じる可能性があります。続行する前に、認証情報の強化、公開ポートの制限、API 呼び出しの認証、ホワイトリストルールの定期監査などの予防措置を講じてください。あるいは、Express Connect、VPN Gateway、Smart Access Gateway を使用して接続することで、ホワイトリストの変更を回避できます。

  4. OK をクリックして、MaxCompute アカウントに必要な権限を付与します。

ステップ 3:オブジェクトの選択と同期オプションの設定

以下のパラメーターを設定します。

パラメーター説明
同期タイプ[スキーマ同期][完全データ同期]、および [増分データ同期] を選択します。デフォルトでは、[増分データ同期] が選択されています。この場合、[スキーマ同期][完全データ同期] も選択する必要があります。DTS は、まず既存データ (増分同期のベースライン) を同期し、その後、変更を継続的に同期します。
増分データテーブルのパーティション定義増分データテーブルのパーティション名を選択します。デフォルトのパーティションキーフィールドは、modifytime_yearmodifytime_monthmodifytime_daymodifytime_hour、および modifytime_minute です。詳細については、「パーティション」をご参照ください。
競合するテーブルの処理モード[事前チェックとエラー報告] (デフォルト):宛先テーブルがソーステーブルと同じ名前を持つ場合、事前チェックは失敗します。宛先テーブルを上書きしてはならない場合に使用します。名前が競合し、宛先テーブルを削除または名前変更できない場合は、オブジェクト名マッピングを使用して同期対象のテーブルの名前を変更します。詳細については、「オブジェクト名のマッピング」をご参照ください。[エラーを無視して続行]:名前競合チェックをスキップします。スキーマが一致し、レコードのプライマリキーまたは一意キーがすでに送信先に存在する場合、DTS は完全同期中には既存のレコードを保持し、増分同期中にはそれを上書きします。スキーマが異なる場合、同期が失敗したり、一部の列しか書き込まれなかったりする可能性があるため、注意して進めてください。
追加列の命名ルールDTS は各宛先テーブルにメタデータ列を追加します。これらが既存の列名と競合する場合、同期は失敗します。ご利用のセットアップに基づいて [新しいルール] または [以前のルール] を選択します。このパラメーターを設定する前に、名前の競合を確認してください。詳細については、「追加列の命名ルール」をご参照ください。
宛先インスタンスのオブジェクト名の大文字/小文字の区別MaxCompute のデータベース名、テーブル名、列名の大文字/小文字の区別をコントロールします。デフォルト: [DTS デフォルトポリシー]。詳細については、「宛先インスタンスのオブジェクト名の大文字/小文字の区別を指定」をご参照ください。
ソースオブジェクトオブジェクトを選択し、右矢印アイコンをクリックして [選択したオブジェクト] に移動します。オブジェクトとしてテーブルを選択してください。DTS はビュー、トリガー、ストアドプロシージャを同期しません。
選択したオブジェクトオブジェクトを右クリックして名前を変更したり、SQL 操作をフィルターしたりします。一度に複数のオブジェクトの名前を変更するには、[一括編集] をクリックします。条件によってデータをフィルターするには、右クリックして WHERE 条件を指定します。詳細については、「オブジェクト名のマッピング」および「フィルター条件の指定」をご参照ください。

ステップ 4:高度な設定の構成

次へ:高度な設定 をクリックし、以下の設定を行います。

パラメーター説明
モニタリングとアラートタスクが失敗した場合や同期遅延がしきい値を超えた場合にアラートを受け取るには、はいモニタリングとアラートの設定 を選択します。アラートのしきい値および通知設定を構成します。「」をご参照ください。いいえ を選択すると、アラート機能をスキップします。
失敗した接続のリトライ時間タスク開始後に DTS が失敗した接続をリトライする期間です。有効な値:10~1,440 分。デフォルト:720 分。少なくとも 30 分に設定してください。複数のタスクが同じソースまたは宛先を共有する場合、最も短いリトライ期間がすべてのタスクに適用されます。リトライ中は、インスタンスに対して課金されます。
ETL の構成ETL ルールの設定とデータ処理文の入力を行うには、[はい] を選択します。ETL をスキップするには、[いいえ] を選択します。「ETL とは何か?

ステップ 5:事前チェックの実行

  1. 次へ:タスク設定の保存と事前チェック をクリックします。保存前にこのタスクの OpenAPI パラメーターをプレビューするには、ボタンにカーソルを合わせて OpenAPI パラメーターのプレビュー をクリックします。

  2. 事前チェックが完了するまで待ちます。

    • 項目が失敗した場合、詳細の表示 をクリックして問題を修正し、その後 再事前チェック をクリックします。

    • 無視できないアラートが発生した場合、問題を修正して事前チェックを再実行します。

    • 無視可能なアラート項目の場合、アラートの詳細の確認無視OK をクリックし、その後 再事前チェック をクリックします。アラートを無視すると、データの不整合が発生する可能性があります。

  3. 成功確率100% になるまで待ち、その後 次へ:インスタンスの購入 をクリックします。

ステップ 6:インスタンスの購入

購入ページで以下の設定を行います。

パラメーター説明
課金方法サブスクリプション:固定期間分を前払いする方式で、長期利用の場合にコスト効率が高くなります。従量課金:1 時間単位で課金される方式で、短期利用や試用に適しています。不要になったインスタンスは速やかにリリースし、継続的な課金を回避してください。
リソースグループこのインスタンスに割り当てるリソースグループです。デフォルト:default resource group。詳細については、「What is Resource Management?
インスタンスクラス同期スループットの階層です。詳細については、「Instance classes of data synchronization instances」をご参照ください。
サブスクリプション期間(サブスクリプション課金のみ)契約期間:1~9 か月、または 1 年、2 年、3 年、5 年から選択できます。

Data Transmission Service(従量課金)サービス利用規約 をお読みになり、チェックボックスをオンにして、購入して開始OK をクリックします。

タスクはタスクリストに表示されます。進捗状況はそこから監視できます。

増分データテーブルのスキーマ

MaxCompute プロジェクトで全表スキャンを許可するには、set odps.sql.allow.fullscan=true; を実行します。

DTS は PolarDB-X 2.0 からの増分変更を MaxCompute の増分データテーブルに書き込みます。ソーステーブルの列に加えて、DTS はすべての増分データテーブルに以下のメタデータ列を追加します。

Schema of an incremental data table
この例では、modifytime_yearmodifytime_monthmodifytime_daymodifytime_hourmodifytime_minute が、ステップ 3 で設定された通りパーティションキーを構成します。
説明
record_idログエントリの固有 ID。新しいエントリごとに自動インクリメントされます。UPDATE 操作では、DTS が同じ record_id を持つ 2 つのエントリを生成します。1 つは更新前の値、もう 1 つは更新後の値です。
operation_flag操作タイプ:I(INSERT)、U(UPDATE)、D(DELETE)。
utc_timestampバイナリログから取得した操作の協定世界時(UTC)タイムスタンプです。
before_flagY、それ以外の場合は N
after_flagY、それ以外の場合は N

before_flag および after_flag の動作

before_flag および after_flag の値は、操作タイプによって異なります。

  • INSERT:行には新しく挿入された値が格納されます。before_flag = Nafter_flag = Y

    Example of an INSERT operation

  • UPDATE:DTS は同じ record_idoperation_flagutc_timestamp を持つ 2 つのログエントリを生成します。最初のエントリには更新前の値が格納され(before_flag = Yafter_flag = N)、2 番目のエントリには更新後の値が格納されます(before_flag = Nafter_flag = Y)。

    Example of an UPDATE operation

  • DELETE:行には削除されたレコードの値が格納されます。before_flag = Yafter_flag = N

    Example of a DELETE operation

全量ベースラインテーブルと増分データテーブルのマージ

同期タスクが開始されると、DTS は MaxCompute 内に全量ベースラインテーブルおよび増分データテーブルを作成します。SQL を使用して 2 つのテーブルをマージし、任意の時点における完全なデータセットを再構築します。

以下の例では、customer という名前のテーブルのデータをマージします。

Schema of the customer table
  1. ソーステーブルと同じスキーマを持つ宛先テーブルを作成し、マージ結果を格納します。以下の例では、タイムスタンプ 1565944878 時点の customer データを格納するテーブルを作成します。

    CREATE TABLE `customer_1565944878` (
        `id` bigint NULL,
        `register_time` datetime NULL,
        `address` string);
    アドホッククエリ機能を使用して、MaxCompute で SQL ステートメントを実行します。詳細については、「SQL ステートメントを実行するためのアドホッククエリ機能の使用」をご参照ください。サポートされているデータの型については、「データの型のエディション」をご参照ください。
  2. 以下の SQL を実行して、全量ベースラインテーブルと増分データテーブルをマージします。プレースホルダーを実際の値に置き換えてください。

    set odps.sql.allow.fullscan=true;
    insert overwrite table <result_storage_table>
    select <col1>,
           <col2>,
           <colN>
      from(
    select row_number() over(partition by t.<primary_key_column>
     order by record_id desc, after_flag desc) as row_number, record_id, operation_flag, after_flag, <col1>, <col2>, <colN>
      from(
    select incr.record_id, incr.operation_flag, incr.after_flag, incr.<col1>, incr.<col2>, incr.<colN>
      from <table_log> incr
     where utc_timestamp< <timestamp>
     union all
    select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.<col1>, base.<col2>, base.<colN>
      from <table_base> base) t) gt
    where row_number=1
      and after_flag='Y'

    以下のプレースホルダーを置き換えます。

    プレースホルダー説明
    <result_storage_table>マージされたデータを格納するテーブル
    <col1><col2><colN>マージ対象テーブルの列名
    <primary_key_column>マージ対象テーブルのプライマリキー列
    <table_log>増分データテーブルの名前
    <table_base>全量ベースラインテーブルの名前
    <timestamp>マージを行う時点のタイムスタンプ

    以下の例では、タイムスタンプ 1565944878 時点の完全な customer データセットを取得します。

    set odps.sql.allow.fullscan=true;
    insert overwrite table customer_1565944878
    select id,
           register_time,
           address
      from(
    select row_number() over(partition by t.id
     order by record_id desc, after_flag desc) as row_number, record_id, operation_flag, after_flag, id, register_time, address
      from(
    select incr.record_id, incr.operation_flag, incr.after_flag, incr.id, incr.register_time, incr.address
      from customer_log incr
     where utc_timestamp< 1565944878
     union all
    select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.id, base.register_time, base.address
      from customer_base base) t) gt
     where gt.row_number= 1
       and gt.after_flag= 'Y';
  3. customer_1565944878 からマージされたデータをクエリします。

    Query the merged data