すべてのプロダクト
Search
ドキュメントセンター

DataWorks:増分データのバッチ同期ノードの設定

最終更新日:Jun 18, 2026

バッチ同期ノードでフィルター条件を使用することで、全量データまたは増分データのいずれかを同期できます。フィルター条件を使用すると、Data Integration は指定された条件を満たすデータのみを同期します。また、フィルター条件とスケジューリングパラメーターを組み合わせることで、ノードのランタイムに基づいてデータを動的にフィルタリングし、増分同期を実現できます。このトピックでは、増分同期のためのバッチ同期ノードの設定方法について説明します。

使用上の注意

  • HBase や OTSStream など、一部のデータソースでは増分同期がサポートされていません。特定のデータソースで増分同期がサポートされているかどうかを確認するには、対応する Reader プラグインのドキュメントをご参照ください。

  • 増分同期に必要なパラメーターは、Reader プラグインによって異なります。詳細については、特定のプラグインのドキュメントおよびサポートされているデータソースとプラグインをご参照ください。例:

    Reader プラグイン

    必須パラメーター

    サポートされている構文

    MySQL Reader

    where

    説明

    コードレス UI では、これはフィルター条件パラメーターです。

    データベース構文

    説明

    このパラメーターをスケジューリングパラメーターと組み合わせて使用することで、毎日指定された時間範囲のデータを読み取ることができます。

    MongoDB Reader

    query

    説明

    コードレス UI では、これは検索条件パラメーターです。

    データベース構文に類似

    説明

    このパラメーターをスケジューリングパラメーターと組み合わせて使用することで、毎日指定された時間範囲のデータを読み取ることができます。

    OSS Reader

    Object

    オブジェクトパスを指定

    説明

    このパラメーターをスケジューリングパラメーターと組み合わせて使用することで、毎日指定されたファイルからデータを読み取ることができます。

    ...

    ...

    ...

増分同期の設定

Data Integration バッチ同期ノードでは、スケジューリングパラメーターを使用して、ソースと宛先のテーブルのデータパスと範囲を指定できます。設定方法は他のノードタイプと同じです。

ランタイム時、システムはプレースホルダーパラメーターをスケジューリングパラメーター式の実際の値に置き換えます。

例えば、MySQL データソースからデータを同期する場合:

  • Data Filtering を設定しない場合、ノードはデフォルトで全量データを宛先テーブルに同期します。

  • Data Filtering を設定した場合、ノードは条件を満たすデータのみを宛先テーブルに同期します。

宛先の MaxCompute テーブルのパーティション名は、スケジューリングパラメーターを使用して指定します。$bizdate パラメーターはデータタイムスタンプを表します。スケジュールされたタスクが実行されると、タスク設定内のパーティション式はデータタイムスタンプに置き換えられます。スケジューリングパラメーター式の設定方法の詳細については、「スケジューリングパラメーターの設定と使用」をご参照ください。バッチ同期ノードで増分同期を実装するには、3 つの場所で bizdate パラメーターを設定する必要があります。ソースの [データフィルタリング] フィールドに、 STR_TO_DATE('${bizdate}','%Y%m%d') <= gmt_modify_time AND gmt_modify_time < DATE_ADD(STR_TO_DATE('${bizdate}','%Y%m%d'), interval 1 day) と入力し、データタイムスタンプに基づいて当日に変更されたデータをフィルタリングします。宛先の [パーティション情報] フィールドに、 pt=${bizdate} と入力して対応する日付パーティションにデータを書き込み、[クリーンアップルール][Insert Overwrite] に設定します。右側の [スケジューリング] ペインの [パラメーター] フィールドに、 bizdate=$bizdate と入力します。これにより、スケジューリングシステムは実行時に ${bizdate} を実際のデータタイムスタンプに自動的に置き換えます。増分データ同期を設定する場合:

  • 時刻または日付データ型の増分フィールドの場合、スケジューリングパラメーターを使用して値を動的に置換できます。ランタイム時、システムはデータタイムスタンプに基づいてパラメーターを特定の値に自動的に置き換えます。詳細については、「スケジューリングパラメーターの形式」をご参照ください。

  • 時間ベースでない増分フィールドの場合、代入ノードを使用してフィールドをターゲットデータ型に変換し、Data Integration に渡してデータ同期を行うことができます。詳細については、「代入ノード」をご参照ください。

注意事項

増分同期タスクを設定する際は、以下の点にご注意ください:

  • Insert Overwrite の安全性[Insert Overwrite] モードの使用は安全です。このモードは、現在のタスクで指定されたパーティションのみをクリアします。他のパーティションのデータには影響せず、データの競合や誤削除が発生しません。

  • パーティション範囲の一括上書きに関する制約: DataWorks では、一括上書き操作のパーティション設定で、hh=00-23 のような時間範囲の値を設定することはサポートされていません。パーティションパラメーターは単一の値またはワイルドカード文字 * のみをサポートするため、複数時間にわたるデータを上書きするには、個別のタスクを設定して実行する必要があります。

  • ワイルドカード構文形式: ソースには時間単位のパーティションがあるものの、宛先は日単位でしかパーティション化されていないシナリオでは、すべての時間単位のデータと一致させるために、時間パーティションフィールドにワイルドカード文字 * を入力する必要があります。 * を直接入力してください。 "*" のように引用符を追加すると構文エラーが発生するため、追加しないでください。

高頻度、タイムスタンプベースの増分同期

DataWorks は、5 分ごとや 1 時間ごとなどの定期スケジューリングと組み合わせたバッチ同期ノードを通じて、タイムスタンプベースの増分同期をサポートしています。この方法は、ApsaraDB RDS for MySQL などのソースから SelectDB や StarRocks などの宛先への T+1 または準リアルタイム同期シナリオに適しています。このアプローチは、増分ロードに SQL フィルタリングを使用し、継続的なリアルタイム CDC タスクの実行コストを回避します。主な設定ポイント:

  1. ソースの Data Filtering セクションの where 句で、フィルタリング用の変数としてタイムスタンプフィールドを使用します。 例: gmt_modify_time >= '$[yyyymmddhhmiss-10/mi]' AND gmt_modify_time < '$[yyyymmddhhmiss]'

  2. 時間範囲を動的に計算するには、$[yyyymmddhhmiss] などの定期スケジューリングパラメーターを設定します。 これにより、スケジュールされた各実行で、指定された間隔内の増分データのみが同期されるようになります。 スケジューリングパラメーターの設定の詳細については、「データ統合におけるスケジューリングパラメーターの使用例」をご参照ください。

  3. 宛先フィールドマッピングで、パーティションフィールドにマッピングされた定数パラメーターを手動で追加して、動的なパーティション書き込みを有効にします。

データベース全体のバッチ同期の増分設定

単一テーブルの同期タスクに加えて、データベース全体のバッチ同期タスクを作成して 定期的な増分同期 を実現できます。 タスクを作成する際に、増分同期を選択し、タスク設定で 増分条件 を設定します。 これにより、たとえば、 create_time  フィールドに基づいてフィルタリングすることで、効率的な毎日の増分パーティション同期が可能になります。

この方法は、複数のテーブルの同期を一元管理しながら、特定のテーブルのみで増分更新を実行したいシナリオに適しています。データベース全体のバッチ同期タスクの完全な設定プロセスについては、「データベース全体のバッチ同期タスクの設定」をご参照ください。

ユースケース

  • 履歴データの同期: 履歴増分データを宛先テーブルの対応する時間ベースのパーティションに同期するには、オペレーションセンターのデータバックフィル機能を使用できます。 この機能の使用方法の詳細については、「データバックフィルインスタンスの管理」をご参照ください。 ノード設定で、データソースとして MySQL を、宛先として MaxCompute を選択し、テーブル名として czd などを指定します。 データフィルタリング条件で、${bizdate} を使用して増分範囲を制御します。たとえば、STR_TO_DATE('${bizdate}','%Y%m%d') <= gmt_modify_time のように指定します。 パーティション情報を ds=${bizdate} に設定し、クリーンアップルールとして[Insert Overwrite]を選択します。 スケジュール設定のパラメーターセクションで、bizdate=$bizdate を定義します。 データバックフィルシナリオでは、スケジューリングシステムがこのパラメーターをデータタイムスタンプの対応する日付に自動的に置き換えます。 データバックフィルを実行するときに、複数のデータタイムスタンプ範囲 (たとえば、2022-05-01 から 2022-05-31 まで、および 2022-04-01 から 2022-04-30 まで) を設定できます。 [現在時刻より後のスケジュール時刻でバックフィルインスタンスをすぐに実行] を選択し、実行順序として [データタイムスタンプの昇順] を選択します。

  • ApsaraDB RDS から MaxCompute への増分データ同期

よくある質問

MaxCompute Reader: 「partition does not exist」エラー

原因: 設定されたスケジューリングパラメーターが実行時に実際のパーティション値に正しく解析されなかったか、解析された値がソーステーブルの実際のパーティションと一致しません。

解決策: パーティション値が上流ノードの outputs パラメーターから渡される場合、DataStudio のパラメーター設定を確認し、以下の点を確認してください。

  • 設定されたパラメーター名が入力パラメーター名と一致していることを確認してください。

  • 渡されたパラメーター値が MaxCompute の既存のパーティションと正確に一致していることを確認してください。

デフォルトの同期動作と非パーティションテーブル

デフォルトの動作

デフォルトでは、DataWorks Data Integration バッチ同期タスクは全量同期を実行します。増分同期を実現するには、Data Filtering 条件を設定し、スケジューリングパラメーターと組み合わせる必要があります。

パーティションフィールドを持たないソーステーブルの処理

ApsaraDB RDS のテーブルなどのソースデータベーステーブルに、時刻フィールドまたはパーティションフィールドがない場合、WHERE 句を使用して増分データを直接フィルターすることはできません。ソーステーブルに dtgmt_modify_time などの時刻フィールドを追加し、それを増分フィルタリングの基準として使用することをお勧めします。フィールドを準備した後、本ドキュメントの「増分同期の設定」セクションを参照して、増分同期ロジックを設定してください。

splitPk と splitFactor: 目的と影響

パラメーターの定義

  • splitPk (分割プライマリキー): プライマリキー列を指定します。 DataWorks は、このフィールドの値に基づいてデータを複数の範囲に分割し、マルチスレッドの並列読み取りを可能にします。

  • splitFactor: 分割の粒度を制御します。値が大きいほど、分割が細かくなり、読み取りスレッドが多くなります。

パフォーマンスへの影響と推奨事項

splitPksplitFactor を有効にすると、ソースデータベースの負荷が増加します。この負荷を軽減するには、次のことをお勧めします。

  • 同時実行数を減らすか、データベース全体のバッチタスクの同時実行数を 1 に設定してください。

  • 読み取り効率を向上させるために、分割キー (splitPk) にインデックスを作成してください。