Kafka 入力コンポーネントを構成すると、Kafka データソースからビッグデータプラットフォームに接続されたストレージシステムにデータを読み込み、データ統合と二次処理を行うことができます。このトピックでは、Kafka 入力コンポーネントを構成する方法について説明します。
前提条件
開始する前に、次の操作が完了していることを確認してください。
Kafka データソースを作成しました。 詳細については、「Kafka データソースの作成」をご参照ください。
Kafka 入力コンポーネントのプロパティを構成するために使用するアカウントは、データソースに対するリードスルー権限を持っている必要があります。 権限がない場合は、データソース権限をリクエストする必要があります。 詳細については、「データソース権限のリクエスト、更新、および返却」をご参照ください。
手順
Dataphin ホームページの上部ナビゲーションバーで、[開発] > [Data Integration] を選択します。
統合ページの上部ナビゲーションバーで、プロジェクトを選択します(開発 - 本番モードでは、環境を選択する必要があります)。
左側のナビゲーションウィンドウで、[バッチパイプライン] をクリックします。 [バッチパイプライン] リストで、開発するオフラインパイプラインをクリックして、構成ページを開きます。
ページの右上隅にある [コンポーネントライブラリ] をクリックして、[コンポーネントライブラリ] パネルを開きます。
[コンポーネントライブラリ] パネルの左側のナビゲーションウィンドウで、[入力] を選択します。右側の入力コンポーネントリストで [KAFKA] コンポーネントを見つけて、キャンバスにドラッグします。
KAFKA 入力コンポーネントカードの
アイコンをクリックして、[KAFKA 入力構成] ダイアログボックスを開きます。[KAFKA 入力構成] ダイアログボックスで、次の表に示すようにパラメーターを構成します。
パラメーター
説明
ステップ名
Kafka 入力コンポーネントの名前。 Dataphin は自動的にステップ名を生成します。ビジネスシナリオに基づいて変更することもできます。名前は次の要件を満たしている必要があります。
漢字、英字、アンダースコア(_)、数字のみを含めることができます。
長さは 64 文字を超えることはできません。
データソース
データソースのドロップダウンリストには、現在の Dataphin インスタンスのすべての Kafka データソースが表示されます。これには、リードスルー権限を持つデータソースと、リードスルー権限を持たないデータソースが含まれます。
アイコンをクリックして、現在のデータソース名をコピーします。リードスルー権限を持たないデータソースの場合、データソースの横にある [リクエスト] をクリックして、リードスルー権限をリクエストできます。 詳細については、「データソース権限のリクエスト」をご参照ください。
Kafka データソースがない場合は、[データソースの作成] をクリックして作成します。 詳細については、「Kafka データソースの作成」をご参照ください。
トピック
Kafka トピック。ドロップダウンリストをクリックして、データを読み取る Kafka トピック名を選択します。
キータイプ
Kafka キーのタイプ。Kafka Consumer を初期化するときの key.deserializer 構成を決定します。有効な値:[BYTEARRAY]、[DOUBLE]、[FLOAT]、[INTEGER]、[LONG]、[SHORT]、[STRING]、[KAFKA AVRO](データソースに schema.registry が構成されている場合に利用可能)。
値のタイプ
Kafka 値のタイプ。Kafka Consumer を初期化するときの value.deserializer 構成を決定します。有効な値:[BYTEARRAY]、[DOUBLE]、[FLOAT]、[INTEGER]、[LONG]、[SHORT]、[STRING]、[KAFKA AVRO](データソースに schema.registry が構成されている場合に利用可能)。
コンシューマーグループ ID
Kafka Consumer を初期化するときの group.id 構成。
Data Integration のデータ同期ノードが正しいオフセットからデータを使用する場合は、このパラメーターをデータ同期ノードに固有の値に設定する必要があります。 このパラメーターを指定しない場合、同期が実行されるたびに
datax_で始まるランダムな文字列が group.id として自動的に生成されます。開始時刻
データの読み取り開始時刻。特定の時刻を指定するには、yyyyMMddHHmmss 形式の時間文字列のみがサポートされます。これは、時間範囲の左側の境界です。このパラメーターは、スケジューリングパラメーターと一緒に使用する必要があります。たとえば、スケジューリングパラメーターが
beginDateTime=${20220101000000}として構成されている場合、[開始時刻] は ${beginDateTime} として構成する必要があります。終了時刻
データの読み取り終了時刻。特定の時刻を指定するには、yyyyMMddHHmmss 形式の時間文字列のみがサポートされます。これは、時間範囲の右側の境界です。このパラメーターは、スケジューリングパラメーターと一緒に使用する必要があります。たとえば、スケジューリングパラメーターが
endDateTime=${20220101000000}として構成されている場合、[終了時刻] は ${endDateTime} として構成する必要があります。[同期の終了戦略]
同期の終了戦略を選択します。次の 2 つの戦略を使用できます。
[1 分間新しいデータが読み取られない場合]:コンシューマーが Kafka から 1 分間データを取得しない場合(通常は、トピック内のすべてのデータが読み取られたため、またはネットワークまたは Kafka クラスタの可用性の問題が原因である可能性があります)、タスクはすぐに停止します。 それ以外の場合は、データが再び読み取られるまで再試行を続けます。
[指定された終了オフセットに達した場合]:データ統合タスクによって読み取られた Kafka レコードのビジネスタイムまたはオフセットが上記の終了オフセット構成を満たしている場合、タスクは終了します。 それ以外の場合は、Kafka レコードの読み取りを無期限に再試行し続けます。
詳細設定
詳細設定を使用して、オフセットリセット戦略、単一読み取りサイズ、単一読み取り時間、および読み取りタイムアウトを構成できます。トピックにスキーマレジストリが構成されている場合は、詳細設定で keySchema パラメーターと valueSchema パラメーターを構成する必要があります。このパラメーターはデフォルトでは空です。サンプル形式は次のとおりです。
{ "namespace": "example.avro", "type": "record", "name": "User", "fields": [ {"name": "name", "type": "string"}, {"name": "favorite_number", "type": ["int", "null"]}, {"name": "favorite_color", "type": ["string", "null"]} ] }出力フィールド
デフォルトでは、6 つのフィールドが表示されます:[__key__]、[__value__]、[__partition__]、[__headers__]、[__offset__]、[__timestamp__]。出力フィールドを手動で追加できます。
[一括追加] をクリックして、JSON 形式または TEXT 形式を使用して一括で構成します。
JSON 形式の例
[ { "index": 0, // インデックス "name": "__key__", // 名前 "type": "STRING" // タイプ }, { "index": 1, "name": "__value__", "type": "STRING" }, { "index": 2, "name": "__partition__", "type": "INTEGER" }, { "index": 3, "name": "__headers__", "type": "STRING" }, { "index": 4, "name": "__offset__", "type": "LONG" }, { "index": 5, "name": "__timestamp__", "type": "LONG" } ]説明indexは指定されたオブジェクトの列番号を示し、nameはインポート後のフィールド名を示し、typeはインポート後のフィールドタイプを示します。 たとえば、"index":3,"name":"user_id","type":"String"は、ファイルから4 番目の列をインポートすることを意味し、フィールド名はuser_id、フィールドタイプはStringです。TEXT 形式の例
0,__key__,STRING 1,__value__,STRING 2,__partition__,INTEGER 3,__headers__,STRING 4,__offset__,LONG 5,__timestamp__,LONG説明行区切り文字は、各フィールドの情報を区切るために使用されます。デフォルトは改行(\n)です。改行(\n)、セミコロン(;)、ピリオド(.)がサポートされています。
列区切り文字は、フィールド名とフィールドタイプを区切るために使用されます。デフォルトはカンマ(,)です。
[新しい出力フィールド] をクリックし、プロンプトに従って [ソースインデックス]、[列] を入力し、[タイプ] を選択します。
ソーステーブルフィールドは、上記の 6 つの文字列以外の文字列として構成することもできます。この場合、Kafka レコードは JSON 文字列として解析され、ソーステーブルフィールドに構成された文字列は、対応するコンテンツをフィールド値として読み取り、対応するターゲットテーブルフィールドに書き込むための JSON パスとして使用されます。例:
{ "data": { "name": "bob", "age": 35 } }が Kafka レコードの値である場合、ソーステーブルフィールドが data.name として構成されていると、「bob」がこのフィールドの値として読み取られ、対応するターゲットテーブルに書き込まれます。追加できるフィールドタイプは、Java タイプと datax マッピングタイプです。追加されたフィールドに対して次の操作を実行することもできます。
列の [アクション]
アイコンをクリックして、既存のフィールドを編集します。列の [アクション]
アイコンをクリックして、既存のフィールドを削除します。
[OK] をクリックして、Kafka 入力コンポーネントの構成を完了します。