すべてのプロダクト
Search
ドキュメントセンター

DataWorks:Kafka から MaxCompute への単一テーブルのバッチ同期

最終更新日:Feb 13, 2026

本トピックでは、Kafka から MaxCompute へ増分データを定期的に同期するスケジュールを設定する方法について説明します。例として、1 分ごと、1 時間ごと、または 1 日ごとの同期を実行し、MaxCompute の時間単位または日単位のパーティションテーブルにデータを書き込む方法を示します。

注意事項

  • Kafka のバージョンは 0.10.2 以降かつ 2.2.x 以前である必要があります。また、Kafka ではレコードのタイムスタンプが有効化されており、レコードに正しいビジネスタイムスタンプが含まれている必要があります。

  • 増分データ同期が開始された後も、開始時刻以前または等しいタイムスタンプを持つレコードが Kafka Topic に書き込まれる場合があります。これらのレコードは読み取られない可能性があります。Kafka Topic へのデータ書き込みが遅延している場合や、タイムスタンプの順序が保たれていない場合、オフライン同期タスクでデータ損失が発生するリスクがあることにご注意ください。

  • Kafka 側の同期終了ポリシーとして、「1 分間新しいデータがない」を選択できるのは、以下の条件をすべて満たす場合のみです。それ以外の場合、データ損失が発生する可能性があります。

    • Kafka Topic の一部またはすべてのパーティションに、長期間(例:10 分以上)新しいデータが書き込まれていない状態が継続している。

    • 各定期インスタンスが開始した後、終了時刻パラメーター以前のタイムスタンプを持つレコードが Kafka Topic に書き込まれていない。

前提条件

制限事項

ソースデータを MaxCompute の外部テーブルに同期することはサポートされていません。

操作手順

説明

本トピックでは、オフライン同期タスクの構成方法を説明するために、新しい DataStudio インターフェイスを使用します。

ステップ 1:ノードの作成とタスクの構成

コードレス UI でノードを作成・構成する一般的な手順については、「コードレス UI の構成」ガイドをご参照ください。本トピックでは、これらの手順は繰り返しません。

ステップ 2:データソースと送信先の構成

Kafka のデータソースの構成

本トピックでは、Kafka から MaxCompute への単一テーブルのオフライン同期の方法について説明します。データソースは Kafka です。主な構成ポイントは以下のとおりです。

説明

Kafka データソースの構成項目の一般的な説明については、「Kafka Reader」ドキュメントをご参照ください。以下は、本チュートリアルの参考構成です。

設定項目

主な構成

Topic

同期対象の Kafka Topic を選択します。標準モードの DataWorks ワークスペースを使用する場合、開発環境および本番環境の両方の Kafka クラスターに、同じ名前の Topic が存在している必要があります。「Topic」には、この Topic を選択します。

説明

以下の場合は:

  • 開発環境に Topic が存在しない場合、オフライン同期ノードの構成時に「Topic」ドロップダウンリストに Topic が表示されません。

  • 本番環境に Topic が存在しない場合、タスクを提出・公開後に定期スケジュールが失敗します。これは、同期対象のテーブルが見つからないためです。

Consumer Group ID

業務要件に基づき、Kafka クラスターに対して一意の ID を入力します。これにより、Kafka クラスター側での統計およびモニタリングが可能になります。

Read Start OffsetStart TimeRead End OffsetEnd Time

Read Start Offset および Read End Offset の両方を「特定の時刻」に設定します。Start Time および End Time には、それぞれスケジューリングパラメーター ${startTime} および ${endTime} を設定します。

これらのパラメーターは、データ同期の開始位置および終了位置を定義します。この構成では、同期は ${startTime} 以降のデータから開始し、${endTime} までのデータで終了します。${startTime} および ${endTime} は、同期タスク実行時にスケジューリングパラメーター式に基づいて値に置き換えられます。

Time Zone

空欄のままにするか、DataWorks が配置されているリージョンのデフォルトサーバー時刻帯を選択できます。

説明

Alibaba Cloud のテクニカルサポートに連絡してスケジューリング時刻帯を変更済みの場合は、ここで変更後の時刻帯を選択できます。

Key TypeValue TypeEncoding

Kafka Topic 内の実際のレコードに基づき選択します。

同期完了ポリシー

同期終了ポリシーとして、条件を満たす場合のみ「1 分間新しいデータがない」を選択します。それ以外の場合は、「指定された終了位置で停止」を選択します。

  • Kafka Topic の一部またはすべてのパーティションに、長期間(例:10 分以上)新しいデータが書き込まれていない状態が継続している。

  • 各定期インスタンスが開始した後、終了時刻パラメーター以前のタイムスタンプを持つレコードが Kafka Topic に書き込まれていない。

高度な構成

デフォルト設定のままとします。

MaxCompute のデータ送信先の構成

本トピックでは、Kafka から MaxCompute への単一テーブルのオフライン同期の方法について説明します。データ送信先はテーブルです。主な構成ポイントは以下のとおりです。

説明

以下の表で説明していないパラメーターについては、デフォルト設定のままとします。

設定項目

主な構成

データソース

前工程で選択した MaxCompute データソースが表示されます。標準モードの DataWorks ワークスペースを使用する場合、開発プロジェクトおよび本番プロジェクトの名称が表示されます。

Table

同期先の MaxCompute テーブルを選択します。標準モードの DataWorks ワークスペースを使用する場合、開発環境および本番環境の両方に、同じ名前およびスキーマを持つ MaxCompute テーブルが存在することを確認してください。

また、「ターゲットテーブルスキーマの生成」をクリックすると、システムが自動的に受信用テーブルを作成します。テーブル作成文は手動で調整可能です。

説明

以下の条件が該当する場合:

  • 送信先 MaxCompute テーブルが開発環境に存在しない場合、オフライン同期ノードの送信先テーブル構成時にドロップダウンリストにテーブルが表示されません。

  • 送信先 MaxCompute テーブルが本番環境に存在しない場合、タスクを提出・公開後にスケジュール実行時に同期タスクが失敗します。これは、送信先テーブルが見つからないためです。

  • 開発環境および本番環境のテーブルスキーマが不一致の場合、スケジュール実行時のカラムマッピングが、オフライン同期ノードで構成したマッピングと異なる可能性があります。これにより、誤ったデータ書き込みが発生する場合があります。

Partition

テーブルがパーティションテーブルの場合、パーティションキー列の値を入力できます。

  • 値は静的フィールド(例:ds=20220101)とすることができます。

  • 値はスケジューリングシステムパラメーター(例:ds=${partition})とすることができます。スケジューリングシステムパラメーターは、タスク実行時に自動的に置き換えられます。

ステップ 3:フィールドマッピングの構成

データソースおよび送信先を選択した後、リーダーとライター間のカラムマッピングを指定する必要があります。「同名フィールドのマップ」、「同一行のフィールドをマップ」、「マッピングのクリア」、または「マッピングの手動編集」を選択できます。

  • Kafka 側には、6 つのデフォルトフィールドがあります。

    フィールド名

    説明

    __key__

    Kafka レコードのキー。

    __value__

    Kafka レコードの値。

    __partition__

    Kafka レコードが配置されているパーティション番号。パーティション番号は 0 から始まります。

    __headers__

    Kafka レコードのヘッダー。

    __offset__

    Kafka レコードのパーティション内におけるオフセット。オフセットは 0 から始まります。

    __timestamp__

    Kafka レコードの 13 桁整数ミリ秒タイムスタンプ。

  • Kafka 側のフィールドに対してカスタム JSON 解析を構成できます。.(サブフィールドを取得) および [](配列要素を取得) の構文を使用することで、JSON 形式の Kafka レコードの value フィールドから内容を取得できます。

    重要

    JSON フィールド名に「.」文字が含まれる場合、フィールド定義によってそのフィールド値を取得できません。これは、フィールド定義構文において曖昧さが生じるためです。

    以下は、Kafka 内の JSON 形式レコードのデータ値の例です。

    {
          "a": {
          "a1": "hello"
          },
          "b": "world",
          "c":[
                "xxxxxxx",
                "yyyyyyy"
                ],
          "d":[
                {
                      "AA":"this",
                      "BB":"is_data"
                },
                {
                      "AA":"that",
                      "BB":"is_also_data"
                }
            ],
         "a.b": "unreachable"
    }
    • a1 のデータ("hello")を同期するには、Kafka 側にフィールド a.a1 を追加します。

    • b のデータ("world")を同期するには、Kafka 側にフィールド b を追加します。

    • c のデータ("yyyyyyy")を同期するには、Kafka 側にフィールド c[1] を追加します。

    • AA のデータ("this")を同期するには、Kafka 側にフィールド d[0].AA を追加します。

    • Kafka 側のフィールドを a.b として定義した場合、データ "unreachable" を同期できません。

  • ソースまたは送信先テーブルのフィールドをマッピングから除外できます。マッピングされていないソースフィールドは同期インスタンスによって読み取られず、マッピングされていない送信先フィールドには NULL が書き込まれます。

  • 1 つのソースフィールドを複数の送信先フィールドにマッピングすることはできません。また、1 つの送信先フィールドを複数のソースフィールドからマッピングすることもできません。

ステップ 4:高度なパラメーターの構成

タスク右側の「高度な構成」をクリックし、「最大予想同時実行数」および「不正データレコードの処理ポリシー」などのパラメーターを設定します。本チュートリアルでは、「不正データレコードの処理ポリシー」を「不正データレコードを無視」に設定し、その他のパラメーターはデフォルト値のままとします。詳細については、「コードレス UI の構成」をご参照ください。

ステップ 5:テストの構成と実行

  1. オフライン同期ノードの編集ページ右側で、「Run Configuration」をクリックします。テスト実行用に「リソースグループ」および「スクリプトパラメーター」を設定し、上部ツールバーの「実行」をクリックして、同期リンクが正常に動作するかをテストします。

  2. 左側のナビゲーションウィンドウで image アイコンをクリックし、次に [個人ディレクトリ] の右側にある image アイコンをクリックすると、.sql 拡張子のファイルを作成できます。 次に、以下の SQL クエリを実行して、宛先テーブルのデータが期待どおりであるかを確認できます。

    説明
    SELECT * FROM <MaxCompute_destination_table_name> WHERE pt=<specified_partition> LIMIT 20;

ステップ 6:スケジューリングの構成とタスクの公開

オフライン同期タスク右側の「スケジューリング」をクリックします。「スケジューリング構成」パラメーターを定期実行用に設定し、上部ツールバーの「公開」をクリックして公開パネルを開きます。画面上の指示に従って、タスクを公開 します。

データソースおよび送信先の構成時に、3 つのスケジューリングパラメーター(${startTime}${endTime}${partition})を使用しました。スケジューリング構成では、同期要件に応じて、これらのパラメーターの置き換えポリシーを指定する必要があります。以下に、いくつかの典型的なシナリオの構成例を示します。

典型的なシナリオ

推奨構成

シナリオの説明

同期タスクが 5 分ごとにスケジュールされる

  • startTime=$[yyyymmddhh24mi-8/24/60]00

  • endTime=$[yyyymmddhh24mi-3/24/60]00

  • partition=$[yyyymmddhh24mi-8/24/60]

同期タスクが 2022-11-22 の 10:00 にスケジュールされる場合:

  • 2022-11-22 の 09:52(含む)から 2022-11-22 の 09:57(含まない)までのタイムスタンプを持つ Kafka Topic のレコードを同期します。

  • 同期された Kafka データは、MaxCompute の 202211220952 パーティションに書き込まれます。

  • endTime は、インスタンスのスケジューリング時刻($[yyyymmddhh24mi])より 3 分前に設定されています。これにより、同期タスクインスタンスの開始前に、対象時間範囲のすべてのデータが Kafka Topic に書き込まれることを保証し、データ損失を防止します。

同期タスクが 1 時間ごとにスケジュールされる

  • startTime=$[yyyymmddhh24-1/24]0000

  • endTime=$[yyyymmddhh24]0000

  • partition=$[yyyymmddhh24]

説明
  • 同期タスクが 2 時間ごとにスケジュールされる場合、startTime=$[yyyymmddhh24-2/24]0000 と設定します。その他のスケジューリングパラメーターは変更しません。

  • 同期タスクが 3 時間ごとにスケジュールされる場合、startTime=$[yyyymmddhh24-3/24]0000 と設定します。その他のスケジューリングパラメーターは変更しません。

  • その他の時間単位のスケジューリングサイクルにも同様のロジックが適用されます。

同期タスクが 2022-11-22 の 10:05 にスケジュールされる場合:

  • 2022-11-22 の 09:00(含む)から 2022-11-22 の 10:00(含まない)までのタイムスタンプを持つ Kafka Topic のレコードを同期します。

  • 同期された Kafka データは、MaxCompute の 2022112210 パーティションに書き込まれます。

同期タスクが 1 日ごとにスケジュールされる

  • startTime=$[yyyymmdd-1]000000

  • endTime=$[yyyymmdd]000000

  • partition=$[yyyymmdd-1]

同期タスクが 2022-11-22 の 00:05 にスケジュールされる場合:

  • 2022-11-21 の 00:00(含む)から 2022-11-22 の 00:00(含まない)までのタイムスタンプを持つ Kafka Topic のレコードを同期します。

  • 同期された Kafka データは、MaxCompute の 20221121 パーティションに書き込まれます。

同期タスクが 1 週間ごとにスケジュールされる

  • startTime=$[yyyymmdd-7]000000

  • endTime=$[yyyymmdd]000000

  • partition=$[yyyymmdd-1]

同期タスクが 2022-11-22 の 00:05 にスケジュールされる場合:

  • 2022-11-15 の 00:00(含む)から 2022-11-22 の 00:00(含まない)までのタイムスタンプを持つ Kafka Topic のレコードを同期します。

  • 同期された Kafka データは、MaxCompute の 20221121 パーティションに書き込まれます。

同期タスクが 1 ヶ月ごとにスケジュールされる

  • startTime=$[add_months(yyyymmdd,-1)]000000

  • endTime=$[yyyymmdd]000000

  • partition=$[yyyymmdd-1]

同期タスクが 2022-11-22 の 00:05 にスケジュールされる場合:

  • 2022-10-22 の 00:00(含む)から 2022-11-22 の 00:00(含まない)までのタイムスタンプを持つ Kafka Topic のレコードを同期します。

  • 同期された Kafka データは、MaxCompute の 20221121 パーティションに書き込まれます。

目的の間隔に応じてスケジューリングサイクルを設定します。

典型的なシナリオ

推奨構成

シナリオの説明

同期タスクが 5 分ごとにスケジュールされる

  • スケジューリングサイクル:分

  • 開始時刻:00:00

  • 間隔:5 分

  • 終了時刻:23:59

該当なし

同期タスクが 1 時間ごとにスケジュールされる

  • スケジューリングサイクル:時間

  • 開始時刻:00:15

  • 間隔:1 時間

  • 終了時刻:23:59

開始時刻を 00:00 より若干遅らせて(例:00:15)設定します。これにより、同期タスクインスタンスの開始前に、対象時間範囲のすべてのデータが Kafka Topic に書き込まれることを保証します。

同期タスクが 1 日ごとにスケジュールされる

  • スケジューリングサイクル:日

  • スケジュール時刻:00:15

スケジュール時刻を 00:00 より若干遅らせて(例:00:15)設定します。これにより、同期タスクインスタンスの開始前に、対象時間範囲のすべてのデータが Kafka Topic に書き込まれることを保証します。

同期タスクが 1 週間ごとにスケジュールされる

  • スケジューリングサイクル:週

  • 指定時刻: 月曜日

  • スケジュール時刻:00:15

スケジュール時刻を 00:00 より若干遅らせて(例:00:15)設定します。これにより、同期タスクインスタンスの開始前に、対象時間範囲のすべてのデータが Kafka Topic に書き込まれることを保証します。

同期タスクが 1 ヶ月ごとにスケジュールされる

  • スケジューリングサイクル:月

  • 指定時刻:毎月 1 日

  • スケジュール時刻:00:15

スケジュール時刻を 00:00 より若干遅らせて(例:00:15)設定します。これにより、同期タスクインスタンスの開始前に、対象時間範囲のすべてのデータが Kafka Topic に書き込まれることを保証します。

重要

インスタンス開始後に、開始時刻以前または等しいタイムスタンプを持つレコードが Kafka Topic に書き込まれた場合、これらのレコードは読み取られない可能性があります。Kafka Topic へのデータ書き込みが遅延している場合や、タイムスタンプの順序が保たれていない場合、オフライン同期タスクでデータ損失が発生するリスクがあることにご注意ください。