Kafka Writer を構成するには、データを書き込むテーブルを選択し、フィールドマッピングを構成するだけです。
前提条件
リーダーノードまたは変換ノードが構成されています。 詳細については、「リアルタイム同期をサポートするデータソースの種類」をご参照ください。
背景情報
Kafka に書き込むデータの重複除去はサポートされていません。 同期ノードのオフセットをリセットした場合、またはフェールオーバー後に同期ノードが再起動された場合、重複データが Kafka に書き込まれる可能性があります。
手順
DataWorks コンソール にログインします。 上部のナビゲーションバーで、目的のリージョンを選択します。 左側のナビゲーションウィンドウで、 を選択します。 表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[データ開発に移動] をクリックします。
マウスを
アイコンの上に移動し、 をクリックします。または、ビジネスフローを展開し、ターゲットビジネスフローを右クリックして、 を選択します。
[ノードの作成] ダイアログボックスで、[同期方法] パラメーターを [エンドツーエンド ETL] に設定し、[名前] パラメーターと [パス] パラメーターを構成します。
[確認] をクリックします。
リアルタイム同期ノードの構成タブで、[kafka] を [出力] セクションから右側のキャンバスにドラッグし、Kafka ノードを構成済みのリーダーノードまたは変換ノードに接続します。
[kafka] ノードをクリックします。 表示されるパネルで、パラメーターを構成します。

パラメーター
説明
データソース
DataWorks に追加した Kafka データソースの名前。 Kafka データソースのみを選択できます。 データソースがない場合は、右側の [新しいデータソース] をクリックして、[管理センター] の [データソース] ページに移動し、Kafka データソースを追加します。 詳細については、「Kafka データソースを追加する」をご参照ください。
Topic
データを書き込む Kafka トピックの名前。 Kafka は、トピックと呼ばれるカテゴリでメッセージのフィードを保持します。
Kafka クラスタに公開される各メッセージには、トピックが割り当てられます。 各トピックには、メッセージのグループが含まれています。
説明各データ同期ノードの Kafka Writer は、1 つのトピックにのみデータを書き込むことができます。
キー列
各行の値が宛先 Kafka トピックのキーとして使用されるソース列の名前。 複数の列を選択した場合、各行の列値はカンマ (,) を使用してキーとして連結されます。 列を選択しない場合、空の文字列が宛先 Kafka トピックのキーとして使用されます。
値列
各行の値が宛先 Kafka トピックの値として連結されるソース列の名前。 列を選択しない場合、各行のすべてのソース列の値が宛先 Kafka トピックの値として連結されます。 ソース列の値を連結するために使用される方法は、指定する書き込みモードによって異なります。 詳細については、「Kafka Writer」で提供されているパラメーターの説明をご参照ください。
キーの型
Kafka トピックのキーのデータ型。 このパラメーターの値によって、Kafka プロデューサー を初期化するために使用される key.serializer の設定が決まります。 有効な値: STRING、BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、SHORT。
値の型
Kafka トピックの値のデータ型。 このパラメーターの値によって、Kafka プロデューサー を初期化するために使用される value.serializer の設定が決まります。 有効な値: STRING、BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、SHORT。
一度に書き込まれるデータ量
一度に書き込むバイト数。 このパラメーターは 16000 より大きい値に設定することをお勧めします。
書き込みモード
書き込みモード。 このパラメーターを使用して、Kafka Writer がソースの値列の値を連結する形式を指定できます。 有効な値: text および json。
このパラメーターを text に設定すると、Kafka Writer は指定された区切り文字を使用して列の値を連結します。
このパラメーターを json に設定すると、Kafka Writer は列の値を JSON 文字列として連結します。
たとえば、リーダーから値列として 3 つの列 col1、col2、col3 が取得され、特定の行の列の値が a、b、c であるとします。 書き込みモードパラメーターが text に設定され、列区切り文字パラメーターが
#に設定されている場合、宛先 Kafka トピックに格納される値はa#b#cです。 書き込みモードパラメーターが json に設定されている場合、宛先 Kafka トピックに格納される値は文字列{"col1":"a","col2":"b","col3":"c"}です。列区切り文字
text に設定されている場合、リーダーから取得した列値を連結するために使用されるデリミタです。「書き込みモード」パラメータが text に設定されている場合、リーダーから取得した列値を連結するために使用されるデリミタ。各行の列の値は、宛先 Kafka トピックの値として連結されます。1 つ以上の文字を列区切り文字として指定できます。Unicode 文字は
\u0001の形式で指定できます。\tや\nなどのエスケープ文字がサポートされています。デフォルト値:\t。構成パラメーター
Kafka コンシューマー を作成するときに構成できる拡張パラメーター。 たとえば、bootstrap.servers、acks、linger.ms パラメーターを構成できます。 KafkaConfig のパラメーターを構成して、Kafka コンシューマー のデータ読み取り動作を制御できます。 Kafka にデータを同期するリアルタイム同期ノードの場合、Kafka コンシューマー の acks パラメーターのデフォルト値は all です。 パフォーマンスに対する要件が高い場合は、acks パラメーターに別の値を指定できます。 acks パラメーターの有効な値:
0: Kafka コンシューマーは、データが宛先に書き込まれたかどうかを確認しません。
1: Kafka コンシューマーは、データがプライマリレプリカに書き込まれた場合、書き込み操作が成功したことを確認します。
all: Kafka コンシューマーは、データがすべてのレプリカに書き込まれた場合、書き込み操作が成功したことを確認します。
リアルタイム同期ノードの構成タブの上部ツールバーにある
アイコンをクリックして、ノードを保存します。