すべてのプロダクト
Search
ドキュメントセンター

DataWorks:単一テーブルリアルタイム同期タスクの設定

最終更新日:Feb 12, 2026

DataWorks Data Integration では、異なるデータソース間で低遅延・高スループットなデータレプリケーションおよび転送を実現するための単一テーブルリアルタイム同期タスクを提供しています。この機能は、高度なリアルタイムコンピュートエンジンを活用して、ソースからのリアルタイムデータ変更(挿入、更新、削除)を検出し、迅速に宛先へ適用します。本トピックでは、Kafka から MaxCompute へのデータ同期を例として、単一テーブルリアルタイム同期タスクの設定手順を説明します。

前提条件

  • データソースの準備

    • ソースおよび宛先のデータソースを作成済みである必要があります。詳細については、「データソース管理」をご参照ください。

    • データソースがリアルタイム同期をサポートしていることを確認してください。対応データソースおよび同期ソリューションの詳細については、「対応データソースおよび同期ソリューション」をご参照ください。

    • 一部のデータソースではログの有効化が必要です。有効化方法はデータソースによって異なります。各データソースの構成ガイドについては、「データソース一覧」をご参照ください。

  • リソースグループServerless リソースグループを購入し、設定済みです。

  • ネットワーク接続:リソースグループとデータソース間のネットワーク接続を確立済みである必要があります。

ステップ 1:同期タスクの作成

  1. DataWorks コンソールにログインします。上部ナビゲーションバーで目的のリージョンを選択します。左側ナビゲーションウィンドウで Data Integration > Data Integration を選択します。表示されたページでドロップダウンリストから目的のワークスペースを選択し、Data Integration へ移動 をクリックします。

  2. 左側ナビゲーションウィンドウで 同期タスク をクリックします。表示されたページで 同期タスクの作成 をクリックし、タスクを構成します。本トピックでは、Kafka から MaxCompute へのリアルタイム同期を例として説明します:

    • ソースKafka

    • 宛先MaxCompute

    • [同期タイプ]: 単一テーブル リアルタイム

    • 同期ステップ

      • スキーマ移行:ソースと一致するデータベースオブジェクト(テーブル、フィールド、データの型など)を宛先に自動的に作成します。このステップではデータは含まれません。

      • 増分同期(任意):完全同期が完了した後、ソースからのデータ変更(挿入、更新、削除)を継続的に取得し、宛先へ同期します。

        ソースが Hologres の場合、完全同期もサポートされています。タスクはまず既存の全データを宛先テーブルへ同期し、その後自動的に増分同期を開始します。
説明

対応データソースおよび同期ソリューションの詳細については、「対応データソースおよび同期ソリューション」をご参照ください。

ステップ 2:データソースおよび実行時リソースの構成

  1. ソースデータソースで、ご利用の Kafka データソースを選択します。宛先データソースで、ご利用の MaxCompute データソースを選択します。

  2. 実行時リソースセクションで、同期タスク用のリソースグループを選択し、タスクにCUを割り当てます。完全同期および増分同期それぞれに CU を個別に設定することで、リソースを精密に制御し、無駄を防止できます。タスク実行時にメモリ不足(OOM)エラーが発生した場合は、タスクの CU 割り当てを増加させてください。

  3. ソースおよび宛先の両方のデータソースがネットワーク接続チェックを通過することを確認してください。

ステップ 3:同期ソリューションの構成

1.ソースの構成

  1. 構成タブで、同期対象の Kafka Topic を選択します。

    デフォルト設定を使用するか、必要に応じて変更してください。パラメーターの詳細については、公式 Kafka ドキュメントをご参照ください。

  2. 右上隅にある Data Sampling をクリックします。

    表示されるダイアログボックスで、Start timeおよびSampled Data Recordsを指定し、Start Collection をクリックします。これにより、指定された Kafka Topic からデータがサンプリングされます。Topic 内のデータをプレビューでき、後続のデータ処理ノードにおけるデータプレビューおよび可視化構成の入力情報として利用できます。

  3. Configure Output Fieldタブで、同期対象のフィールドを選択します。

    Kafka には 6 つのデフォルトフィールドが提供されます。

    フィールド名

    説明

    __key__

    Kafka Record のキー。

    __value__

    Kafka Record の値。

    __partition__

    Kafka Record が格納されているパーティション番号。パーティション番号は 0 から始まる整数です。

    __headers__

    Kafka Record のヘッダー。

    __offset__

    Kafka Record のパーティション内でのオフセット。オフセットは 0 から始まる整数です。

    __timestamp__

    Kafka Record の 13 桁の整数ミリ秒タイムスタンプ。

    さらに詳細なフィールド変換は、後続のデータ処理ステップで実行できます。

2.データ処理

データ処理を有効化します。利用可能な処理方法は以下の 5 種類です:データマスキング文字列置換データフィルタリングJSON 解析、およびフィールドの編集および割り当て。これらの処理方法を希望する実行順序で並べ替えてください。

image

データ処理ステップを構成した後、右上隅の Preview Data Output をクリックできます:

  1. 入力データ下のテーブルには、前のData Samplingステップの結果が表示されます。Re-obtain Output of Ancestor Node をクリックすると、結果を更新できます。

  2. 上流ステップから出力がない場合は、Manually Construct Data を使用して、上流の出力をシミュレートできます。

  3. Preview をクリックすると、データ処理コンポーネントによって処理された後の上流ステップの出力データを確認できます。

image

説明

データ出力プレビューおよびデータ処理機能は、Kafka ソースからのData Sampling結果に依存します。データ処理の構成を行う前に、まず Kafka ソースの構成ページでデータサンプリングを実行してください。

3.宛先の構成

  1. 宛先セクションで、Tunnel リソースグループを選択します。デフォルトではパブリック転送リソースが選択されており、これは MaxCompute の無料クォータを利用します。

  2. 新規テーブルまたは既存テーブルへのデータ書き込みを選択します。

    1. 新規テーブルを作成する場合は、ドロップダウンリストから作成を選択します。デフォルトでは、データソースと同じスキーマを持つテーブルが作成されます。宛先テーブル名およびスキーマは手動で変更可能です。

    2. 既存テーブルを使用する場合は、ドロップダウンリストから対象テーブルを選択します。

  3. (任意)テーブルスキーマの編集

    テーブル名の横にある編集アイコンをクリックして、テーブルスキーマを編集します。Re-generate Table Schema Based on Output Column of Ancestor Node をクリックすると、上流ノードの出力カラムに基づいてテーブルスキーマを自動生成できます。自動生成されたスキーマ内のカラムをプライマリキーとして選択できます。

4.フィールドマッピングの構成

ソースおよび宛先を選択した後、ソースフィールドと宛先カラム間のマッピングを指定する必要があります。タスクは、構成されたフィールドマッピングに基づいて、ソースフィールドから対応する宛先カラムへデータを書き込みます。

  1. システムは、同名マッピングの原則に基づき、上流フィールドと宛先テーブルカラム間のマッピングを自動生成します。必要に応じてマッピングを調整できます。1 つの上流フィールドを複数の宛先テーブルカラムにマッピングできますが、複数の上流フィールドを 1 つの宛先テーブルカラムにマッピングすることはできません。上流フィールドが宛先テーブルカラムにマッピングされない場合、そのデータは宛先テーブルへ書き込まれません。

  2. Kafka フィールドに対してカスタムJSON 解析を構成できます。データ処理コンポーネントを使用して、value フィールドの内容を取得し、より詳細なフィールド構成を行います。

    image

  3. (任意)パーティションの構成

    1. 自動時間ベースパーティショニングは、ビジネス時間フィールド(本例では _timestamp)に基づいてデータをパーティション分割します。1 次パーティションは年、2 次は月、といった具合です。

    2. フィールド内容による動的パーティショニングは、ソーステーブルのフィールドを宛先 MaxCompute テーブルのパーティションフィールドにマッピングします。これにより、特定のフィールド値を含む行が、MaxCompute テーブルの対応するパーティションへ書き込まれます。

ステップ 4:高度な設定の構成

同期タスクでは、細かい制御のための高度なパラメーターが提供されます。ほとんどの場合、デフォルト値の変更は不要です。必要に応じて、以下の操作を行ってください。

  1. 右上隅の 高度なパラメーター をクリックして、高度なパラメーター 構成ページへ移動します。

    説明

    高度なパラメーターは、タスク構成ページ右側のタブにあります。

  2. 同期タスクのリーダーおよびライターそれぞれに個別にパラメーターを設定できます。実行時構成をカスタマイズするには、実行時設定の自動構成を無効化します。

  3. ツールチップに記載された説明に従ってパラメーター値を変更してください。各パラメーターの説明は、パラメーター名の隣に記載されています。一部のパラメーターについての構成推奨事項は、「リアルタイム同期の高度なパラメーター」をご参照ください。

重要

これらのパラメーターは、その目的および潜在的な影響を十分に理解した上でのみ変更してください。誤った設定は、予期しないエラーまたはデータ品質の問題を引き起こす可能性があります。

ステップ 5:テスト実行

タスクの構成が完了したら、左下隅の Perform Simulated Running をクリックしてタスクをデバッグできます。テスト実行では、少量のサンプルデータを用いてタスク全体のフローをシミュレートし、宛先テーブルへ書き込まれる結果をプレビューできます。テスト実行中に構成エラー、例外、または不正データ(Dirty Data)が発生した場合、システムはリアルタイムでフィードバックを提供します。これにより、タスクの構成が正しく行われており、期待通りの結果が得られることを迅速に検証できます。

  1. 表示されるダイアログボックスで、サンプリングパラメーターを設定します:Start timeおよびSampled Data Records

  2. Start Collection をクリックしてサンプルデータを取得します。

  3. Preview Result をクリックしてタスクの実行をシミュレートし、出力結果を確認します。

テスト実行の出力はプレビュー専用であり、宛先データソースへは書き込まれません。本番データには一切影響しません。

ステップ 6:タスクの公開および実行

  1. すべての構成が完了したら、ページ下部の 構成完了 をクリックします。

  2. Data Integration タスクは、本番環境へ公開して初めて実行されます。タスクの作成または編集後に、変更を適用するには Deploy をクリックします。公開時に、タスクを即座に開始するオプションを選択できます。このオプションを選択しなかった場合、公開後にData Integration > Synchronization Taskページへ移動し、手動でタスクを開始する必要があります。

  3. Tasks一覧で、タスクのName/IDをクリックして、詳細な実行プロセスを確認します。

ステップ 7:アラートルールの構成

タスクが公開され実行中になった後、アラートルールを構成することで、例外発生時に即座に通知を受け取ることができます。これにより、本番環境の安定性およびデータの最新性を維持できます。同期タスクページで、対象タスクのアクション列からその他 > アラート設定 をクリックします。

1.アラートの追加

image

(1)Create Rule をクリックしてアラートルールを構成します。

Alert Reasonとして、Business delayフェールオーバーTask statusDDL Notification、およびTask Resource Utilizationなどのメトリックを監視できます。指定されたしきい値に基づき、CRITICAL または WARNING のアラートレベルを設定できます。

  • アラート方法を設定した後、Configure Advanced Parameters を使用して、アラートメッセージの送信間隔を制御できます。これにより、一度に多数のメッセージが送信されるのを防ぎ、無駄やメッセージの滞留を回避できます。

  • Business delayTask status、またはTask Resource Utilizationをアラートトリガーとして選択した場合、復旧通知を有効化して、タスクが正常状態に戻った際に受信者へ通知できます。

(2)アラートルールの管理

既存のアラートルールについては、アラートスイッチを使用して有効化または無効化できます。また、アラートレベルに応じて異なる連絡先へ通知できます。

2.アラートの確認

タスクのMore > Configure Alert Rule をクリックして、アラートイベントの履歴を確認できます。

次のステップ

タスクが開始された後、タスク名をクリックして実行詳細を確認し、タスクの運用保守(O&M)およびチューニングを実行できます。

よくある質問

リアルタイム同期タスクに関する一般的な課題については、「リアルタイム同期 FAQ」をご参照ください。

その他の例

Kafka から ApsaraDB for OceanBase へのリアルタイム単一テーブル同期

LogHub(SLS)から Data Lake Formation へのリアルタイム単一テーブル取り込み

Hologres から Doris へのリアルタイム単一テーブル同期

Hologres から Hologres へのリアルタイム単一テーブル同期

Kafka から Hologres へのリアルタイム単一テーブル同期

LogHub(SLS)から Hologres へのリアルタイム単一テーブル同期

Kafka から Hologres へのリアルタイム単一テーブル同期

Hologres から Kafka へのリアルタイム単一テーブル同期

LogHub(SLS)から MaxCompute へのリアルタイム単一テーブル同期

Kafka から OSS データレイクへのリアルタイム単一テーブル同期

Kafka から StarRocks へのリアルタイム単一テーブル同期

Oracle から Tablestore へのリアルタイム単一テーブル同期