すべてのプロダクト
Search
ドキュメントセンター

DataWorks:Data Integration でのリアルタイム同期タスクの設定

最終更新日:Dec 10, 2025

DataWorks の Data Integration は、異なるデータソース間で低遅延かつ高スループットのデータレプリケーションと転送を実現するために設計された、単一テーブルのリアルタイム同期タスクを提供します。この機能は、高度なリアルタイムコンピューティングエンジンを使用して、ソースでの挿入、削除、更新などのリアルタイムのデータ変更をキャプチャし、ターゲットに迅速に適用します。このトピックでは、Kafka から MaxCompute への単一テーブルの同期を例に、単一テーブルのリアルタイム同期タスクの設定方法を説明します。

事前準備

  • データソースの準備

    • ソースとターゲットのデータソースを作成します。データソースの設定に関する詳細については、「データソース管理」をご参照ください。

    • データソースがリアルタイム同期をサポートしていることを確認します。詳細については、「サポートされているデータソースと同期ソリューション」をご参照ください。

    • Hologres や Oracle などの一部のデータソースでは、ログ記録を有効にする必要があります。ログを有効にする方法はデータソースによって異なります。詳細については、「データソースリスト」をご参照ください。

  • リソースグループServerless リソースグループを購入し、設定します。

  • ネットワーク接続:リソースグループとデータソース間のネットワーク接続を確立します。

機能へのアクセス

DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、[Data Integration] > [Data Integration] を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[Data Integration に移動] をクリックします。

タスクの設定

1. 同期タスクの作成

同期タスクは、次のいずれかの方法で作成できます:

  • 方法 1:同期タスクページで、[ソース][ターゲット] を選択し、[同期タスクの作成] をクリックします。この例では、Kafka がソースで、MaxCompute がターゲットです。必要に応じてソースとターゲットを選択できます。

  • 方法 2:同期タスクページで、タスクリストが空の場合は [作成] をクリックします。

image

2. 基本情報の設定

  1. タスク名、説明、オーナーなどの基本情報を設定します。

  2. 同期タイプを選択します。Data Integration は、ソースとターゲットのデータベースタイプに基づいて、サポートされている [同期タイプ] を表示します。このトピックでは、[単一テーブルのリアルタイム] を選択します。

  3. 同期ステップ:単一テーブルのリアルタイム同期タスクは、増分同期のみをサポートします。ステップは通常、[スキーマ移行][増分同期] です。このプロセスでは、まずソーステーブルのスキーマをターゲットに初期化します。タスクが開始されると、ソースからのデータ変更を自動的にキャプチャし、ターゲットテーブルに書き込みます。

    ソースが Hologres の場合、完全同期もサポートされます。このプロセスでは、まず既存のデータがターゲットテーブルに完全に同期されます。その後、増分データ同期が自動的に開始されます。
説明

サポートされているデータソースと同期ソリューションの詳細については、「サポートされているデータソースと同期ソリューション」をご参照ください。

3. ネットワークとリソースの設定

このステップでは、同期タスクの [リソースグループ] を選択します。また、[ソースデータソース][ターゲットデータソース] を選択します。その後、ネットワーク接続をテストします。

  • Serverless リソースグループの場合、同期タスクが使用できる計算ユニット (CU) の最大数を指定できます。同期タスクがメモリ不足 (OOM) エラーで失敗した場合は、リソースグループの CU 上限を増やしてください。

  • データソースを作成していない場合は、[データソースの追加] をクリックして作成します。詳細については、「データソースの設定」をご参照ください。

4. 同期チャネルの設定

1. ソースの設定

ページの上部で、Kafka データソースをクリックします。次に、[Kafka ソース情報] を編集します。

image

  1. [Kafka ソース情報] セクションで、Kafka データソースから同期するトピックを選択します。

    他の設定にはデフォルト値を使用するか、必要に応じて変更できます。パラメーターの詳細については、Kafka の公式ドキュメントをご参照ください。

  2. 右上隅にある [データサンプリング] をクリックします。

    表示されるダイアログボックスで、[開始時刻][サンプル数] を設定し、[サンプリング開始] をクリックします。この操作により、指定された Kafka トピックからデータがサンプリングされます。トピック内のデータをプレビューできます。このプレビューは、後続のデータ処理ノードのデータプレビューと可視化設定の入力として提供されます。

  3. [出力フィールドの設定] セクションで、必要に応じて同期するフィールドを選択します。

    デフォルトでは、Kafka は 6 つのフィールドを提供します。

    フィールド名

    説明

    __key__

    Kafka レコードのキー。

    __value__

    Kafka レコードの値。

    __partition__

    Kafka レコードが配置されているパーティション番号。パーティション番号は 0 から始まる整数です。

    __headers__

    Kafka レコードのヘッダー。

    __offset__

    パーティション内での Kafka レコードのオフセット。オフセットは 0 から始まる整数です。

    __timestamp__

    Kafka レコードの 13 桁の UNIX タイムスタンプ (ミリ秒単位)。

    後続のデータ処理ノードで、さらに多くのフィールド変換を実行することもできます。

2. データ処理ノードの編集

image アイコンをクリックして、データ処理メソッドを追加します。利用可能なメソッドは、データマスキング文字列置換データフィルタリングJSON 解析フィールドの編集と割り当て の 5 つです。これらのメソッドを目的の順序で配置できます。実行時、データ処理メソッドは指定された順序で実行されます。

image

データ処理ノードを設定した後、右上隅の [データ出力のプレビュー] をクリックできます:

  1. 入力データの下のテーブルで、前の [データサンプリング] ステップの結果を表示できます。[アップストリーム出力の再取得] をクリックして結果を更新します。

  2. アップストリームノードからの出力がない場合は、[手動でデータを構築] をクリックして前の出力をシミュレートすることもできます。

  3. [プレビュー] をクリックして、アップストリームステップからの出力データがデータ処理コンポーネントによって処理された後のデータを表示します。

image

説明

データ出力のプレビューとデータ処理機能は、Kafka ソースの [データサンプリング] に依存します。データを処理する前に、Kafka ソース設定でデータサンプリングを完了する必要があります。

3. ターゲットの設定

ページの上部で、MaxCompute データターゲットをクリックします。次に、MaxCompute ターゲット情報を編集します。

image

  1. [MaxCompute ターゲット情報] セクションで、トンネルリソースグループを選択します。デフォルトは「パブリック転送リソース」で、これは MaxCompute が提供する無料クォータです。

  2. ターゲットテーブルに [テーブルの自動作成] を行うか、[既存のテーブルを使用] するかを指定します。

    1. テーブルの自動作成を選択した場合、デフォルトでソーステーブルと同じ名前のテーブルが作成されます。ターゲットテーブル名は手動で変更できます。

    2. 既存のテーブルを使用することを選択した場合は、ドロップダウンリストからターゲットテーブルを選択します。

  3. (オプション) テーブルスキーマを編集します。

    [テーブルの自動作成] を選択した場合、[テーブルスキーマの編集] をクリックします。表示されるダイアログボックスで、ターゲットテーブルのスキーマを編集します。また、[アップストリームの出力列に基づいてテーブルスキーマを再生成] をクリックして、アップストリームノードの出力列に基づいてテーブルスキーマを自動的に生成することもできます。自動生成されたスキーマで列を選択し、それをプライマリキーとして設定できます。

  4. フィールドマッピングを設定します。

    1. システムは、[同じ名前のマッピング] の原則に基づいて、アップストリームの列をターゲットテーブルの列に自動的にマッピングします。必要に応じてマッピングを調整できます。1 つのアップストリーム列を複数のターゲット列にマッピングできますが、複数のアップストリーム列を 1 つのターゲット列にマッピングすることはできません。アップストリーム列がターゲット列にマッピングされていない場合、そのデータはターゲットテーブルに書き込まれません。

    2. Kafka フィールドについては、カスタムの JSON 解析を設定できます。データ処理コンポーネントを使用して value フィールドのコンテンツを取得します。これにより、より詳細なフィールド設定が可能になります。

      image

  5. (オプション) パーティションを設定します。

    1. [時間ベースの自動パーティション分割] は、ビジネス時間 (この場合は _timestamp フィールド) に基づいてパーティションを作成します。第 1 レベルのパーティションは年、第 2 レベルのパーティションは月、というようになります。

    2. [フィールドコンテンツによる動的パーティション分割] は、ソーステーブルのフィールドをターゲットの MaxCompute テーブルのパーティションフィールドにマッピングします。これにより、ソースフィールドに特定のデータを含む行が、MaxCompute テーブルの対応するパーティションに書き込まれるようになります。

5. その他の設定

アラート設定

タスクエラーによるデータ同期遅延を防ぐために、単一テーブルのリアルタイム同期タスクにアラートポリシーを設定できます。

  1. ページの右上隅にある [アラート設定] をクリックして、タスクのアラート設定ページを開きます。

  2. [アラートの追加] をクリックして、アラートルールを設定します。データ遅延、フェールオーバーイベント、タスクステータス、データ定義言語 (DDL) の変更、タスクリソース使用率などのメトリックを監視するアラートトリガーを設定できます。指定されたしきい値に基づいて、CRITICAL または WARNING のアラートレベルを設定できます。

  3. アラートルールを管理します。

    既存のアラートルールについては、アラートスイッチを使用して有効または無効にできます。また、アラートレベルに基づいて異なる担当者にアラートを送信することもできます。

高度なパラメーター設定

同期タスクには、詳細な設定を行うための高度なパラメーターが用意されています。システムはデフォルト値を提供しており、ほとんどの場合、変更する必要はありません。変更する場合:

  1. ページの右上隅にある [高度なパラメーター] をクリックして、高度なパラメーター設定ページを開きます。

  2. [ランタイム設定の自動構成] を false に設定します。

  3. ツールチップに基づいてパラメーター値を変更します。各パラメーターの説明は、その名前の横に表示されます。

重要

これらのパラメーターは、その目的と潜在的な影響を十分に理解した上で変更してください。誤った設定は、予期しないエラーやデータ品質の問題を引き起こす可能性があります。

リソースグループの設定

ページの右上隅にある [リソースグループの設定] をクリックして、現在のタスクで使用されているリソースグループを表示および切り替えることができます。

6. テスト実行

すべてのタスク設定が完了したら、右上隅の [テスト実行] をクリックしてタスクをデバッグします。これにより、タスク全体が少量のサンプルデータをどのように処理するかがシミュレートされます。その後、ターゲットテーブルに書き込まれる結果をプレビューできます。設定エラー、テスト実行中の例外、またはダーティデータがある場合、システムはリアルタイムでフィードバックを提供します。これにより、タスク設定の正しさと、期待される結果が得られるかどうかを迅速に評価できます。

  1. 表示されるダイアログボックスで、サンプリングパラメーター ([開始時刻][サンプル数]) を設定します。

  2. [サンプリング開始] をクリックして、サンプルデータを取得します。

  3. [プレビュー] をクリックして、タスクの実行をシミュレートし、出力を表示します。

テスト実行の出力はプレビュー専用です。ターゲットデータソースには書き込まれず、本番データに影響を与えません。

7. タスクの開始

  1. すべての設定が完了したら、ページ下部の [設定完了] をクリックします。

  2. [Data Integration] > [同期タスク] ページで、作成した同期タスクを見つけます。[操作] 列で、[公開] をクリックします。[公開後すぐに実行を開始] チェックボックスを選択すると、タスクは公開後すぐに実行されます。それ以外の場合は、タスクを手動で開始する必要があります。

    説明

    Data Integration タスクは、実行するために本番環境に公開する必要があります。したがって、新規または編集されたタスクは、[公開] 操作を実行した後にのみ有効になります。

  3. [タスクリスト] で、タスクの [名前/ID] をクリックして、詳細な実行プロセスを表示します。

次のステップ

タスクが開始された後、タスク名をクリックして実行の詳細を表示し、タスクの運用保守 (O&M) とチューニングを実行できます。

よくある質問

リアルタイム同期タスクに関するよくある質問への回答については、「リアルタイム同期に関するよくある質問」をご参照ください。

その他の例

Kafka から ApsaraDB for OceanBase への単一テーブルのリアルタイム同期

LogHub (SLS) から Data Lake Formation への単一テーブルのリアルタイム取り込み

Hologres から Doris への単一テーブルのリアルタイム同期

Hologres から Hologres への単一テーブルのリアルタイム同期

Kafka から Hologres への単一テーブルのリアルタイム同期

LogHub (SLS) から Hologres への単一テーブルのリアルタイム同期

Kafka から Hologres への単一テーブルのリアルタイム同期

Hologres から Kafka への単一テーブルのリアルタイム同期

LogHub (SLS) から MaxCompute への単一テーブルのリアルタイム同期

Kafka から OSS データレイクへの単一テーブルのリアルタイム同期

Kafka から StarRocks への単一テーブルのリアルタイム同期

Oracle から Tablestore への単一テーブルのリアルタイム同期