すべてのプロダクト
Search
ドキュメントセンター

DataWorks:Kafka 入力の設定

最終更新日:Oct 29, 2025

Kafka を単一テーブルのリアルタイム同期のソースとして使用して、メッセージキューからリアルタイムでデータを取得し、そのデータを宛先に書き込むことができます。このトピックでは、Kafka 入力コンポーネントの設定方法について説明します。

適用範囲

  • Alibaba Cloud Kafka および自己管理 Kafka のバージョン 0.10.2 から 2.2.x までをサポートします。

  • 0.10.2 より前の Kafka バージョンは、パーティションデータオフセットの取得をサポートしておらず、そのデータ構造はタイムスタンプをサポートしていない可能性があります。その結果、同期タスクの待機時間統計が不正確になったり、コンシューマオフセットを正しくリセットできなくなったりする可能性があります。

詳細については、「Kafka データソースの設定」をご参照ください。

手順

  1. DataStudio ページに移動します。

    DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、[データ開発と O&M] > [データ開発] を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[データ開発へ進む] をクリックします。

  2. DataStudio ページの [Scheduled Workflow] ペインで、ポインターを 新建 アイコンに移動し、[ノードの作成] > [Data Integration] > [リアルタイム同期] を選択します。

    または、[Scheduled Workflow] ペインで目的のワークフローを見つけ、ワークフロー名を右クリックし、[ノードの作成] > [Data Integration] > [リアルタイム同期] を選択します。

  3. [ノードの作成] ダイアログボックスで、[同期方法] パラメーターを [エンドツーエンド ETL] に設定し、[名前][パス] パラメーターを設定します。

  4. [確認] をクリックします。

  5. リアルタイム同期ノードの設定ページで、[入力] > [Kafka] をクリックし、編集パネルにドラッグします。

  6. [Kafka] ノードをクリックします。[ノード設定] ダイアログボックスで、パラメーターを設定します。

    image

    パラメーター

    説明

    データソース

    設定済みの Kafka データソースを選択します。Kafka データソースのみがサポートされています。利用可能なデータソースがない場合は、右側の [新しいデータソース] をクリックして [ワークスペース管理] > [データソース] ページに移動し、データソースを作成します。詳細については、「Kafka データソースの設定」をご参照ください。

    Topic

    Kafka Topic の名前。Topic は、Kafka がメッセージフィードを整理するために使用するカテゴリです。

    Kafka クラスターに公開された各メッセージは、Topic に属します。Topic は、メッセージのグループのコレクションです。

    説明

    Kafka 入力は 1 つの Topic のみをサポートします。

    キータイプ

    Kafka メッセージのキーのタイプ。この値は、KafkaConsumer を初期化する際の key.deserializer 設定を決定します。有効な値: STRINGBYTEARRAYDOUBLEFLOATINTEGERLONG、および SHORT

    値タイプ

    Kafka メッセージの値のタイプ。この値は、KafkaConsumer を初期化する際の value.deserializer 設定を決定します。有効な値: STRINGBYTEARRAYDOUBLEFLOATINTEGERLONG、および SHORT

    出力モード

    Kafka レコードを解析するために使用されるメソッド。

    • 単一行出力: Kafka レコードを非構造化文字列または JSON オブジェクトとして解析します。1 つの Kafka レコードが 1 つの出力レコードに解析されます。

    • 複数行出力: Kafka レコードを JSON 配列として解析します。各配列要素が 1 つの出力レコードに解析されます。したがって、1 つの Kafka レコードが複数の出力レコードに解析されることがあります。

    説明

    この設定項目は一部のリージョンでのみサポートされています。この項目が表示されない場合は、お使いのリージョンで機能がリリースされるまでお待ちください。

    配列のパス

    [出力モード] が [複数行出力] に設定されている場合、Kafka レコード値内の JSON 配列のパスを指定します。パスは、a.a1 形式の特定の JSON オブジェクト内のフィールド、または a[0].a1 形式の特定の JSON 配列内のフィールドの参照をサポートします。この設定項目を空のままにすると、Kafka レコード値全体が JSON 配列として解析されます。

    解析対象の JSON 配列は、["a","b"] のような数値または文字列の配列ではなく、[{"a":"hello"},{"b":"world"}] のようなオブジェクト配列でなければならないことに注意してください。

    設定パラメーター

    Kafka データ消費クライアント (KafkaConsumer) を作成する際に、kafkaConfig 拡張パラメーターを設定して、データ読み取り動作を詳細に制御できます。さまざまな Kafka クラスターバージョンでサポートされているパラメーターの完全なリストについては、お使いのバージョンの 公式 Kafka ドキュメント をご参照ください。

    • 一般的なパラメーターの例:

      • bootstrap.servers

      • auto.commit.interval.ms

      • session.timeout.ms

    • group.id

      • デフォルトの動作
        デフォルトでは、リアルタイム同期タスクはランダムに生成された文字列を KafkaConsumer の group.id として設定します。

      • 手動設定
        固定の group.id を手動で指定できます。これにより、指定した使用者グループを使用して、Kafka クラスター内の同期タスクのコンシューマオフセットをモニターまたは監視できます。

    出力フィールド

    Kafka データ用の出力フィールド名をカスタマイズします:

    • [フィールドの追加] をクリックし、[フィールド名] を入力し、[タイプ] を選択してカスタムフィールドを追加します。

      [値のメソッド] は、Kafka レコードからフィールド値を取得する方法を指定します。右側の 箭头 アイコンをクリックして、2 つの値のメソッドを切り替えます。

      • プリセット値メソッド: Kafka レコードから値を取得するための 6 つのプリセットオプションを提供します:

        • value: メッセージ本文

        • key: メッセージキー

        • partition: パーティション番号

        • offset: メッセージオフセット

        • timestamp: メッセージのタイムスタンプ (ミリ秒)

        • headers: メッセージヘッダー

      • JSON 解析: . (サブフィールドを取得するため) および [] (配列要素を取得するため) の構文を使用して、複雑な JSON フォーマットからコンテンツを取得できます。下位互換性のために、__value__ のように 2 つのアンダースコア (_) で始まる文字列を使用して、Kafka レコードから特定のコンテンツをフィールド値として取得することもできます。次のコードは、Kafka データのサンプルを示しています。

        {
           "a": {
           "a1": "hello"
           },
           "b": "world",
           "c":[
              "xxxxxxx",
              "yyyyyyy"
              ],
           "d":[
              {
                 "AA":"this",
                 "BB":"is_data"
              },
              {
                 "AA":"that",
                 "BB":"is_also_data"
              }
            ]
        }
        • 出力フィールドの値は状況によって異なります:

          • Kafka レコード値を同期するには、値のメソッドを __value__ に設定します。

          • Kafka レコードキーを同期するには、値のメソッドを __key__ に設定します。

          • Kafka レコードパーティションを同期するには、値のメソッドを __partition__ に設定します。

          • Kafka レコードオフセットを同期するには、値のメソッドを __offset__ に設定します。

          • Kafka レコードのタイムスタンプを同期するには、値のメソッドを __timestamp__ に設定します。

          • Kafka レコードヘッダーを同期するには、値のメソッドを __headers__ に設定します。

          • a1 フィールドからデータ "hello" を同期するには、値メソッドを a.a1 に設定します。

          • b フィールドからデータ "world を同期するには、値メソッドを b に設定します。

          • c フィールドからデータ "yyyyyyy" を同期するには、値メソッドを c[1] にセットします。

          • AA フィールドからデータ "this" を同期するには、値メソッドを d[0].AA に設定します。

    • 削除したいフィールドにポインターを合わせ、删除 アイコンをクリックします。

    シナリオ例: [出力モード][複数行出力] に設定すると、システムはまず、Path of array で指定された JSON パスに基づいて JSON 配列を解析します。次に、配列から各 JSON オブジェクトを取得し、定義されたフィールド名と値のメソッドに基づいて出力フィールドを形成します。値のメソッドの定義は、単一行出力モードの場合と同じです。複雑な JSON フォーマットからコンテンツを取得するには、. (サブフィールドの取得) と [] (配列要素の取得) の構文を使用できます。次のコードは、Kafka インスタンスのサンプルデータを示しています。

    {
        "c": {
            "c0": [
                {
                    "AA": "this",
                    "BB": "is_data"
                },
                {
                    "AA": "that",
                    "BB": "is_also_data"
                }
            ]
        }
    }

    [配列のパス] を c.c0 に設定し、2 つの出力フィールドを定義した場合 (1 つは AA という名前で値のメソッドが AA、もう 1 つは BB という名前で値のメソッドが BB)、この Kafka レコードは次の 2 つのレコードに解析されます: 记录

  7. ツールバーの 保存 アイコンをクリックします。

    説明

    Kafka 入力は 1 つの Topic のみをサポートします。