データベース全体のリアルタイム同期機能は、一度限りのフル同期と継続的な増分キャプチャを組み合わせて、MySQL や Oracle などのソースデータベース全体を低遅延で送信先システムに同期します。データベース全体のリアルタイム同期タスクは、まず既存データのフル同期を実行し、送信先にスキーマとデータを自動的に初期化します。その後、リアルタイム増分モードにシームレスに切り替わり、Change Data Capture (CDC) などの技術を使用して、以降のデータ変更を継続的にキャプチャおよび同期します。この機能は、リアルタイムデータウェアハウスやデータレイク構築などのユースケースに最適です。本トピックでは、MySQL データベースを MaxCompute に同期する例を用いて、タスクの設定手順を説明します。
前提条件
データソースの準備
ソースおよび送信先のデータソースを作成済みである必要があります。詳細については、「データソース管理」をご参照ください。
データソースがデータベース全体のリアルタイム同期をサポートしていることを確認してください。詳細については、「サポートされるデータソースと同期ソリューション」をご参照ください。
MySQL、Hologres、Oracle などの一部のデータソースでは、ログを有効にする必要があります。ログの有効化方法はデータソースによって異なります。詳細については、「データソース一覧」をご参照ください。
MaxCompute:Decimal データ型は MaxCompute 2.0 のみでサポートされます。同期を開始する前に、MaxCompute 2.0 データ型を有効にしてください。詳細については、「MaxCompute 2.0 データ型」をご参照ください。
リソースグループ:サーバーレスリソースグループを購入および設定します。
ネットワーク接続:リソースグループとデータソース間のネットワーク接続を設定します。
注意事項
DataWorks では、データベース同期に次の 2 種類があります。データベース全体のリアルタイム同期およびデータベース全体のフル&増分(ニアリアルタイム)。どちらのタイプも、既存データのフル同期を実行した後、自動的に増分モードに切り替わります。ただし、遅延時間と送信先テーブルの要件が異なります。
遅延時間:データベース全体のリアルタイム同期は秒~分単位の遅延を実現します。データベース全体のフル&増分(ニアリアルタイム)は T+1 の遅延を提供します。
送信先テーブル (MaxCompute):
PK Delta Table:データベース全体のリアルタイム同期のすべての機能をサポートします。
Standard Table および Append Delta Table:データベース全体のリアルタイム同期タスクで「増分のみ同期」モードを選択した場合、Append モードのみをサポートします。
データベース全体のフル&増分(ニアリアルタイム):上記のすべてのテーブルタイプをサポートします。
データベース全体のリアルタイム同期タスクは、DataStudio または Data Integration のいずれかで設定できます。両モジュールの機能は同一です。
設定の一貫性:タスク作成場所に関係なく、設定インターフェイス、パラメータ設定、基盤機能は完全に同一です。
双方向同期:Data Integration モジュールで作成されたタスクは、自動的に Data Studio モジュールの
data_integration_jobsディレクトリに同期および表示されます。これらのタスクは、ソースタイプ-送信先タイプ形式に基づいてチャンネル別に分類され、一元管理が容易になります。
タスクの設定
ステップ 1:同期タスクの作成
DataWorks コンソールにログインします。上部ナビゲーションバーで目的のリージョンを選択し、左側ナビゲーションウィンドウで を選択します。表示されたページでドロップダウンリストから目的のワークスペースを選択し、Data Integration へ移動 をクリックします。
左側ナビゲーションウィンドウで Synchronization Task をクリックします。次に、ページ上部の Create Synchronization Task をクリックし、以下のタスク情報を設定します。
Source Type:
MySQL。Destination Type:
MaxCompute。Specific Type:
データベース全体のリアルタイム同期。Synchronization Mode:
Schema Migration:データを含まない状態で、ソースと一致するテーブル、フィールド、データ型などのデータベースオブジェクトを送信先に自動作成します。
Full Synchronization(オプション):テーブルなどの指定されたソースオブジェクトからすべての既存データを一度だけ完全にコピーして送信先に転送します。通常、初期データ移行または初期化に使用されます。
Incremental Sync(オプション):フル同期完了後、ソースからのデータ変更(挿入、更新、削除)を継続的にキャプチャおよび同期します。
ステップ 2:データソースとリソースの設定
Source Data Sourceセクションで、
MySQLデータソースを選択します。Destinationセクションで、MaxComputeデータソースを選択します。Running Resourcesセクションで、同期タスクのResource Groupを選択し、タスクにResource GroupCUを割り当てます。
説明タスクリソースが不足している旨のメッセージ(例:
Please confirm whether there are enough resources...)がタスクリソースログに表示された場合、現在のリソースグループ内の利用可能なコンピューティングユニット (CU) がタスクの開始または実行に不十分であることを意味します。Configure Resource Groupパネルでタスクが使用する CU 数を増やすことで、より多くのコンピューティングリソースを割り当てることができます。推奨リソース設定については、「Data Integration の推奨 CU 数」をご参照ください。実際のワークロードに基づいて設定を調整してください。
ソースおよび送信先の両方のデータソースが接続チェックに合格していることを確認します。
ステップ 3:同期ソリューションの設定
1. データソースの設定
このステップでは、ソーステーブルリストから同期するテーブルを選択し、
アイコンをクリックして選択済みテーブルリストに移動させます。多数のテーブルがある場合は、Database FilteringまたはTable filteringを使用して正規表現を設定し、テーブルを選択できます。
同じ構造を持つ複数のシャード化されたデータベースおよびテーブルからデータを単一の送信先テーブルに書き込むには、正規表現を使用してテーブルを選択できます。

ソーステーブル設定に正規表現を入力すると、DataWorks は一致するすべてのソーステーブルを自動的に識別し、そのデータを式に対応する送信先テーブルに書き込みます。説明この方法は、シャード化されたテーブルのマージおよび同期に適しており、複数の多対一同期ルールを個別に追加する必要がなくなるため、設定効率が向上します。
2. データ送信先の設定
データベース全体のリアルタイム同期タスクでIncremental Syncのみを選択した場合、送信先テーブルへの書き込みについて増分同期モードを設定できます。
Replay:このモードは PK Delta Table のみでサポートされます。通常の同期と同様に、データフィールドのみを同期します。
Incremental stream:このモードは Standard Table および Append Delta Table でサポートされます。ソーステーブルからのリアルタイムデータに加え、挿入、更新、削除などのメタデータを送信先テーブルに追記します。増分ストリーム形式の詳細については、「付録:増分ストリームテーブル形式」をご参照ください。
3. 送信先テーブルのマッピング
このステップでは、ソーステーブルと送信先テーブルのマッピングルールを定義し、プライマリキー、動的パーティション、DDL/DML 設定のルールを指定して、データの書き込み方法を決定します。
操作 | 説明 | ||||||||||||
Refresh | システムは選択したソーステーブルを自動的に一覧表示しますが、送信先テーブルの具体的な属性は、マッピングを更新して確定した後に有効になります。
| ||||||||||||
Customize Mapping Rules for Destination Table Names(オプション) | システムにはデフォルトのテーブル名生成ルールがあります:
この機能は以下のシナリオをサポートします。
| ||||||||||||
フィールド型マッピングの編集(オプション) | システムはソースと送信先のフィールド型のデフォルトマッピングを提供します。テーブル右上隅のEdit Mapping of Field Data Typesをクリックして、マッピング関係をカスタマイズできます。設定後、Apply and Refresh Mappingをクリックします。 フィールド型マッピングを編集する際は、型変換ルールが正しいことを確認してください。そうでないと、型変換が失敗し、ダーティデータやタスク中断を引き起こす可能性があります。 | ||||||||||||
送信先テーブル構造の編集(オプション) | カスタムテーブル名マッピングルールに基づき、システムは新しい送信先テーブルを作成するか、同じ名前の既存テーブルを再利用します。 DataWorks はソーススキーマに基づいて送信先スキーマを生成します。通常、手動での介入は不要です。ただし、以下の方法でテーブルスキーマを変更することもできます。
既存テーブルの場合、フィールドの追加のみが可能です。新規テーブルの場合、フィールド、パーティションフィールドの追加、テーブルタイプまたはプロパティの設定が可能です。詳細については、UI の編集可能領域をご確認ください。 | ||||||||||||
Value assignment | ネイティブフィールドは、ソーステーブルと送信先テーブルで同じ名前のフィールドに基づいて自動的にマッピングされます。新たに追加されたフィールドおよびパーティションフィールドについては、手動で値を割り当てる必要があります。以下の操作を実行します。
Value Typeでタイプを切り替えることで、定数および変数を割り当てることができます。以下の方法がサポートされています。
説明 パーティションを過剰に作成すると、同期効率に悪影響を及ぼす可能性があります。1 日に 1,000 個を超える新規パーティションが作成された場合、パーティション作成は失敗し、タスクは終了します。したがって、パーティションフィールドの割り当て方法を定義する際は、潜在的なパーティション数を事前に見積もる必要があります。秒またはミリ秒単位でのパーティション作成は慎重に行ってください。 | ||||||||||||
ソース分割カラム | ソース分割カラムのドロップダウンリストからソーステーブルのフィールドを選択するか、Not Splitを選択します。実行中、同期タスクはこのフィールドに基づいて複数のサブタスクに分割され、同時実行によるバッチ読み取りが可能になります。 テーブルのプライマリキーをソース分割カラムとして使用することを推奨します。文字列型、浮動小数点型、日付型はサポートされていません。 現在、ソース分割カラムは MySQL ソースでのみサポートされています。 | ||||||||||||
テーブルのフル同期をスキップ | タスクステップでフル同期を設定している場合、特定のテーブルについてスキップを選択できます。これは、フルデータがすでに他の手段で送信先に同期済みである場合に便利です。 | ||||||||||||
Full condition | これにより、フル同期フェーズ中にソースデータをフィルタリングできます。 | ||||||||||||
Configure DML Rule | DML メッセージ処理により、ソースからキャプチャされたデータ変更( | ||||||||||||
その他 | Table Type:MaxCompute は Standard Table、
Delta Table の詳細については、「Delta Table」をご参照ください。 |
ステップ 4:高度な設定
高度なパラメーター
カスタム同期要件に合わせてタスク設定を微調整する必要がある場合は、Advanced Parametersタブに移動して、高度なパラメーターを変更できます。
UI の右上隅にある 高度な設定 をクリックして、高度なパラメーター設定ページに移動します。
ツールチップに従ってパラメーター値を変更します。各パラメーターの意味は、その名前の後に説明されています。
AI 搭載の設定機能も使用できます。自然言語で変更指示(例:タスクの同時実行数の調整)を入力すると、AI モデルが推奨パラメーター値を生成します。実際のニーズに基づいて、これらの値を受け入れるかどうかを選択できます。

タスク遅延、他のタスクをブロックするほどの過剰なリソース消費、データ損失などの予期しない問題を回避するため、これらのパラメーターの意味を十分に理解した上で変更してください。
DDL 設定
一部のリアルタイム同期タスクでは、ソーステーブル構造のメタデータ変更を検出し、送信先に通知できます。送信先では、この更新を同期するか、アラートの送信、変更の無視、タスクの終了などの他のアクションを実行できます。
UI の右上隅にある Configure DDL Capability をクリックして、各タイプの変更に対する処理ポリシーを設定できます。サポートされるポリシーはチャンネルによって異なります。
通常処理:送信先がソースからの DDL 変更を処理します。
無視:変更メッセージを無視し、送信先では変更を行いません。
エラー:データベース全体のリアルタイム同期タスクが終了し、ステータスが エラー に設定されます。
アラート:このタイプの変更がソースで発生した際にユーザーにアラートを送信します。Configure Alert Rule設定で DDL 通知ルールを構成する必要があります。
DDL 同期で送信先に新しいカラムが追加された場合、システムは既存の行に対してそのカラムのデータをバックフィルしません。
ステップ 5:タスクのデプロイおよび実行
タスク設定後、Saveをクリックします。
データベース全体の同期タスクは直接デバッグできません。Operation Centerにデプロイして実行する必要があります。したがって、新規タスクの作成時でも既存タスクの編集時でも、変更を有効にするにはタスクをDeployする必要があります。
タスクをデプロイする際、Start immediately after deploymentを選択すると、タスクも同時に開始されます。選択しない場合、デプロイ完了後、ページに移動し、対象タスクの操作列で手動でタスクを開始します。
Tasks内の対応するタスクのName/IDをクリックして、詳細な実行プロセスを表示します。
ステップ 6:アラートの設定
1. アラートルールの追加
リストで、対応するリアルタイムフルデータベース同期タスクを見つけ、操作列のをクリックして、タスクのアラートポリシーを設定します。

(1) Create Ruleをクリックして、アラートルールを設定します。
Alert Reasonを設定することで、Business delay、フェイルオーバー、Task status、DDL Notification、Task Resource Utilizationなどのタスクメトリックを監視できます。指定されたしきい値に基づいて、CRITICAL または WARNING のアラートレベルを設定できます。
Configure Advanced Parametersを使用することで、アラート疲弊やメッセージのバックログを防ぐために、アラートメッセージの送信間隔を制御できます。
Business delay、Task status、またはTask Resource Utilizationをアラートトリガーとして選択した場合、タスクが正常な状態に戻った際に受信者に通知する復旧通知を有効にすることもできます。
(2) アラートルールの管理
作成済みのアラートルールについては、アラートスイッチを使用して有効化または無効化できます。また、アラートレベルに基づいて異なる担当者にアラートを送信することも可能です。
2. アラートの確認
タスクのをクリックして、アラートイベントページに移動し、トリガーされたアラートに関する情報を確認できます。
タスクの管理
タスクの編集
ページで、作成した同期タスクを見つけ、Operation列のMoreをクリックし、次にEditをクリックします。タスク情報を変更できます。手順はタスク設定時と同じです。
実行中ではないタスクの場合、設定を直接変更して保存し、その後タスクをオペレーションセンターにデプロイすることで変更が有効になります。
Runningのタスクの場合、Start immediately after deploymentを選択せずにタスクを編集およびデプロイすると、元の操作ボタンがApply Updatesに変わります。このボタンをクリックすることで、オペレーションセンターで変更が有効になります。
[更新の適用] をクリックすると、システムは「停止、デプロイ、再起動」の順序で変更を適用します。
新規テーブルの追加または既存テーブルへの切り替えの場合:
更新の適用時にチェックポイントを選択できません。確認をクリックすると、システムは新規テーブルに対して構造移行およびフル同期を実行します。完了後、新規テーブルの増分同期が元のテーブルとともに開始されます。
その他の情報を変更した場合:
更新の適用時にチェックポイントを選択できます。確認をクリックすると、タスクは指定されたチェックポイントから再開します。チェックポイントを指定しない場合、最後に停止した時点から再開します。
変更されていないテーブルは影響を受けません。更新および再起動後、最後に停止した時点から再開します。
タスクの確認
同期タスクを作成後、同期タスクページで作成済みタスクの一覧および基本情報を確認できます。

操作列で、同期タスクをStartまたは停止できます。その他メニューで、編集やViewなどの他の操作を実行できます。
開始済みのタスクについては、Execution Overviewで基本的な実行ステータスを確認できます。また、対応する概要エリアをクリックして、実行の詳細を表示することもできます。

ブレークポイントからの再開
ユースケース
チェックポイントを手動でリセットするのは、以下のシナリオで役立ちます。
タスク復旧およびデータ継続:タスクが中断された場合、中断時間を新しい開始チェックポイントとして手動で指定し、データ復旧の精度を確保する必要があります。
データ問題のトラブルシューティングおよびバックトラッキング:データが失われた、または異常が発生した場合、問題が発生する前の時点までチェックポイントをロールバックして、データをリプレイおよび修復します。
タスク設定の大幅な変更:送信先テーブル構造やフィールドマッピングなど、タスク設定を大幅に調整した後は、新しい設定でデータの精度を確保するために、特定の時点から同期を開始するようチェックポイントをリセットすることを推奨します。
手順
Startをクリックします。表示されるダイアログボックスで、Whether to reset the siteを選択できます。

最後に停止した時点から再開するには、リセットオプションをオフにしてタスクを直接実行します。タスクは最後に記録されたチェックポイントから継続します。
特定の時点から開始するには、リセットオプションをオンにして時間を選択します。タスクは指定された時間のチェックポイントから開始します。選択した時間がソースの binlog 保持期間内であることを確認してください。
同期タスクの実行時に「チェックポイントエラー」または「チェックポイントが存在しません」というメッセージが表示された場合は、以下の解決策をお試しください。
チェックポイントのリセット:リアルタイム同期タスクを開始する際に、チェックポイントをリセットし、ソースデータベースで最も早い利用可能なチェックポイントを選択します。
ログ保持期間の調整:データベースのチェックポイントが期限切れになっている場合、データベースのログ保持期間を 7 日間などに延長することを検討してください。
データ同期:データが失われている場合、再度フル同期を実行するか、オフライン同期タスクを作成して手動で不足しているデータを同期します。
よくある質問
データベース全体のリアルタイム同期に関するよくある質問については、「リアルタイム同期に関するよくある質問」および「フルおよび増分同期タスクに関するよくある質問」をご参照ください。
付録:増分ストリーム形式
フラット化されたソーステーブルフィールド
フィールド名 | 説明 |
sequence_id | 増分イベントのレコード ID です。値は一意かつ増分的です。 |
operation_type | 操作タイプ (I/D/U) です。 |
execute_time | データに対応するタイムスタンプです。 |
before_image | 変更前のイメージかどうかを示します (Y/N)。 |
after_image | 変更後のイメージかどうかを示します (Y/N)。 |
src_datasource | ソースデータソースです。 |
src_database | ソースデータベースです。 |
src_table | ソーステーブルです。 |
フィールド 1 | 実際のデータフィールド 1 です。 |
フィールド 2 | 実際のデータフィールド 2 です。 |
フィールド 3 | 実際のデータフィールド 3 です。 |
JSON にマージされたソースフィールド
フィールド名 | 説明 |
sequence_id | 増分イベントのレコード ID です。値は一意かつ増分的です。 |
operation_type | 操作タイプです。 |
execute_time | データに対応するタイムスタンプです。 |
before_image | 変更前のイメージかどうかを示します (Y/N)。 |
after_image | 変更後のイメージかどうかを示します (Y/N)。 |
src_datasource | ソースデータソースです。 |
src_database | ソースデータベースです。 |
src_table | ソーステーブルです。 |
ddl_sql | DDL 操作の場合、このフィールドには DDL 文が含まれます。 |
data_columns | 実際のデータフィールドが JSON オブジェクトにマージされます。 |
アイコンをクリックしてManually enterまたはBuilt-in Variableを選択し、名前の一部を連結することで送信先テーブル名を生成できます。変数には、ソースデータソース名、ソースデータベース名、ソーステーブル名が含まれます。





ボタンをクリックします。Target Table 列でフィールドを追加します。
ツールチップで確認できます。