すべてのプロダクト
Search
ドキュメントセンター

:Kafka入力の設定

最終更新日:Jun 21, 2026

リアルタイム同期のソースとして Kafka を使用し、メッセージキューからデータを継続的に読み取り、送信先に書き込みます。このトピックでは、Kafka リーダーの設定方法について説明します。

前提条件

  • Alibaba Cloud Kafka およびバージョン 0.10.2 から 2.2.x (両方のバージョンを含む) までのセルフマネージド Kafka をサポートします。

  • 0.10.2 より前のバージョンの Kafka の使用は推奨されません。これらのバージョンはパーティションオフセットの取得をサポートしておらず、データ構造にタイムスタンプがない場合があります。これにより、同期タスクのレイテンシ統計が不正確になり、適切なオフセットのリセットができなくなります。

Kafka データソースの設定方法の詳細については、「Kafkaデータソースの設定」をご参照ください。

手順

  1. DataStudioページに移動します。

    DataWorks コンソールにログインします。 左側のナビゲーションウィンドウで、[データモデリングと開発] > [DataStudio] を選択します。 表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[DataStudioに移動] をクリックします。

  2. DataStudio ページの [スケジュールワークフロー] ペインで、ポインターを 新建 アイコンの上に移動し、[ノードの作成] > [data Integration] > [リアルタイム同期] を選択します。

    または、[スケジュールワークフロー] ペインで目的のワークフローを見つけ、ワークフロー名を右クリックして、[ノードの作成] > [data Integration] > [リアルタイム同期] を選択します。

  3. [ノードの作成] ダイアログボックスで、[同期方法] パラメーターを [エンドツーエンド ETL] に設定し、[名前] パラメーターと [パス] パラメーターを構成します。

  4. [確認] をクリックします。

  5. リアルタイム同期ノードの設定ページで、Input を選択し、編集キャンバスにドラッグします。

  6. [Kafka] ノードをクリックし、Node Configuration ダイアログボックスでそのパラメーターを設定します。

    パラメーター

    説明

    [Data Source]

    設定済みの Kafka データソースを選択します。Kafka データソースのみがサポートされています。データソースが設定されていない場合は、右側の New data source をクリックして [ワークスペース管理] > Data Source ページに移動して作成します。詳細については、「Kafkaデータソースの設定」をご参照ください。

    [Topic]

    メッセージを読み取るトピックの名前。

    トピックは、Kafka クラスター内で関連メッセージをグループ化するために使用されるカテゴリです。

    説明

    1つの Kafka リーダーは、1つのトピックからのみ読み取ることができます。

    [Key Type]

    Kafka キーのタイプによって、KafkaConsumer を初期化するための key.deserializer 設定が決まります。使用可能な値には、STRINGBYTEARRAYDOUBLEFLOATINTEGERLONGSHORT があります。

    [Value Type]

    KafkaConsumer を初期化する際、Kafka の値のタイプによって value.deserializer 設定が決まります。有効な値には、STRINGBYTEARRAYDOUBLEFLOATINTEGERLONGSHORT があります。

    [Output Mode]

    Kafka レコードの解析方法を定義します。

    • 単一行出力:Kafka レコードを非構造化文字列または JSON オブジェクトとして解析します。1つの Kafka レコードから1つの出力レコードが生成されます。

    • 複数行出力:Kafka レコードを JSON 配列として解析します。配列内の各要素から1つの出力レコードが生成されます。したがって、1つの Kafka レコードから複数の出力レコードが生成されることがあります。

    説明

    このパラメーターは一部のリージョンでのみ利用可能です。お使いのリージョンで表示されない場合は、この機能がリリースされるまでお待ちください。

    [Path of Array]

    出力モードが複数行出力に設定されている場合、Kafka レコードの値に含まれる JSON 配列へのパスを指定します。a.a1 形式を使用して特定の JSON オブジェクト内のフィールドを参照したり、a[0].a1 形式を使用して特定の JSON 配列内のフィールドを参照したりできます。このパラメーターが空の場合、Kafka レコードの値全体が JSON 配列として解析されます。

    対象の JSON 配列は、[{"a":"hello"},{"b":"world"}] のようなオブジェクトの配列である必要があります。["a","b"] のような数値や文字列などのプリミティブ型の配列は使用できません。

    [Configuration parameters]

    Kafka コンシューマークライアント (KafkaConsumer) を作成する際に、拡張パラメーターを kafkaConfig に追加して、データ読み取りの動作をカスタマイズできます。各 Kafka クラスターバージョンでサポートされているパラメーターの完全なリストについては、お使いのバージョンの公式 Kafkaドキュメントをご参照ください。

    • 共通パラメーター:

      • bootstrap.servers

      • auto.commit.interval.ms

      • session.timeout.ms

    • group.id

      • デフォルトの動作
        デフォルトでは、リアルタイム同期タスクはランダムに生成された文字列を KafkaConsumergroup.id として割り当てます。

      • 手動設定
        固定の group.id を手動で指定できます。これにより、指定したコンシューマーグループを使用して、Kafka クラスター内のタスクの消費オフセットを監視または観測できます。

    [Output Field]

    各 Kafka メッセージから抽出するフィールドを定義します。

    • Add more fields をクリックし、Field を入力して Value を選択し、カスタムフィールドを追加します。

      [Assignment Method]は、Kafka レコードからフィールドの値を抽出する方法を指定します。右側の 箭头 アイコンをクリックして、2つの値の抽出方法を切り替えます。

      • プリセットメソッド:Kafka レコードから値を抽出するための6つの組み込みメソッドを提供します。

        • value:メッセージ本文。

        • key:メッセージキー。

        • partition:パーティション番号。

        • offset:メッセージオフセット。

        • timestamp:メッセージのタイムスタンプ (ミリ秒単位)。

        • headers:メッセージヘッダー。

      • JSON解析:ドット表記法 (.) を使用してサブフィールドにアクセスし、角括弧表記法 ([]) を使用して配列要素にアクセスすることで、複雑な JSON から値を抽出します。下位互換性のために、二重アンダースコア (例: __value__) を使用した従来の構文を使用して、Kafka レコードからメタデータまたはメッセージ本文全体を抽出できます。次のコードブロックは、Kafka データの例を示しています。

        {
           "a": {
           "a1": "hello"
           },
           "b": "world",
           "c":[
              "xxxxxxx",
              "yyyyyyy"
              ],
           "d":[
              {
                 "AA":"this",
                 "BB":"is_data"
              },
              {
                 "AA":"that",
                 "BB":"is_also_data"
              }
            ]
        }
        • さまざまなシナリオでの出力フィールドの値は次のとおりです。

          • Kafka レコードの値を同期するには、値の取得方法として value と入力します。

          • Kafka レコードのキーを同期するには、値の取得方法として key と入力します。

          • Kafka レコードのパーティションを同期するには、値の取得方法として partition と入力します。

          • Kafka レコードのオフセットを同期するには、値の取得方法として offset と入力します。

          • Kafka レコードのタイムスタンプを同期するには、値の取得方法として timestamp と入力します。

          • Kafka レコードのヘッダーを同期するには、値の取得方法として headers と入力します。

          • a1 からデータ "hello" を同期するには、パスとして a.a1 と入力します。

          • b からデータ "world" を同期するには、値の取得方法として b と入力します。

          • c のデータ "yyyyyyy" を同期するには、値の取得方法として c[1] と入力します。

          • AA からデータ "this" を同期するには、値の取得方法として d[0].AA と入力します。

    • フィールドを削除するには、そのフィールドにカーソルを合わせ、表示される 删除 アイコンをクリックします。

    シナリオ例Output ModeMulti-row Output を選択した場合、システムはまず指定された JSON パスに基づいて JSON 配列を解析します。次に、配列から各 JSON オブジェクトを抽出し、定義されたフィールド名と値の取得方法に基づいて出力フィールドを構築します。値の取得方法は、単一行出力モードの場合と同じです。. 表記 (サブフィールドの取得) と [] 表記 (配列要素の取得) を使用して、複雑な JSON 形式からコンテンツを取得できます。Kafka インスタンスのデータは次のとおりです。

    {
        "c": {
            "c0": [
                {
                    "AA": "this",
                    "BB": "is_data"
                },
                {
                    "AA": "that",
                    "BB": "is_also_data"
                }
            ]
        }
    }

    配列の場所が c.c0 に設定され、2つの出力フィールドを定義した場合 (AA という名前のフィールドの値は AA から、BB という名前のフィールドの値は BB から取得)、Kafka レコードは2つのレコードに解析されます (1つは AA=thisBB=is_data、もう1つは AA=thatBB=is_also_data)。

  7. ツールバーの 保存 アイコンをクリックします。

    説明

    1つの Kafka リーダーは、1つのトピックからのみ読み取ることができます。