Data Integration は、ApsaraDB for OceanBase、MySQL、Oracle、PolarDB などのソースから MaxCompute への完全同期と増分同期 (ほぼリアルタイム) をサポートしています。このソリューションは、完全なデータ移行とリアルタイムの増分同期を統合し、T+1 ベースで宛先のデータをマージします。このトピックでは、MySQL をソース、MaxCompute を宛先として使用して、完全同期と増分同期 (ほぼリアルタイム) タスクを作成する方法について説明します。
仕組み
完全同期と増分同期タスクは、初回の完全なデータ移行と継続的な増分同期のために統一されたプロセスを採用しています。タスクが開始されると、システムは自動的にバッチサブタスクとリアルタイムサブタスクを作成および調整し、データをターゲットテーブル (Base テーブル) にマージして書き込みます。
主要なプロセスは、次の 3 つのフェーズで構成されます。
完全初期化:起動時に、バッチ同期サブタスクがすべてのソーステーブルからテーブルスキーマと既存データを MaxCompute のターゲット Base テーブルに移行します。完全なデータ初期化が完了すると、このバッチ同期タスクは凍結されます。
増分データ同期:完全移行が完了した後、リアルタイム同期サブタスクがソースデータベースからの増分変更 (Insert、Update、Delete) を (例えば、MySQL のバイナリログ経由で) 継続的にキャプチャし、ほぼリアルタイムで MaxCompute の一時的な Log テーブルに書き込みます。
定期的なマージ (Merge):毎日 (T+1) のマージタスクは、前日 (T) に Log テーブルに蓄積された増分データを Base テーブルの完全データと結合します。これにより、T 日の最新の完全スナップショットデータが生成され、Base テーブルの新しいパーティションに書き込まれます。マージタスクは 1 日に 1 回実行されます。
パーティションテーブルを例にとると、データフローは次のようになります。
この同期タスクには、次の特徴があります。
複数テーブルから複数テーブル/単一テーブルへ:マッピングルールを使用して、複数のソーステーブルを対応するターゲットテーブルに同期したり、複数のソーステーブルから単一のターゲットテーブルにデータをマージしたりすることをサポートします。
タスクコンポーネント:データベース全体の同期タスクは、完全初期化のためのバッチ同期サブタスク、増分同期のためのリアルタイム同期サブタスク、およびデータを統合するためのマージタスクで構成されます。
ターゲットテーブルのサポート:MaxCompute のパーティションテーブルと非パーティションテーブルの両方へのデータ書き込みをサポートします。
注意事項
リソース要件:タスクには、サーバーレスリソースグループまたはデータ統合専用リソースグループが必要です。インスタンスモードで同期する場合、データ統合専用リソースグループの最小リソース仕様は 8 vCPU と 16 GB、サーバーレスリソースグループの場合は 2 CU です。
ネットワーク接続:Data Integration リソースグループと、ソース (例:MySQL) およびターゲット (例:MaxCompute) の両方のデータソースとの間のネットワーク接続を確保してください。詳細については、「ネットワーク接続ソリューションの概要」をご参照ください。
リージョン制限:同期は、現在の DataWorks ワークスペースと同じリージョンにある自己管理の MaxCompute データソースに対してのみサポートされます。自己管理の MaxCompute データソースを使用する場合、DataWorks の DataStudio で MaxCompute コンピューティングリソースをバインドする必要があります。そうしないと、MaxCompute SQL ノードを作成できず、完全同期の「完了」ノードの作成が失敗します。
スケジューリングリソースグループの制限:バッチ完全同期サブタスクには、設定済みの [リソースグループ] が必要です。スケジューリング用の共有リソースグループはサポートされていません。
ターゲットテーブルタイプの制限:MaxCompute 外部テーブルへの同期はサポートされていません。
留意事項
プライマリキー要件:プライマリキーのないテーブルはサポートされていません。設定中に 1 つ以上の列をビジネスプライマリキー ([カスタムプライマリキー]) として手動で指定する必要があります。
AccessKey (AK) の有効性:同期に一時的な AccessKey (AK) を使用すると、AK が期限切れ (7 日後に自動的に失効) になったときにタスクは失敗します。プラットフォームは、一時的な AK が原因で障害が検出されると、タスクを自動的に再起動します。このタスクタイプに監視が設定されている場合、アラートが届きます。
データ可視性の遅延:設定した日には、MaxCompute への完全同期と増分同期 (ほぼリアルタイム) タスクの既存の完全データのみをクエリできます。増分データは、翌日にマージタスクが完了した後に MaxCompute で利用可能になります。詳細については、「仕組み」のデータ書き込みセクションをご参照ください。
ストレージとライフサイクル:完全同期と増分同期 (ほぼリアルタイム) タスクは、毎日完全なパーティションを生成します。ストレージコストを管理するため、タスクによって自動的に作成される MaxCompute テーブルのデフォルトのライフサイクルは 30 日です。これがビジネス要件を満たさない場合は、タスク設定中に対応する MaxCompute テーブル名をクリックしてライフサイクルを変更できます。詳細については、「ターゲットテーブルスキーマの編集 (オプション)」をご参照ください。
SLA:Data Integration は、データのアップロードとダウンロードに MaxCompute エンジンの同期データトンネルを使用します (SLA の詳細については、「データアップロードのシナリオとツール」をご参照ください)。MaxCompute エンジンの同期データトンネルの SLA に基づいて技術的な選択を評価してください。
Binlog 保持ポリシー:リアルタイム同期は、ソース MySQL データベースのバイナリログ (binlog) に依存します。長時間の停止や障害リトライ中に開始オフセットが見つからないことによる同期の中断を防ぐため、binlog の保持期間が十分であることを確認してください。
課金
完全同期と増分同期タスクは、完全フェーズのバッチ同期タスク、増分フェーズのリアルタイム同期タスク、および定期マージフェーズの定期タスクで構成されます。課金は、これら 3 つのフェーズそれぞれに個別に適用されます。3 つのフェーズすべてがリソースグループの CU を消費します (「サーバーレスリソースグループの課金」をご参照ください)。定期タスクには、タスクスケジューリング料金も発生します (「スケジューリングインスタンス料金」をご参照ください)。
さらに、MaxCompute への完全同期と増分同期リンクは、完全データと増分データの定期的なマージのために MaxCompute コンピューティングリソースを消費します。これらの料金は、同期された完全データのサイズとマージサイクルに基づいて MaxCompute によって直接請求されます。具体的な料金については、「課金項目と課金方法」をご参照ください。
操作手順
ステップ 1:同期タイプの選択
Data Integration ページに移動します。
DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[Data Integration へ移動] をクリックします。
左側のナビゲーションウィンドウで、Synchronization Task をクリックします。次に、ページの上部にある Create Synchronization Task をクリックします。[同期タスクの作成] ダイアログボックスで、次の主要な情報を設定します。
Source Type:
MySQLDestination Type:
MaxCompute。Name:カスタムの同期タスク名を入力します。
Task Type:
全ウェアハウスの完全増分Sync Procedure:[構造移行]、[増分同期]、[完全初期化]、および [サイクルマージ]
ステップ 2:ネットワーク接続の設定
[ネットワークとリソースの構成] セクションで、同期タスクに使用する [リソースグループ] を選択します。[タスクリソース使用量] セクションで、完全同期と増分同期に CU を個別に割り当てて、リソース使用率を最適化できます。
[ソース] には追加した
MySQLデータソースを、[デスティネーション] には追加したMaxComputeデータソースを選択し、[接続テスト] をクリックします。
ソースとターゲットの両方のデータソースで接続が成功したことを確認し、[次へ] をクリックします。
ステップ 3:テーブルの選択
[ソーステーブル] エリアで、ソースデータソースから同期するテーブルを選択します。
アイコンをクリックして、テーブルを [選択したテーブル] リストに移動します。

テーブルの数が多い場合は、[データベースフィルタリング] または [テーブルの検索] を使用して、正規表現でテーブルを選択します。
ステップ 4:タスク設定の構成
Log テーブルの時間範囲:このパラメーターは、Log テーブルからターゲットパーティションにデータをマージする際のクエリ時間範囲を定義します。
この範囲を適切に拡張して、パーティションに属するすべてのデータが正しくマージされるようにし、データ遅延による日をまたぐパーティションエラーを防ぎます。
マージタスクのスケジューリング:毎日のマージタスクのスケジューリング時間を設定します。スケジューリング時間の設定の詳細については、「スケジュール時間」をご参照ください。
定期スケジューリングパラメーター:スケジューリングパラメーターを設定します。これらのパラメーターは、後でパーティション設定でパーティションに値を割り当て、日付ごとにパーティションに書き込む要件を満たすために使用できます。
テーブルパーティション設定:パーティション列名や割り当て方法など、ターゲットテーブルのパーティションを設定します。割り当て列は、スケジューリングパラメーターを使用して、日付ごとにパーティションを自動的に生成できます。
ステップ 5:ターゲットテーブルのマッピング
このステップでは、ソーステーブルとターゲットテーブル間のマッピングルールを定義し、プライマリキー、動的パーティション、DDL/DML 構成などのルールを指定して、データの書き込み方法を決定します。
操作 | 説明 | ||||||||||||
マッピング結果の更新 | システムは選択されたソーステーブルをリスト表示します。ターゲットテーブルの属性は、更新して確認した後にのみ有効になります。
| ||||||||||||
[宛先テーブル名のマッピングルールのカスタマイズ] (オプション) | システムはデフォルトのテーブル名生成ルール
次の操作が可能です。
| ||||||||||||
フィールドデータ型のマッピングの編集 (オプション) | システムは、ソースとターゲットのフィールドタイプのデフォルトマッピングを提供します。テーブルの右上隅にある [フィールドデータ型のマッピングの編集] をクリックして、ソーステーブルとターゲットテーブル間のフィールドタイプのマッピング関係をカスタマイズし、[適用してマッピングを更新] をクリックします。 フィールドタイプのマッピングを編集する際は、フィールドタイプの変換ルールが正しいことを確認してください。そうしないと、タイプ変換の失敗が発生し、ダーティデータが発生してタスクの実行に影響を与える可能性があります。 | ||||||||||||
宛先テーブル構造の編集 (オプション) | システムは、カスタムテーブル名マッピングルールに基づいて、存在しないターゲットテーブルを自動的に作成するか、同じ名前の既存のテーブルを再利用します。 DataWorks は、ソーステーブルスキーマに基づいてターゲットテーブルスキーマを自動的に生成します。標準的なシナリオでは、手動での介入は必要ありません。次の方法でテーブルスキーマを変更することもできます。
既存のテーブルの場合、フィールドの追加のみが可能です。新しいテーブルの場合、フィールド、パーティションフィールドの追加、およびテーブルタイプやプロパティの設定ができます。詳細については、インターフェイスの編集エリアをご参照ください。 | ||||||||||||
宛先テーブル列の割り当て | ネイティブフィールドは、ソーステーブルとターゲットテーブルで一致するフィールド名に基づいて自動的にマッピングされます。前のステップの [追加されたフィールド] は手動で割り当てる必要があります。手順は次のとおりです。
定数または変数を割り当てることができます。[割り当て方法] でタイプを切り替えます。サポートされている方法は次のとおりです。 テーブルフィールド
| ||||||||||||
ソース分割PK | ソース分割キーのドロップダウンリストからソーステーブルのフィールドを選択するか、[分割なし] を選択できます。同期中、タスクはこのフィールドに基づいて複数のサブタスクに分割され、同時かつバッチでのデータ読み取りが可能になります。 プライマリキーまたはデータ分布が均等なフィールドをソース分割キーとして使用することを推奨します。文字列、浮動小数点、日付型はサポートされていません。 現在、ソース分割キーは MySQL ソースでのみサポートされています。 | ||||||||||||
完全同期の実行 | ステップ 3 で完全同期が設定されている場合、特定のテーブルに対して個別に完全データ同期の選択を解除できます。これは、完全データが他の方法ですでにターゲットに同期されているシナリオに適用されます。 | ||||||||||||
完全同期条件 | 完全同期フェーズ中にソースデータをフィルタリングします。WHERE キーワードを除いた WHERE 句を入力します。 | ||||||||||||
[DML ルール] | DML メッセージ処理は、ソースからキャプチャされた変更データ ( | ||||||||||||
フルデータマージサイクル | 現在、日次マージのみをサポートしています。 | ||||||||||||
プライマリキーのマージ | テーブルから 1 つ以上の列を選択してプライマリキーを定義できます。
|
ステップ 6:DDL 機能の設定
特定のリアルタイム同期タスクは、ソーステーブル構造のメタデータ変更を検出し、更新を同期したり、アラート、無視、または実行の終了などの他のアクションを実行したりします。
インターフェイスの右上隅にある [DDL 機能の設定] をクリックして、各変更タイプの処理ポリシーを設定します。サポートされているポリシーはチャネルによって異なります。
通常処理:宛先はソースからの DDL 変更情報を処理します。
無視:変更メッセージは無視され、宛先では変更は行われません。
エラー:データベース全体のリアルタイム同期タスクが終了し、ステータスが [エラー] に設定されます。
アラート:ソースでこのような変更が発生した場合、ユーザーにアラートが送信されます。Configure Alert Rule で DDL 通知ルールを設定する必要があります。
DDL 同期によってソース列が宛先に追加された場合、既存のレコードには新しい列のデータがバックフィルされません。
ステップ 7:その他の設定
アラーム設定
1. アラームの追加

(1) Create Rule をクリックしてアラームルールを設定します。
Alert Reason を設定して、Business delay、[フェイルオーバー]、Task status、DDL Notification、Task Resource Utilization などのタスクのメトリックを監視します。指定されたしきい値に基づいて、CRITICAL または WARNING のアラームレベルを設定できます。
Configure Advanced Parameters を設定することで、アラームメッセージの送信間隔を制御し、アラート疲れやメッセージの滞留を防ぐことができます。
アラーム理由として Business delay、Task status、または Task Resource Utilization を選択した場合、タスクが正常に戻ったときに受信者に通知する回復通知を有効にすることもできます。
(2) アラームルールの管理
作成したアラームルールについては、アラームスイッチを使用してアラームルールが有効かどうかを制御できます。アラームレベルに基づいて、特定のアラーム受信者にアラームを送信します。
2. アラームの表示
タスクリストの を展開してアラームイベントページに入り、発生したアラーム情報を表示します。
リソースグループの設定
インターフェイスの右上隅にある [リソースグループの設定] パネルで、タスクが使用するリソースグループとその設定を管理できます。
1. リソースグループの表示と切り替え
[リソースグループの設定] をクリックして、現在タスクにバインドされているリソースグループを表示します。
リソースグループを変更するには、ここで別の利用可能なリソースグループに切り替えます。
2. リソースの調整と「リソース不足」エラーのトラブルシューティング
タスクログに
Please confirm whether there are enough resources...のようなメッセージが表示された場合、現在のリソースグループの利用可能なコンピューティングユニット (CU) がタスクの開始または実行に不足しています。[リソースグループの設定] パネルでタスクが占有する CU 数を増やして、より多くのコンピューティングリソースを割り当てることができます。
推奨されるリソース設定については、「Data Integration の推奨 CU」をご参照ください。実際状況に基づいて設定を調整してください。
DataWorks のバッチ同期タスクは、スケジューリングリソースグループによって Data Integration タスク実行リソースグループにディスパッチされて実行されます。したがって、バッチ同期タスクは、Data Integration タスク実行リソースグループを使用するだけでなく、スケジューリングリソースグループのリソースも消費し、スケジューリングインスタンス料金が発生します。
詳細パラメーター設定
カスタム同期要件については、[詳細設定] 列の [設定] をクリックして詳細パラメーターを変更します。
インターフェイスの右上隅にある [詳細設定] をクリックして、詳細パラメーター設定ページに入ります。
プロンプトに従ってパラメーター値を変更します。各パラメーターの意味は、パラメーター名の後に説明されています。
タスクの遅延、他のタスクをブロックする過剰なリソース消費、またはデータ損失などの問題を避けるため、変更前にパラメーターを十分に理解してください。
ステップ 8:同期タスクの実行
設定が完了したら、[保存] または Complete をクリックしてタスクを保存します。
で、作成した同期タスクを見つけ、Operation 列の Deploy をクリックします。表示されるダイアログボックスで [デプロイ後すぐに開始] を選択し、[確認] をクリックすると、タスクはすぐに実行されます。そうでない場合は、手動でタスクを開始する必要があります。
説明Data Integration タスクは、実行する前に本番環境にデプロイする必要があります。したがって、変更を有効にするには、新しいタスクまたは変更されたタスクをデプロイする必要があります。
Tasks のタスクの Name/ID をクリックして、実行の詳細を表示します。
次のステップ
タスクを設定した後、タスクの管理、テーブルの追加または削除、監視とアラートの設定、および主要なタスク実行メトリックの表示ができます。詳細については、「完全同期と増分同期タスクの運用保守」をご参照ください。
よくある質問
Q:Base テーブルのデータが期待どおりに更新されないのはなぜですか?
A:次の原因と解決策をご参照ください。
症状 | 原因 | 解決策 |
増分 Log テーブルの T-1 パーティションデータの検証に失敗しました。 | リアルタイム同期タスクで例外が発生し、増分 Log テーブルの T-1 パーティションデータが正しく生成されませんでした。 |
|
ターゲット Base テーブルの T-2 パーティションデータの検証に失敗しました。 |
|
|
ボタンをクリックし、[手動入力] または [組み込み変数] を選択して連結することで、ターゲットテーブル名を生成できます。サポートされている変数には、ソースデータソース名、ソースデータベース名、ソーステーブル名が含まれます。





ボタンをクリックして、フィールドを追加します。
ツールチップで特定の変数の意味を確認できます。