Data Transmission Service (DTS) を使用して、PolarDB-X 2.0 インスタンスから MaxCompute プロジェクトへデータを継続的にレプリケーションします。DTS はまず初期完全同期を実行し、その後ソースのバイナリログから増分変更をキャプチャして、MaxCompute 内の専用増分データテーブルに書き込みます。この増分データテーブルは、全量ベースラインテーブルとともに配置されます。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
MySQL 5.7 と互換の PolarDB-X 2.0 インスタンスです。「インスタンスの作成」および「データベースの作成」を参照してください。
MaxCompute の有効化。詳細については、「MaxCompute および DataWorks の有効化」をご参照ください。
MaxCompute プロジェクトの作成。詳細については、「MaxCompute プロジェクトの作成」をご参照ください。
ソース PolarDB-X インスタンス上のデータベースアカウント(以下の権限が必要):
権限 使用タイミング 必要性の理由 SELECT初期完全データ同期時 同期対象オブジェクトの行を読み取ります。 REPLICATION CLIENT常時 バイナリログのステータス( SHOW MASTER STATUS、SHOW BINARY LOGS)を照会します。REPLICATION SLAVE増分データ同期 ソースインスタンスに接続し、バイナリログを読み取ります。 これらの権限を付与する手順については、「PolarDB-X のデータ同期ツール」をご参照ください。
PolarDB-X 2.0 コンソールでバイナリログ記録が有効化され、
binlog_row_imageがfullに設定されています。「パラメーター設定」を参照してください。binlog_row_imageがfullに設定されていない場合、事前チェックが失敗し、タスクを開始できません。バイナリログの保存期間を以下のように設定します。
同期範囲 最低保存期間 増分同期のみ 24 時間 全量+増分同期 7 日間(全量同期完了後は 24 時間に短縮可能) DTS が保存期間が短すぎるためバイナリログを読み取れない場合、タスクは失敗し、データの不整合または損失が発生する可能性があります。上記要件に従って保存期間を設定してください。設定が適切でない場合、Data Transmission Service (DTS) のサービスレベル合意 (SLA) で定められたサービスの信頼性およびパフォーマンスを保証できません。
制限事項
外部キーは同期されません。 ソースデータベースにおける外部キー制約によってトリガーされる CASCADE および DELETE 操作は、MaxCompute にはレプリケーションされません。
ソースデータベースの制限
| 制約 | 詳細 | 回避策 |
|---|---|---|
| プライマリキーまたは一意キーの必須 | PRIMARY KEY または UNIQUE 制約がないテーブルでは、宛先に重複レコードが生成される可能性があります。 | タスク開始前に、ソーステーブルにプライマリキーまたは一意キーを追加します。 |
| オブジェクト名変更時のテーブル数制限 | 宛先でテーブルまたは列の名前を変更する場合、1 つのタスクで同期可能なテーブル数は最大 5,000 個です。5,000 個を超えると、リクエストエラーでタスクが失敗します。 | 複数のタスクに分割するか、テーブル単位での名前変更を行わず、データベース全体を同期します。 |
| TABLEGROUP および Locality 属性の非対応 | TABLEGROUP または Locality 属性を使用する PolarDB-X インスタンスはサポートされていません。 | 同期前に TABLEGROUP 定義および Locality 属性を削除します。 |
| 大文字を含むテーブル名 | テーブル名に大文字を含む場合、スキーマ同期のみがサポートされます。 | テーブル名を小文字で統一するか、スキーマ同期のみを実行します。 |
| 同期中の DDL 実行不可 | スキーマ同期または全量データ同期中に DDL ステートメントを実行すると、タスクが失敗します。 | 初期同期フェーズが完了するまで、DDL 操作を一時停止します。 |
その他の制限
同期対象として選択できるのは テーブルのみです。ビュー、トリガー、ストアドプロシージャは同期されません。
初期全量データ同期中、DTS はソースおよび宛先の両方で読み取りおよび書き込みリソースを使用するため、サーバー負荷が増加します。ピーク時間帯を避けて同期を実行してください。
全量データ同期では同時 INSERT 操作が使用されるため、宛先テーブルがフラグメント化する可能性があります。全量同期完了後の宛先テーブルスペースは、ソースよりも大きくなることがあります。
同期中の DDL 操作に pt-online-schema-change を使用しないでください。これによりタスクが失敗する可能性があります。
同期中は、DTS を通じてのみ宛先にデータを書き込んでください。他のツールによる書き込みはデータの不整合を引き起こす可能性があり、宛先で Data Management Service (DMS) を使用したオンライン DDL 操作はデータ損失を招く可能性があります。
MaxCompute はプライマリキー制約をサポートしていません。 ネットワークエラーが発生した場合、DTS が宛先に重複レコードを書き込む可能性があります。クエリ設計時に重複を処理できるようにしてください。
DTS タスクが失敗した場合、DTS テクニカルサポートが 8 時間以内に復旧を試みます。タスクは再起動され、タスクパラメーター(データベースパラメーターではない)が調整される場合があります。
DTS は定期的にソースデータベース内のdts_health_check.ha_health_checkテーブルを更新し、バイナリログファイルの位置を進めます。
課金
| 同期タイプ | 料金 |
|---|---|
| スキーマ同期および全量データ同期 | 無料 |
| 増分データ同期 | 課金済み。「課金概要 |
サポートされる SQL 操作
| 操作タイプ | SQL ステートメント |
|---|---|
| DML | INSERT、UPDATE、DELETE |
同期タスクの作成
ステップ 1:データ同期タスクページへ移動
Data Management (DMS) コンソール にログインします。
上部ナビゲーションバーで、Data + AI をクリックします。
左側ナビゲーションウィンドウで、DTS (DTS) > データ同期 を選択します。
コンソールのレイアウトは異なる場合があります。詳細については、「シンプルモード」および「DMS コンソールのレイアウトとスタイルのカスタマイズ」をご参照ください。また、新しい DTS コンソールの「データ同期タスクページ」に直接アクセスすることもできます。
ステップ 2:ソースおよび宛先の設定
データ同期タスク ページで、データ同期インスタンスが存在するリージョンを選択します。
新しい DTS コンソールでは、代わりに上部ナビゲーションバーからリージョンを選択します。
タスクの作成 をクリックします。ウィザードで以下のパラメーターを設定します。
タスク名
パラメーター 説明 タスク名 タスクを識別するための説明的な名前を入力します。名前は一意である必要はありません。 ソースデータベース
パラメーター 説明 DMS データベースインスタンスの選択 既存のインスタンスを選択すると、パラメーターが自動的に入力されます。空白のままにすると、手動で詳細を入力できます。 データベースタイプ PolarDB-X 2.0 を選択します。 アクセス方法 Alibaba Cloud インスタンス を選択します。 インスタンスリージョン PolarDB-X インスタンスが存在するリージョンです。 データベースアカウント SELECT、REPLICATION CLIENT、REPLICATION SLAVE 権限を持つデータベースアカウント(前提条件 をご参照ください)。 データベースパスワード データベースアカウントのパスワードです。 宛先データベース
パラメーター 説明 [DMS データベースインスタンスの選択] 既存のインスタンスを選択してパラメーターを自動入力するか、空のままにして手動で詳細を入力します。 [データベースタイプ] [MaxCompute] を選択します。 [アクセス方法] [Alibaba Cloud インスタンス] を選択します。 [インスタンスリージョン] 送信先の MaxCompute プロジェクトが存在するリージョンです。 [プロジェクト] MaxCompute プロジェクトの名前です。DataWorks コンソールの Workspaces ページで確認できます。 [accessKeyId] MaxCompute への接続に使用するアカウントの AccessKey ID です。詳細は、「AccessKey ペアの取得」をご参照ください。 [accessSecret] MaxCompute への接続に使用するアカウントの AccessKey Secret です。詳細は、「AccessKey ペアの取得」をご参照ください。 「[接続性のテストと続行]」をクリックします。DTS は、自動的にそのサーバーの CIDR ブロックを Alibaba Cloud データベースインスタンスのホワイトリストおよび Elastic Compute Service (ECS) でホストされるデータベースのセキュリティグループルールに追加します。データセンターまたはサードパーティクラウド上の自己管理データベースの場合、DTS サーバーの CIDR ブロックを手動で追加します。詳細については、「DTS サーバーの CIDR ブロックをオンプレミスデータベースのセキュリティ設定に追加する」をご参照ください。
警告DTS サーバーの CIDR ブロックをホワイトリストまたはセキュリティグループに追加すると、セキュリティリスクが生じる可能性があります。続行する前に、認証情報の強化、公開ポートの制限、API 呼び出しの認証、ホワイトリストルールの定期監査などの予防措置を講じてください。あるいは、Express Connect、VPN Gateway、Smart Access Gateway を使用して接続することで、ホワイトリストの変更を回避できます。
OK をクリックして、MaxCompute アカウントに必要な権限を付与します。
ステップ 3:オブジェクトの選択と同期オプションの設定
以下のパラメーターを設定します。
| パラメーター | 説明 |
|---|---|
| 同期タイプ | [スキーマ同期]、[完全データ同期]、および [増分データ同期] を選択します。デフォルトでは、[増分データ同期] が選択されています。この場合、[スキーマ同期] と [完全データ同期] も選択する必要があります。DTS は、まず既存データ (増分同期のベースライン) を同期し、その後、変更を継続的に同期します。 |
| 増分データテーブルのパーティション定義 | 増分データテーブルのパーティション名を選択します。デフォルトのパーティションキーフィールドは、modifytime_year、modifytime_month、modifytime_day、modifytime_hour、および modifytime_minute です。詳細については、「パーティション」をご参照ください。 |
| 競合するテーブルの処理モード | [事前チェックとエラー報告] (デフォルト):宛先テーブルがソーステーブルと同じ名前を持つ場合、事前チェックは失敗します。宛先テーブルを上書きしてはならない場合に使用します。名前が競合し、宛先テーブルを削除または名前変更できない場合は、オブジェクト名マッピングを使用して同期対象のテーブルの名前を変更します。詳細については、「オブジェクト名のマッピング」をご参照ください。[エラーを無視して続行]:名前競合チェックをスキップします。スキーマが一致し、レコードのプライマリキーまたは一意キーがすでに送信先に存在する場合、DTS は完全同期中には既存のレコードを保持し、増分同期中にはそれを上書きします。スキーマが異なる場合、同期が失敗したり、一部の列しか書き込まれなかったりする可能性があるため、注意して進めてください。 |
| 追加列の命名ルール | DTS は各宛先テーブルにメタデータ列を追加します。これらが既存の列名と競合する場合、同期は失敗します。ご利用のセットアップに基づいて [新しいルール] または [以前のルール] を選択します。このパラメーターを設定する前に、名前の競合を確認してください。詳細については、「追加列の命名ルール」をご参照ください。 |
| 宛先インスタンスのオブジェクト名の大文字/小文字の区別 | MaxCompute のデータベース名、テーブル名、列名の大文字/小文字の区別をコントロールします。デフォルト: [DTS デフォルトポリシー]。詳細については、「宛先インスタンスのオブジェクト名の大文字/小文字の区別を指定」をご参照ください。 |
| ソースオブジェクト | オブジェクトを選択し、右矢印アイコンをクリックして [選択したオブジェクト] に移動します。オブジェクトとしてテーブルを選択してください。DTS はビュー、トリガー、ストアドプロシージャを同期しません。 |
| 選択したオブジェクト | オブジェクトを右クリックして名前を変更したり、SQL 操作をフィルターしたりします。一度に複数のオブジェクトの名前を変更するには、[一括編集] をクリックします。条件によってデータをフィルターするには、右クリックして WHERE 条件を指定します。詳細については、「オブジェクト名のマッピング」および「フィルター条件の指定」をご参照ください。 |
ステップ 4:高度な設定の構成
次へ:高度な設定 をクリックし、以下の設定を行います。
| パラメーター | 説明 |
|---|---|
| モニタリングとアラート | タスクが失敗した場合や同期遅延がしきい値を超えた場合にアラートを受け取るには、はいモニタリングとアラートの設定 を選択します。アラートのしきい値および通知設定を構成します。「」をご参照ください。いいえ を選択すると、アラート機能をスキップします。 |
| 失敗した接続のリトライ時間 | タスク開始後に DTS が失敗した接続をリトライする期間です。有効な値:10~1,440 分。デフォルト:720 分。少なくとも 30 分に設定してください。複数のタスクが同じソースまたは宛先を共有する場合、最も短いリトライ期間がすべてのタスクに適用されます。リトライ中は、インスタンスに対して課金されます。 |
| ETL の構成 | ETL ルールの設定とデータ処理文の入力を行うには、[はい] を選択します。ETL をスキップするには、[いいえ] を選択します。「ETL とは何か? |
ステップ 5:事前チェックの実行
次へ:タスク設定の保存と事前チェック をクリックします。保存前にこのタスクの OpenAPI パラメーターをプレビューするには、ボタンにカーソルを合わせて OpenAPI パラメーターのプレビュー をクリックします。
事前チェックが完了するまで待ちます。
項目が失敗した場合、詳細の表示 をクリックして問題を修正し、その後 再事前チェック をクリックします。
無視できないアラートが発生した場合、問題を修正して事前チェックを再実行します。
無視可能なアラート項目の場合、アラートの詳細の確認 > 無視 > OK をクリックし、その後 再事前チェック をクリックします。アラートを無視すると、データの不整合が発生する可能性があります。
成功確率 が 100% になるまで待ち、その後 次へ:インスタンスの購入 をクリックします。
ステップ 6:インスタンスの購入
購入ページで以下の設定を行います。
| パラメーター | 説明 |
|---|---|
| 課金方法 | サブスクリプション:固定期間分を前払いする方式で、長期利用の場合にコスト効率が高くなります。従量課金:1 時間単位で課金される方式で、短期利用や試用に適しています。不要になったインスタンスは速やかにリリースし、継続的な課金を回避してください。 |
| リソースグループ | このインスタンスに割り当てるリソースグループです。デフォルト:default resource group。詳細については、「What is Resource Management? |
| インスタンスクラス | 同期スループットの階層です。詳細については、「Instance classes of data synchronization instances」をご参照ください。 |
| サブスクリプション期間 | (サブスクリプション課金のみ)契約期間:1~9 か月、または 1 年、2 年、3 年、5 年から選択できます。 |
Data Transmission Service(従量課金)サービス利用規約 をお読みになり、チェックボックスをオンにして、購入して開始 > OK をクリックします。
タスクはタスクリストに表示されます。進捗状況はそこから監視できます。
増分データテーブルのスキーマ
MaxCompute プロジェクトで全表スキャンを許可するには、set odps.sql.allow.fullscan=true; を実行します。DTS は PolarDB-X 2.0 からの増分変更を MaxCompute の増分データテーブルに書き込みます。ソーステーブルの列に加えて、DTS はすべての増分データテーブルに以下のメタデータ列を追加します。

この例では、modifytime_year、modifytime_month、modifytime_day、modifytime_hour、modifytime_minuteが、ステップ 3 で設定された通りパーティションキーを構成します。
| 列 | 説明 |
|---|---|
record_id | ログエントリの固有 ID。新しいエントリごとに自動インクリメントされます。UPDATE 操作では、DTS が同じ record_id を持つ 2 つのエントリを生成します。1 つは更新前の値、もう 1 つは更新後の値です。 |
operation_flag | 操作タイプ:I(INSERT)、U(UPDATE)、D(DELETE)。 |
utc_timestamp | バイナリログから取得した操作の協定世界時(UTC)タイムスタンプです。 |
before_flag | Y、それ以外の場合は N。 |
after_flag | Y、それ以外の場合は N。 |
before_flag および after_flag の動作
before_flag および after_flag の値は、操作タイプによって異なります。
INSERT:行には新しく挿入された値が格納されます。
before_flag = N、after_flag = Y。
UPDATE:DTS は同じ
record_id、operation_flag、utc_timestampを持つ 2 つのログエントリを生成します。最初のエントリには更新前の値が格納され(before_flag = Y、after_flag = N)、2 番目のエントリには更新後の値が格納されます(before_flag = N、after_flag = Y)。
DELETE:行には削除されたレコードの値が格納されます。
before_flag = Y、after_flag = N。
全量ベースラインテーブルと増分データテーブルのマージ
同期タスクが開始されると、DTS は MaxCompute 内に全量ベースラインテーブルおよび増分データテーブルを作成します。SQL を使用して 2 つのテーブルをマージし、任意の時点における完全なデータセットを再構築します。
以下の例では、customer という名前のテーブルのデータをマージします。

ソーステーブルと同じスキーマを持つ宛先テーブルを作成し、マージ結果を格納します。以下の例では、タイムスタンプ
1565944878時点のcustomerデータを格納するテーブルを作成します。CREATE TABLE `customer_1565944878` ( `id` bigint NULL, `register_time` datetime NULL, `address` string);アドホッククエリ機能を使用して、MaxCompute で SQL ステートメントを実行します。詳細については、「SQL ステートメントを実行するためのアドホッククエリ機能の使用」をご参照ください。サポートされているデータの型については、「データの型のエディション」をご参照ください。
以下の SQL を実行して、全量ベースラインテーブルと増分データテーブルをマージします。プレースホルダーを実際の値に置き換えてください。
set odps.sql.allow.fullscan=true; insert overwrite table <result_storage_table> select <col1>, <col2>, <colN> from( select row_number() over(partition by t.<primary_key_column> order by record_id desc, after_flag desc) as row_number, record_id, operation_flag, after_flag, <col1>, <col2>, <colN> from( select incr.record_id, incr.operation_flag, incr.after_flag, incr.<col1>, incr.<col2>, incr.<colN> from <table_log> incr where utc_timestamp< <timestamp> union all select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.<col1>, base.<col2>, base.<colN> from <table_base> base) t) gt where row_number=1 and after_flag='Y'以下のプレースホルダーを置き換えます。
プレースホルダー 説明 <result_storage_table>マージされたデータを格納するテーブル <col1>、<col2>、<colN>マージ対象テーブルの列名 <primary_key_column>マージ対象テーブルのプライマリキー列 <table_log>増分データテーブルの名前 <table_base>全量ベースラインテーブルの名前 <timestamp>マージを行う時点のタイムスタンプ 以下の例では、タイムスタンプ
1565944878時点の完全なcustomerデータセットを取得します。set odps.sql.allow.fullscan=true; insert overwrite table customer_1565944878 select id, register_time, address from( select row_number() over(partition by t.id order by record_id desc, after_flag desc) as row_number, record_id, operation_flag, after_flag, id, register_time, address from( select incr.record_id, incr.operation_flag, incr.after_flag, incr.id, incr.register_time, incr.address from customer_log incr where utc_timestamp< 1565944878 union all select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.id, base.register_time, base.address from customer_base base) t) gt where gt.row_number= 1 and gt.after_flag= 'Y';customer_1565944878からマージされたデータをクエリします。