すべてのプロダクト
Search
ドキュメントセンター

DataWorks:Kafka から MaxCompute への単一テーブルのオフライン同期

最終更新日:Nov 28, 2025

このトピックでは、定期スケジュールを設定して Kafka から MaxCompute へ増分データを同期する方法について説明します。この例では、分、時間、または日単位でデータを同期し、MaxCompute の時間単位または日単位のパーティションテーブルに書き込む方法をカバーします。

注意事項

  • ご利用の Kafka のバージョンは 0.10.2 以降、かつ 2.2.x 以前である必要があります。Kafka でレコードのタイムスタンプを有効にし、レコードに正しいビジネスタイムスタンプが含まれている必要があります。

  • 増分データ同期の開始後、開始時刻以前のタイムスタンプを持つレコードが Kafka Topic に書き込まれる可能性があります。これらのレコードは読み取られない場合があります。Kafka Topic へのデータ書き込みが遅延したり、タイムスタンプが順不同になったりする場合、オフライン同期タスクでデータが損失するリスクがあることにご注意ください。

  • Kafka 側の同期終了ポリシーについては、以下の条件を満たす場合にのみ [1分間新しいデータなし] を選択できます。そうでない場合、データ損失が発生する可能性があります。

    • Kafka Topic の一部またはすべてのパーティションに、10 分以上などの長期間にわたって新しいデータが書き込まれていない。

    • 各定期インスタンスの開始後、終了時刻パラメーターより前のタイムスタンプを持つレコードが Kafka Topic に書き込まれない。

前提条件

制限事項

ソースデータを MaxCompute の外部テーブルに同期することはサポートされていません。

操作手順

説明

このトピックでは、新しい DataStudio インターフェイスを使用して、オフライン同期タスクを設定する方法を説明します。

ステップ 1:ノードの作成とタスクの設定

コードレス UI でノードを作成および設定する一般的な手順については、「コードレス UI 設定」ガイドをご参照ください。このトピックでは、これらの手順は繰り返しません。

ステップ 2:データソースと宛先の設定

データソース (Kafka) の設定

このトピックでは、Kafka から MaxCompute への単一テーブルのオフライン同期を実行する方法について説明します。データソースは Kafka です。主要な設定項目は以下のとおりです。

説明

Kafka データソースの設定項目に関する一般的な説明については、「Kafka Reader」ドキュメントをご参照ください。以下は、このチュートリアルのリファレンス設定です。

設定項目

主要な設定

Topic

同期する Kafka Topic を選択します。標準モードの DataWorks ワークスペースを使用する場合、開発環境と本番環境の両方の Kafka クラスターに同じ名前の Topic が存在している必要があります。[Topic] には、この Topic を選択します。

説明

以下の場合:

  • 開発環境に Topic が存在しない場合、オフライン同期ノードを設定する際に[Topic] ドロップダウンリストで Topic を見つけられません。

  • 本番環境に Topic が存在しない場合、タスクが送信されて公開された後、定期スケジュールが失敗します。これは、タスクが同期するテーブルを見つけられないためです。

コンシューマーグループ ID

ビジネスニーズに基づいて、Kafka クラスターの一意の ID を入力します。これは、Kafka クラスター側での統計およびモニタリングに役立ちます。

読み取り開始オフセット開始時刻読み取り終了オフセット終了時刻

[読み取り開始オフセット][読み取り終了オフセット] の両方を [指定時刻] に設定します。[開始時刻][終了時刻] を、スケジューリングパラメーターの ${startTime}${endTime} にそれぞれ設定します。

これらのパラメーターは、データ同期の開始位置と終了位置を定義します。この設定は、同期が ${startTime} のデータから始まり、${endTime} のデータで終わることを意味します。${startTime}${endTime} パラメーターは、同期タスクの実行時にスケジューリングパラメーターの式に基づいて値に置き換えられます。

タイムゾーン

この項目は空のままにするか、DataWorks が配置されているリージョンのデフォルトのサーバータイムゾーンを選択できます。

説明

Alibaba Cloud のテクニカルサポートに連絡してスケジューリングタイムゾーンを変更した場合は、ここで変更後のタイムゾーンを選択できます。

キータイプ値タイプエンコーディング

Kafka Topic の実際のレコードに基づいて選択します。

同期完了ポリシー

同期終了ポリシーには、条件を満たす場合は [1 分間新規データなし] を選択し、それ以外の場合は [指定された終了位置で停止] を選択します。

  • Kafka Topic の一部またはすべてのパーティションに、10 分以上などの長期間にわたって新しいデータが書き込まれていない。

  • 各定期インスタンスの開始後、終了時刻パラメーターより前のタイムスタンプを持つレコードが Kafka Topic に書き込まれない。

詳細設定

デフォルト設定のままにします。

データ宛先 (MaxCompute) の設定

このトピックでは、Kafka から MaxCompute への単一テーブルのオフライン同期を実行する方法について説明します。データ宛先はテーブルです。主要な設定項目は以下のとおりです。

説明

次の表で説明されていないパラメーターについては、デフォルト設定のままにすることができます。

設定項目

主要な設定

データソース

前のステップで選択した MaxCompute データソースが表示されます。標準モードの DataWorks ワークスペースを使用する場合、開発プロジェクトと本番プロジェクトの名前が表示されます。

テーブル

同期先の MaxCompute テーブルを選択します。標準モードの DataWorks ワークスペースを使用する場合、開発環境と本番環境の両方に同じ名前とスキーマを持つ MaxCompute テーブルが存在することを確認してください。

[ターゲットテーブルスキーマの生成] をクリックすることもできます。システムが、データを受信するためのテーブルを自動的に作成します。テーブル作成文を手動で調整できます。

説明

以下の場合:

  • 開発環境に宛先の MaxCompute テーブルが存在しない場合、オフライン同期ノードの宛先テーブルを設定する際に、ドロップダウンリストでテーブルを見つけることができません。

  • 本番環境に宛先の MaxCompute テーブルが存在しない場合、タスクが送信されて公開された後、スケジュール通りに実行されると同期タスクは失敗します。これは、タスクが宛先テーブルを見つけられないためです。

  • 開発環境と本番環境のテーブルスキーマが一致しない場合、スケジュールされた実行中の列マッピングがオフライン同期ノードで設定されたマッピングと異なる可能性があります。これにより、誤ったデータ書き込みが発生する可能性があります。

パーティション

テーブルがパーティションテーブルの場合、パーティションキー列の値を入力できます。

  • 値は ds=20220101 のような静的フィールドにすることができます。

  • 値は ds=${partition} のようなスケジューリングシステムパラメーターにすることができます。スケジューリングシステムパラメーターは、タスクの実行時に自動的に置き換えられます。

ステップ 3:フィールドマッピングの設定

データソースと宛先を選択した後、リーダーとライター間の列マッピングを指定する必要があります。[同じ名前のフィールドをマッピング][同じ行のフィールドをマッピング][マッピングをクリア]、または [マッピングを手動で編集] を選択できます。

  • Kafka 側には 6 つのデフォルトフィールドがあります。

    フィールド名

    説明

    __key__

    Kafka レコードのキー。

    __value__

    Kafka レコードの値。

    __partition__

    Kafka レコードが配置されているパーティション番号。パーティション番号は 0 から始まります。

    __headers__

    Kafka レコードのヘッダー。

    __offset__

    パーティション内での Kafka レコードのオフセット。オフセットは 0 から始まります。

    __timestamp__

    Kafka レコードの 13 桁の整数ミリ秒タイムスタンプ。

  • Kafka 側のフィールドに対してカスタム JSON 解析を設定できます。JSON 形式の Kafka レコードの値フィールドからコンテンツを取得するには、. (サブフィールドを取得)[] (配列要素を取得) の構文を使用します。

    重要

    JSON フィールド名に「.」文字が含まれている場合、フィールド定義の構文があいまいになるため、フィールドを定義してフィールド値を取得することはできません。

    以下は、Kafka の JSON 形式レコードのデータ値の例です。

    {
          "a": {
          "a1": "hello"
          },
          "b": "world",
          "c":[
                "xxxxxxx",
                "yyyyyyy"
                ],
          "d":[
                {
                      "AA":"this",
                      "BB":"is_data"
                },
                {
                      "AA":"that",
                      "BB":"is_also_data"
                }
            ],
         "a.b": "unreachable"
    }
    • a1 のデータ "hello" を同期するには、Kafka 側にフィールド a.a1 を追加します。

    • b のデータ "world" を同期するには、Kafka 側にフィールド b を追加します。

    • c のデータ "yyyyyyy" を同期するには、Kafka 側にフィールド c[1] を追加します。

    • AA のデータ "this" を同期するには、Kafka 側にフィールド d[0].AA を追加します。

    • Kafka 側のフィールドが a.b と定義されている場合、データ "unreachable" を同期することはできません。

  • ソースまたは宛先テーブルのフィールドは、マッピングから除外できます。同期インスタンスは、マッピングされていないソースフィールドを読み取りません。マッピングされていない宛先フィールドには NULL が書き込まれます。

  • 1 つのソースフィールドを複数の宛先フィールドにマッピングすることはできません。また、1 つの宛先フィールドを複数のソースフィールドからマッピングすることもできません。

ステップ 4:詳細パラメーターの設定

タスクの右側にある[高度な設定]をクリックして、[予想される最大同時実行数][ダーティデータレコードのポリシー]などのパラメーターを設定します。このチュートリアルでは、[ダーティデータレコードのポリシー][ダーティデータレコードを無視]に設定し、その他のパラメーターはデフォルト値のままにします。詳細については、「コードレス UI 設定」をご参照ください。

ステップ 5:テストの設定と実行

  1. オフライン同期ノード編集ページの右側で、[デバッグ設定] をクリックします。テスト実行用に [リソースグループ][スクリプトパラメーター] を設定します。次に、上部のツールバーで [実行] をクリックして、同期リンクが正常に実行されるかテストします。

  2. 左側のナビゲーションウィンドウで image アイコンをクリックし、次に [個人用ディレクトリ] の右側にある image アイコンをクリックすると、.sql 拡張子のファイルを作成できます。その後、次の SQL クエリを実行して、宛先テーブルのデータが期待どおりであるかどうかを確認できます。

    説明
    SELECT * FROM <MaxCompute_destination_table_name> WHERE pt=<specified_partition> LIMIT 20;

ステップ 6:スケジューリングの設定とタスクの公開

オフライン同期タスクの右側にある[スケジューリング]をクリックします。 定期実行用にスケジューリング構成パラメーターを設定します。 次に、上部のツールバーにある[公開]をクリックして、公開パネルを開きます。 画面の指示に従って、タスクを公開します。

データソースと宛先を設定する際に、${startTime}${endTime}${partition} の 3 つのスケジューリングパラメーターを使用しました。スケジューリング設定では、同期のニーズに基づいてこれらのパラメーターの置換ポリシーを指定する必要があります。以下は、いくつかの典型的なシナリオの設定例です。

典型的なシナリオ

推奨設定

シナリオの説明

同期タスクは 5 分ごとにスケジュールされる

  • startTime=$[yyyymmddhh24mi-8/24/60]00

  • endTime=$[yyyymmddhh24mi-3/24/60]00

  • partition=$[yyyymmddhh24mi-8/24/60]

同期タスクが 2022-11-22 10:00 に開始するようにスケジュールされている場合:

  • 2022-11-22 09:52 (含む) から 2022-11-22 09:57 (含まない) までのタイムスタンプを持つ Kafka Topic のレコードを同期します。

  • 同期された Kafka データは、MaxCompute の 202211220952 パーティションに書き込まれます。

  • endTime は、インスタンスのスケジューリング時刻 ($[yyyymmddhh24mi]) より 3 分早く設定されます。この方法により、同期タスクインスタンスが開始する前に、対象期間のすべてのデータが Kafka Topic に書き込まれることが保証され、データ損失を防ぎます。

同期タスクは 1 時間ごとにスケジュールされる

  • startTime=$[yyyymmddhh24-1/24]0000

  • endTime=$[yyyymmddhh24]0000

  • partition=$[yyyymmddhh24]

説明
  • 同期タスクが 2 時間ごとにスケジュールされる場合、startTime=$[yyyymmddhh24-2/24]0000 に設定します。他のスケジューリングパラメーターは変更しません。

  • 同期タスクが 3 時間ごとにスケジュールされる場合、startTime=$[yyyymmddhh24-3/24]0000 に設定します。他のスケジューリングパラメーターは変更しません。

  • 他の時間単位のスケジューリング周期にも同じロジックが適用されます。

同期タスクが 2022-11-22 10:05 に開始するようにスケジュールされている場合:

  • 2022-11-22 09:00 (含む) から 2022-11-22 10:00 (含まない) までのタイムスタンプを持つ Kafka Topic のレコードを同期します。

  • 同期された Kafka データは、MaxCompute の 2022112210 パーティションに書き込まれます。

同期タスクは毎日スケジュールされる

  • startTime=$[yyyymmdd-1]000000

  • endTime=$[yyyymmdd]000000

  • partition=$[yyyymmdd-1]

同期タスクが 2022-11-22 00:05 に開始するようにスケジュールされている場合:

  • 2022-11-21 00:00 (含む) から 2022-11-22 00:00 (含まない) までのタイムスタンプを持つ Kafka Topic のレコードを同期します。

  • 同期された Kafka データは、MaxCompute の 20221121 パーティションに書き込まれます。

同期タスクは毎週スケジュールされる

  • startTime=$[yyyymmdd-7]000000

  • endTime=$[yyyymmdd]000000

  • partition=$[yyyymmdd-1]

同期タスクが 2022-11-22 00:05 に開始するようにスケジュールされている場合:

  • 2022-11-15 00:00 (含む) から 2022-11-22 00:00 (含まない) までのタイムスタンプを持つ Kafka Topic のレコードを同期します。

  • 同期された Kafka データは、MaxCompute の 20221121 パーティションに書き込まれます。

同期タスクは毎月スケジュールされる

  • startTime=$[add_months(yyyymmdd,-1)]000000

  • endTime=$[yyyymmdd]000000

  • partition=$[yyyymmdd-1]

同期タスクが 2022-11-22 00:05 に開始するようにスケジュールされている場合:

  • 2022-10-22 00:00 (含む) から 2022-11-22 00:00 (含まない) までのタイムスタンプを持つ Kafka Topic のレコードを同期します。

  • 同期された Kafka データは、MaxCompute の 20221121 パーティションに書き込まれます。

希望する間隔に基づいてスケジューリング周期を設定します。

典型的なシナリオ

推奨設定

シナリオの説明

同期タスクは 5 分ごとにスケジュールされる

  • スケジューリング周期:分

  • 開始時刻:00:00

  • 間隔:5 分

  • 終了時刻:23:59

なし

同期タスクは 1 時間ごとにスケジュールされる

  • スケジューリング周期:時間

  • 開始時刻:00:15

  • 間隔:1 時間

  • 終了時刻:23:59

開始時刻を 00:00 より少し遅い時刻 (例:00:15) に設定します。この方法により、同期タスクインスタンスが開始する前に、対象期間のすべてのデータが Kafka Topic に書き込まれることが保証されます。

同期タスクは毎日スケジュールされる

  • スケジューリング周期:日

  • スケジューリング時刻:00:15

スケジューリング時刻を 00:00 より少し遅い時刻 (例:00:15) に設定します。この方法により、同期タスクインスタンスが開始する前に、対象期間のすべてのデータが Kafka Topic に書き込まれることが保証されます。

同期タスクは毎週スケジュールされる

  • スケジューリング周期:週

  • 指定時刻:月曜日

  • スケジューリング時刻:00:15

スケジューリング時刻を 00:00 より少し遅い時刻 (例:00:15) に設定します。この方法により、同期タスクインスタンスが開始する前に、対象期間のすべてのデータが Kafka Topic に書き込まれることが保証されます。

同期タスクは毎月スケジュールされる

  • スケジューリング周期:月

  • 指定時刻:毎月 1 日

  • スケジューリング時刻:00:15

スケジューリング時刻を 00:00 より少し遅い時刻 (例:00:15) に設定します。この方法により、同期タスクインスタンスが開始する前に、対象期間のすべてのデータが Kafka Topic に書き込まれることが保証されます。

重要

インスタンスの開始後に開始時刻以前のタイムスタンプを持つレコードが Kafka Topic に書き込まれた場合、これらのレコードは読み取られない可能性があります。Kafka Topic へのデータ書き込みが遅延したり、タイムスタンプが順不同になったりする場合、オフライン同期タスクでデータが損失するリスクがあることにご注意ください。