すべてのプロダクト
Search
ドキュメントセンター

DataWorks:単一テーブルのリアルタイム同期タスクの構成

最終更新日:Jun 22, 2026

DataWorks の Data Integration は、さまざまなデータソース間で低レイテンシー、高スループットのデータレプリケーションと転送を実現する単一テーブルのリアルタイム同期タスクを提供します。この機能は、高度なリアルタイムコンピューティングエンジンを使用して、ソースでのデータ変更 (挿入、削除、更新) をキャプチャし、送信先に適用します。このトピックでは、Kafka から MaxCompute への単一テーブルのリアルタイム同期を例に、タスクの構成方法を説明します。

事前準備

  • データソースの準備

    • ソースと送信先のデータソースを作成します。データソースの構成の詳細については、「データソース管理」をご参照ください。

    • データソースがリアルタイム同期をサポートしていることを確認してください。詳細については、「サポートされているデータソースと同期ソリューション」をご参照ください。

    • Hologres や Oracle などの一部のデータソースでは、ロギングを有効にする必要があります。ログを有効にする方法はデータソースによって異なります。詳細については、「データソース一覧」をご参照ください。

  • リソースグループサーバーレスリソースグループを購入して構成します。

  • ネットワーク接続:リソースグループとデータソース間のネットワーク接続を確立します。

ステップ1:同期タスクの作成

  1. DataWorks コンソールにログインします。 左側のナビゲーションウィンドウで、[Data Integration] をクリックします。 表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[Data Integrationに移動] をクリックします。

  2. 左側メニューで Synchronization Task をクリックします。ページ上部で Create Synchronization Task をクリックし、タスク情報を構成します。この例では、Kafka から MaxCompute へのリアルタイム同期を使用します。

    • Source TypeKafka

    • Destination TypeMaxCompute

    • Specific Type単一テーブルリアルタイム

    • Synchronization Mode

      • Schema Migration:ソースに一致するテーブル、フィールド、データ型などのデータベースオブジェクトを送信先に自動的に作成します。このステップにはデータは含まれません。

      • Incremental Sync (オプション):完全同期が完了した後、このステップではソースからのデータ変更 (挿入、更新、削除) を継続的にキャプチャし、これらの変更を送信先に同期します。

        ソースが Hologres の場合、完全同期もサポートされます。このプロセスでは、まず既存のすべてのデータを送信先テーブルに同期します。その後、増分データ同期が自動的に開始されます。
説明

サポートされているデータソースと同期ソリューションの詳細については、「サポートされているデータソースと同期ソリューション」をご参照ください。

ステップ2:データソースと実行リソースの構成

  1. [Source Information] で、Kafka データソースを選択します。[Destination] で、MaxCompute データソースを選択します。

  2. Running Resources セクションで、同期タスクの Resource Group を選択し、Resource GroupCU をタスクに割り当てます。フル同期と増分同期に対して CU を個別に設定することで、リソースを正確に制御し、浪費を防ぐことができます。同期タスクがメモリ不足 (OOM) エラーで失敗した場合は、リソースグループの CU 値を増やします。

  3. ソースと送信先の両方のデータソースが Connectivity Check に合格することを確認してください。

ステップ3:同期ソリューションの構成

1. ソースの構成

  1. [Configuration] タブで、同期する Kafka トピックを選択します。

    他の構成にはデフォルト値を使用するか、必要に応じて変更できます。パラメーターの詳細については、Kafka の公式ドキュメント をご参照ください。

  2. 右上隅にある Data Sampling をクリックします。

    表示されるダイアログボックスで、Start timeSampled Data Records を設定し、Start Collection をクリックします。この操作により、指定された Kafka トピックからデータがサンプリングされます。データをプレビューできます。このデータは、後続のデータ処理ノードにおけるプレビューおよび可視化構成の入力として使用されます。

  3. [Configure Output Field] タブで、同期するフィールドを選択します。

    デフォルトでは、Kafka は 6 つのフィールドを提供します。

    フィールド名

    説明

    __key__

    Kafka レコードのキー。

    __value__

    Kafka レコードの値。

    __partition__

    Kafka レコードが格納されているパーティション番号。パーティション番号は 0 から始まる整数です。

    __headers__

    Kafka レコードのヘッダー。

    __offset__

    パーティション内の Kafka レコードのオフセット。オフセットは 0 から始まる整数です。

    __timestamp__

    Kafka レコードのミリ秒単位の 13 桁の UNIX タイムスタンプ。

    後続のデータ処理ノードで、追加のフィールド変換を実行することもできます。

2. データ処理

Data Processing トグルを有効にします。[データマスキング]、[文字列置換]、[データフィルタリング]、[JSON 解析]、[フィールドの編集と割り当て]の 5 つのデータ処理方法が利用できます。これらのメソッドは任意の順序で配置でき、実行時には設定した順序で実行されます。

データ処理ノードを構成した後、右上隅にある Preview Data Output をクリックします。

  1. 入力データの下のテーブルには、前の Data Sampling ステップの結果が表示されます。Re-obtain Output of Ancestor Node をクリックして、結果を更新できます。

  2. 上流ノードからの出力がない場合は、Manually Construct Data を使用して、前の出力をシミュレートすることもできます。

  3. Preview をクリックして、データ処理コンポーネントによって処理された後の上流ステップからの出力データを表示します。

[入力データ] テーブルには、キーパーティションオフセットタイムスタンプなどの Kafka メッセージフィールドが表示されます。[プレビュー結果] セクションには、JSON 解析からのキーと値のペアなど、データ処理後に解析されたフィールドが表示されます。また、見つかったダーティデータのレコード数と免責事項「ここに表示されるプレビューは参考用です。実際のタスク実行の結果が正となります。」も含まれます。

説明

データ出力のプレビュー機能とデータ処理機能は、Kafka ソースでの Data Sampling に依存します。データを処理する前に、まず Kafka ソースのデータサンプリングを実行する必要があります。

3. 送信先の構成

  1. [Destination] セクションで、Tunnel リソースグループを選択します。デフォルトでは「パブリック転送リソース」が選択されており、これは MaxCompute の無料クォータに対応します。

  2. [新しいテーブル] に書き込むか、[既存のテーブルを使用] するかを指定します。

    1. 新しいテーブルを作成する場合は、ドロップダウンリストから Create を選択できます。デフォルトでは、ソースと同じ構造のテーブルが自動的に作成されます。送信先テーブル名とスキーマは手動で変更できます。

    2. 既存のテーブルを使用することを選択した場合は、ドロップダウンリストから送信先テーブルを選択します。

  3. (オプション) テーブルスキーマの編集。

    テーブル名の横にある編集アイコンをクリックして、そのスキーマを変更します。Re-generate Table Schema Based on Output Column of Ancestor Node をクリックすると、上流ノードの出力列に基づいてスキーマが自動的に生成されます。その後、自動生成されたスキーマの列を選択して主キーとして設定できます。

4. フィールドマッピングの構成

ソースと送信先を選択した後、ソース列と送信先列のマッピングを指定する必要があります。タスクは、構成されたフィールドマッピングに基づいて、ソースフィールドから対応する送信先フィールドにデータを書き込みます。

  1. システムは、The same name mapping の原則に基づいて、上流の列を送信先の列に自動的にマッピングします。必要に応じてマッピングを調整できます。1 つの上流の列を複数の送信先の列にマッピングできますが、複数の上流の列を 1 つの送信先の列にマッピングすることはできません。上流の列がマッピングされていない場合、そのデータは送信先テーブルに書き込まれません。

  2. Kafka フィールドの場合、カスタムの JSON 解析を構成できます。データ処理コンポーネントを使用して __value__ フィールドのコンテンツを取得し、より詳細なフィールド構成を行います。

    フィールドマッピングセクションで、[同名でマッピング] をクリックして、上流の入力フィールドと MaxCompute の出力フィールド間に一対一の対応を作成します。マッピングには、Kafka メタデータフィールド (__key____value____partition____offset____timestamp____headers__) とビジネスフィールド (idnameage) が含まれます。LONG 型のフィールドは MaxCompute の BIGINT 型にマッピングされ、STRING 型は変更されません。

  3. (オプション) パーティションの構成。

    1. [時間ベースの自動パーティショニング] は、ビジネス時間 (この場合は __timestamp__ フィールド) に基づいてパーティションを作成します。第 1 レベルのパーティションは年単位、第 2 レベルのパーティションは月単位などとなります。

    2. [フィールドコンテンツによる動的パーティショニング] は、ソーステーブルのフィールドを送信先の MaxCompute テーブルのパーティションフィールドにマッピングします。これにより、ソースフィールドに特定のデータを含む行が、MaxCompute テーブルの対応するパーティションに書き込まれるようになります。

ステップ4:詳細設定

同期タスクは、詳細な構成のための高度なパラメーターを提供します。システムはデフォルト値を提供するため、ほとんどの場合、変更する必要はありません。変更するには、次の手順に従います。

  1. ページの右上隅にある Advanced Settings をクリックして、Advanced Parameters 構成ページに移動します。

    説明

    Data Development では、詳細設定はタスク構成ページの右側にあるタブにあります。

  2. 同期タスクの読み取り側と書き込み側で個別にパラメーターを設定できます。[実行時構成を自動的に設定]false に設定して、[実行時構成] をカスタマイズします。

  3. ツールチップに基づいてパラメーター値を変更します。各パラメーターの説明は、その名前の横に表示されます。一部のパラメーターの構成の推奨事項については、「リアルタイム同期の高度なパラメーター」をご参照ください。

重要

これらのパラメーターは、その目的と起こりうる結果を完全に理解した後にのみ変更してください。誤った設定は、予期しないエラーやデータ品質の問題を引き起こす可能性があります。

ステップ5:テスト実行

すべてのタスク構成が完了したら、左下隅にある Perform Simulated Running をクリックしてタスクをデバッグします。これにより、タスクが少量のサンプルデータをどのように処理するかがシミュレートされ、送信先テーブルで結果をプレビューできます。構成エラー、例外、またはダーティデータがある場合、システムはリアルタイムのフィードバックを提供します。これにより、タスク構成の正しさを迅速に評価し、期待どおりの結果が得られるかどうかを確認できます。

  1. 表示されるダイアログボックスで、Start timeSampled Data Records のサンプリングパラメーターを設定します。

  2. Start Collection をクリックしてサンプルデータを取得します。

  3. Preview Result をクリックしてタスクの実行をシミュレートし、出力を表示します。

テスト実行の出力はプレビュー専用です。送信先のデータソースには書き込まれず、本番データには影響しません。

ステップ6:公開と実行

  1. すべての構成が完了したら、ページの下部にある Save をクリックしてタスク構成を保存します。

  2. データ統合タスクを実行するには、本番環境に公開する必要があります。新規作成または編集したタスクを有効にするには、Deploy する必要があります。公開時に Start immediately after deployment を選択すると、タスクは自動的に開始されます。それ以外の場合は、タスクが公開された後、Data Integration > Synchronization Task ページに移動し、[操作] 列からタスクを手動で開始します。

  3. [Tasks] で、タスクの Name/ID をクリックして、その詳細な実行プロセスを表示します。

ステップ7:アラートルールの構成

タスクが公開されて実行された後、アラートルールを構成して、例外に関する通知をすぐに受け取ることができます。これにより、本番環境を安定させ、データの鮮度を保つことができます。Data Integration のタスクリストで対象のタスクを見つけ、[操作] 列で [More] > [アラーム設定] をクリックします。

1. アラートの追加

[カスタムルールの使用] を選択し、[アラート名][説明] を入力します。サポートされている通知方法には、[メール][SMS][電話][DingTalk][Webhook][Feishu] があります。これらの方法は、WARNING レベルと CRITICAL レベルで個別に構成できます。受信者は、[オンコールスケジュール] を通じて指定するか、手動で追加できます。

(1) Create Rule をクリックしてアラートルールを構成します。

[Alert Reason] を設定して、Business delay[フェールオーバー]Task statusDDL NotificationTask Resource Utilization などのメトリックを監視できます。その後、指定されたしきい値に基づいて CRITICAL または WARNING のアラートレベルを設定できます。

  • アラート方法を設定した後、Configure Advanced Parameters を使用してアラート通知の送信間隔を制御し、大量のメッセージ送信や無駄を防ぐことができます。

  • アラートの原因として Business delayTask status、または Task Resource Utilization を選択した場合、タスクが正常な状態に復旧したときに受信者に通知する復旧通知を有効にすることもできます。

(2) アラートルールの管理。

既存のアラートルールについては、トグルを使用して有効または無効にできます。アラートレベルに基づいて、異なる担当者にアラートを送信することもできます。

2. アラートの表示

タスクリストで、More > Configure Alert Rule をクリックしてアラートイベントページに移動し、アラート履歴を表示します。

関連操作

タスクが開始された後、タスク名をクリックして実行の詳細を表示し、タスクの運用保守 (O&M) とチューニングを実行できます。

よくある質問

リアルタイム同期タスクに関するよくある質問への回答については、「リアルタイム同期に関するFAQ」をご参照ください。

関連ドキュメント

Kafka から ApsaraDB for OceanBase への単一テーブルのリアルタイム同期

LogHub (SLS) から Data Lake Formation への単一テーブルのリアルタイム取り込み

Hologres から Doris への単一テーブルのリアルタイム同期

Hologres から Hologres への単一テーブルのリアルタイム同期

Kafka から Hologres への単一テーブルのリアルタイム同期

LogHub (SLS) から Hologres への単一テーブルのリアルタイム同期

Kafka から Hologres への単一テーブルのリアルタイム同期

Hologres から Kafka への単一テーブルのリアルタイム同期

LogHub (SLS) から MaxCompute への単一テーブルのリアルタイム同期

Kafka から OSS データレイクへの単一テーブルのリアルタイム同期

Kafka から StarRocks への単一テーブルのリアルタイム同期

Oracle から Tablestore への単一テーブルのリアルタイム同期