Kafka を単一テーブルのリアルタイム同期のソースとして使用して、メッセージキューからリアルタイムでデータを取得し、そのデータを宛先に書き込むことができます。このトピックでは、Kafka 入力コンポーネントの設定方法について説明します。
適用範囲
Alibaba Cloud Kafka および自己管理 Kafka のバージョン 0.10.2 から 2.2.x までをサポートします。
0.10.2 より前の Kafka バージョンは、パーティションデータオフセットの取得をサポートしておらず、そのデータ構造はタイムスタンプをサポートしていない可能性があります。その結果、同期タスクの待機時間統計が不正確になったり、コンシューマオフセットを正しくリセットできなくなったりする可能性があります。
詳細については、「Kafka データソースの設定」をご参照ください。
手順
DataStudio ページに移動します。
DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[データ開発へ進む] をクリックします。
DataStudio ページの [Scheduled Workflow] ペインで、ポインターを
アイコンに移動し、 を選択します。または、[Scheduled Workflow] ペインで目的のワークフローを見つけ、ワークフロー名を右クリックし、 を選択します。
[ノードの作成] ダイアログボックスで、[同期方法] パラメーターを [エンドツーエンド ETL] に設定し、[名前] と [パス] パラメーターを設定します。
[確認] をクリックします。
リアルタイム同期ノードの設定ページで、 をクリックし、編集パネルにドラッグします。
[Kafka] ノードをクリックします。[ノード設定] ダイアログボックスで、パラメーターを設定します。

パラメーター
説明
データソース
設定済みの Kafka データソースを選択します。Kafka データソースのみがサポートされています。利用可能なデータソースがない場合は、右側の [新しいデータソース] をクリックして ページに移動し、データソースを作成します。詳細については、「Kafka データソースの設定」をご参照ください。
Topic
Kafka Topic の名前。Topic は、Kafka がメッセージフィードを整理するために使用するカテゴリです。
Kafka クラスターに公開された各メッセージは、Topic に属します。Topic は、メッセージのグループのコレクションです。
説明Kafka 入力は 1 つの Topic のみをサポートします。
キータイプ
Kafka メッセージのキーのタイプ。この値は、KafkaConsumer を初期化する際の key.deserializer 設定を決定します。有効な値: STRING、BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、および SHORT。
値タイプ
Kafka メッセージの値のタイプ。この値は、KafkaConsumer を初期化する際の value.deserializer 設定を決定します。有効な値: STRING、BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、および SHORT。
出力モード
Kafka レコードを解析するために使用されるメソッド。
単一行出力: Kafka レコードを非構造化文字列または JSON オブジェクトとして解析します。1 つの Kafka レコードが 1 つの出力レコードに解析されます。
複数行出力: Kafka レコードを JSON 配列として解析します。各配列要素が 1 つの出力レコードに解析されます。したがって、1 つの Kafka レコードが複数の出力レコードに解析されることがあります。
説明この設定項目は一部のリージョンでのみサポートされています。この項目が表示されない場合は、お使いのリージョンで機能がリリースされるまでお待ちください。
配列のパス
[出力モード] が [複数行出力] に設定されている場合、Kafka レコード値内の JSON 配列のパスを指定します。パスは、
a.a1形式の特定の JSON オブジェクト内のフィールド、またはa[0].a1形式の特定の JSON 配列内のフィールドの参照をサポートします。この設定項目を空のままにすると、Kafka レコード値全体が JSON 配列として解析されます。解析対象の JSON 配列は、
["a","b"]のような数値または文字列の配列ではなく、[{"a":"hello"},{"b":"world"}]のようなオブジェクト配列でなければならないことに注意してください。設定パラメーター
Kafka データ消費クライアント (KafkaConsumer) を作成する際に、
kafkaConfig拡張パラメーターを設定して、データ読み取り動作を詳細に制御できます。さまざまな Kafka クラスターバージョンでサポートされているパラメーターの完全なリストについては、お使いのバージョンの 公式 Kafka ドキュメント をご参照ください。一般的なパラメーターの例:
bootstrap.serversauto.commit.interval.mssession.timeout.ms
group.idデフォルトの動作
デフォルトでは、リアルタイム同期タスクはランダムに生成された文字列を KafkaConsumer のgroup.idとして設定します。手動設定
固定のgroup.idを手動で指定できます。これにより、指定した使用者グループを使用して、Kafka クラスター内の同期タスクのコンシューマオフセットをモニターまたは監視できます。
出力フィールド
Kafka データ用の出力フィールド名をカスタマイズします:
[フィールドの追加] をクリックし、[フィールド名] を入力し、[タイプ] を選択してカスタムフィールドを追加します。
[値のメソッド] は、Kafka レコードからフィールド値を取得する方法を指定します。右側の
アイコンをクリックして、2 つの値のメソッドを切り替えます。プリセット値メソッド: Kafka レコードから値を取得するための 6 つのプリセットオプションを提供します:
value: メッセージ本文
key: メッセージキー
partition: パーティション番号
offset: メッセージオフセット
timestamp: メッセージのタイムスタンプ (ミリ秒)
headers: メッセージヘッダー
JSON 解析: . (サブフィールドを取得するため) および [] (配列要素を取得するため) の構文を使用して、複雑な JSON フォーマットからコンテンツを取得できます。下位互換性のために、__value__ のように 2 つのアンダースコア (_) で始まる文字列を使用して、Kafka レコードから特定のコンテンツをフィールド値として取得することもできます。次のコードは、Kafka データのサンプルを示しています。
{ "a": { "a1": "hello" }, "b": "world", "c":[ "xxxxxxx", "yyyyyyy" ], "d":[ { "AA":"this", "BB":"is_data" }, { "AA":"that", "BB":"is_also_data" } ] }出力フィールドの値は状況によって異なります:
Kafka レコード値を同期するには、値のメソッドを __value__ に設定します。
Kafka レコードキーを同期するには、値のメソッドを __key__ に設定します。
Kafka レコードパーティションを同期するには、値のメソッドを __partition__ に設定します。
Kafka レコードオフセットを同期するには、値のメソッドを __offset__ に設定します。
Kafka レコードのタイムスタンプを同期するには、値のメソッドを __timestamp__ に設定します。
Kafka レコードヘッダーを同期するには、値のメソッドを __headers__ に設定します。
a1 フィールドからデータ "hello" を同期するには、値メソッドを a.a1 に設定します。
b フィールドからデータ "world を同期するには、値メソッドを b に設定します。
c フィールドからデータ "yyyyyyy" を同期するには、値メソッドを c[1] にセットします。
AA フィールドからデータ "this" を同期するには、値メソッドを d[0].AA に設定します。
削除したいフィールドにポインターを合わせ、
アイコンをクリックします。
シナリオ例: [出力モード] を [複数行出力] に設定すると、システムはまず、Path of array で指定された JSON パスに基づいて JSON 配列を解析します。次に、配列から各 JSON オブジェクトを取得し、定義されたフィールド名と値のメソッドに基づいて出力フィールドを形成します。値のメソッドの定義は、単一行出力モードの場合と同じです。複雑な JSON フォーマットからコンテンツを取得するには、. (サブフィールドの取得) と [] (配列要素の取得) の構文を使用できます。次のコードは、Kafka インスタンスのサンプルデータを示しています。
{ "c": { "c0": [ { "AA": "this", "BB": "is_data" }, { "AA": "that", "BB": "is_also_data" } ] } }[配列のパス] を
c.c0に設定し、2 つの出力フィールドを定義した場合 (1 つはAAという名前で値のメソッドがAA、もう 1 つはBBという名前で値のメソッドがBB)、この Kafka レコードは次の 2 つのレコードに解析されます:
ツールバーの
アイコンをクリックします。説明Kafka 入力は 1 つの Topic のみをサポートします。