DataHub は Kafka プロトコルと完全に互換性があります。ネイティブ Kafka クライアントを使用して、DataHub からデータを読み書きできます。
背景情報
Kafka から DataHub へのマッピング
トピックタイプ
Kafka のトピック拡張モードは、DataHub のトピック拡張モードとは異なります。Kafka のトピック拡張モードに適応するには、DataHub でトピックを作成するときに ExpandMode パラメーターを ONLY_EXTEND に設定する必要があります。 ExpandMode パラメーターが ONLY_EXTEND に設定されているトピックは、分割操作またはマージ操作をサポートしていません。シャードを追加することはできますが、削除することはできません。
トピックの命名
Kafka のトピック名は、DataHub のプロジェクト名とトピック名にマッピングされ、ピリオド(.)で区切られます。たとえば、Kafka の test_project.test_topic という名前のトピックは、DataHub の test_project という名前のプロジェクト内の test_topic という名前のトピックにマッピングされます。Kafka のトピック名に複数のピリオド(.)が含まれている場合、最初のピリオド(.)より前の部分は DataHub のプロジェクト名であり、残りの部分はトピック名です。その他のピリオド(.)とハイフン(-)はアンダースコア(_)に置き換えられます。
パーティション
DataHub の各アクティブシャードは、Kafka のパーティションに対応しています。DataHub のアクティブシャードの数が 5 つの場合、Kafka に 5 つのパーティションが含まれていると見なすことができます。データを書き込むときは、パーティション ID [0,4] に基づいてパーティションを指定できます。パーティションを指定しない場合、Kafka クライアントはデータを書き込むパーティションを決定します。
TUPLE トピック
Kafka から DataHub の TUPLE トピックにキーと値のペアを書き込む場合、TUPLE トピックのスキーマには STRING タイプのフィールドが 1 つまたは 2 つ含まれている必要があります。そうでない場合、データの書き込みは失敗します。スキーマにフィールドが 1 つしか含まれていない場合、キーと値のペアの値のみが書き込まれ、キーは破棄されます。スキーマに 2 つのフィールドが含まれている場合、値は一方のフィールドに書き込まれ、キーはもう一方のフィールドに書き込まれます。バイナリデータを TUPLE トピックに書き込むと、データはトピックに文字化けして表示されます。バイナリデータは BLOB トピックに書き込むことをお勧めします。
BLOB トピック
Kafka から DataHub の BLOB トピックにキーと値のペアを書き込むと、値は BLOB トピックに書き込まれます。キーが NULL でない場合、キーは属性として DataHub に書き込まれます。属性のキーは __kafka_key__ で、値は Kafka データのキーです。
ヘッダー
Kafka のヘッダーは、DataHub の属性に対応しています。ただし、値が NULL のヘッダーは Kafka では無視されます。ヘッダーのキーとして __kafka_key__ を使用しないことをお勧めします。
コンシューマーグループ
DataHub では、コンシューマーグループはサブスクリプション ID にマッピングされ、一度に 1 つのトピックのみをサブスクライブできます。ただし、Kafka グループは一度に複数のトピックをサブスクライブできます。Kafka のサブスクリプション方法との互換性を高めるために、DataHub ではプロジェクトにグループを作成し、サブスクライブするトピックにグループをバインドしてから、グループを使用してプロジェクト内の複数のトピックをサブスクライブできます。DataHub では、グループはサーバーによって管理されるサブスクリプションのセットです。グループがトピックにバインドされている場合、トピックの詳細ページの [サブスクリプションリスト] タブにグループによって自動的に作成されたサブスクリプションを表示できます。サブスクリプションを削除すると、グループはトピックをサブスクライブできなくなり、既存の消費オフセットはなくなります。
グループは最大 50 個のトピックをサブスクライブできます。グループのトピックをさらにサブスクライブする必要がある場合は、チケットを送信してください。
Kafka 構成パラメーター
C = コンシューマー、P = プロデューサー、S = ストリーム
パラメーター | C/P/S | 有効な値 | 必須 | 説明 |
bootstrap.servers | * | 詳細については、このトピックの「Kafka エンドポイント」セクションをご参照ください。 | はい | |
security.protocol | * | SASL_SSL | はい | 安全なデータ転送を確保するために、Kafka クライアントから DataHub にデータが書き込まれるときに、暗号化に Secure Sockets Layer(SSL)が使用されます。 |
sasl.mechanism | * | PLAIN | はい | AccessKey 認証モード。このパラメーターを PLAIN に設定します。 |
compression.type | P | LZ4 | いいえ | 圧縮転送を有効にするかどうかを指定します。LZ4 圧縮アルゴリズムのみがサポートされています。 |
group.id | C | project.topic:subId または project.group | はい | コンシューマーグループの ID。 project.topic:subId を使用する場合は、サブスクライブされたトピックに基づいてこのパラメーターを設定します。そうでない場合、データを読み取ることができません。 project.group を使用することをお勧めします。 |
partition.assignment.strategy | C | org.apache.kafka.clients.consumer.RangeAssignor | いいえ | パーティション割り当てのポリシー。Kafka のパーティション割り当てのデフォルトポリシーは RangeAssignor であり、これは DataHub でサポートされている唯一のポリシーでもあります。このパラメーターは変更しないでください。 |
session.timeout.ms | C/S | [60000, 180000] | いいえ | セッションのタイムアウト期間。Kafka のセッションのデフォルトのタイムアウト期間は 10,000 ミリ秒です。ただし、DataHub のセッションの最小タイムアウト期間は 60,000 ミリ秒です。したがって、このパラメーターのデフォルト値は 60000 です。 |
heartbeat.interval.ms | C/S | このパラメーターを指定されたセッションのタイムアウト期間の 3 分の 2 に設定することをお勧めします。 | いいえ | ハートビート間隔。Kafka のデフォルトのハートビート間隔は 3,000 ミリ秒です。 |
application.id | S | project.topic:subId または project.group | はい | アプリケーション ID。 project.topic:subId を使用する場合は、サブスクライブされたトピックに基づいてこのパラメーターを設定します。そうでない場合、データの読み取りは失敗します。 project.group を使用することをお勧めします。 |
上記の表は、Kafka クライアントから DataHub にデータを書き込むときに特に注意が必要なパラメーターについて説明しています。 retries,batch.size などのクライアント関連のパラメーターは影響を受けません。サーバー関連のパラメーターは、サーバーの動作に影響を与えません。たとえば、acks パラメーターの値に関係なく、DataHub はデータが書き込まれた後に値を返します。
Kafka エンドポイント
リージョン | リージョン ID | パブリックエンドポイント | クラシックネットワーク上の ECS エンドポイント | VPC 内の ECS エンドポイント |
中国 (杭州) | cn-hangzhou | dh-cn-hangzhou.aliyuncs.com:9092 | dh-cn-hangzhou.aliyun-inc.com:9093 | dh-cn-hangzhou-int-vpc.aliyuncs.com:9094 |
中国 (上海) | cn-shanghai | dh-cn-shanghai.aliyuncs.com:9092 | dh-cn-shanghai.aliyun-inc.com:9093 | dh-cn-shanghai-int-vpc.aliyuncs.com:9094 |
中国 (北京) | cn-beijing | dh-cn-beijing.aliyuncs.com:9092 | dh-cn-beijing.aliyun-inc.com:9093 | dh-cn-beijing-int-vpc.aliyuncs.com:9094 |
中国 (深圳) | cn-shenzhen | dh-cn-shenzhen.aliyuncs.com:9092 | dh-cn-shenzhen.aliyun-inc.com:9093 | dh-cn-shenzhen-int-vpc.aliyuncs.com:9094 |
中国 (張家口) | cn-zhangjiakou | dh-cn-zhangjiakou.aliyuncs.com:9092 | dh-cn-zhangjiakou.aliyun-inc.com:9093 | dh-cn-zhangjiakou-int-vpc.aliyuncs.com:9094 |
シンガポール | ap-southeast-1 | dh-ap-southeast-1.aliyuncs.com:9092 | dh-ap-southeast-1.aliyun-inc.com:9093 | dh-ap-southeast-1-int-vpc.aliyuncs.com:9094 |
マレーシア (クアラルンプール) | ap-southeast-3 | dh-ap-southeast-3.aliyuncs.com:9092 | dh-ap-southeast-3.aliyun-inc.com:9093 | dh-ap-southeast-3-int-vpc.aliyuncs.com:9094 |
インド (ムンバイ) (閉鎖中) | ap-south-1 | dh-ap-south-1.aliyuncs.com:9092 | dh-ap-south-1.aliyun-inc.com:9093 | dh-ap-south-1-int-vpc.aliyuncs.com:9094 |
ドイツ (フランクフルト) | eu-central-1 | dh-eu-central-1.aliyuncs.com:9092 | dh-eu-central-1.aliyun-inc.com:9093 | dh-eu-central-1-int-vpc.aliyuncs.com:9094 |
中国東部 2 金融 | cn-shanghai-finance-1 | dh-cn-shanghai-finance-1.aliyuncs.com:9092 | dh-cn-shanghai-finance-1.aliyun-inc.com:9093 | dh-cn-shanghai-finance-1-int-vpc.aliyuncs.com:9094 |
中国 (香港) | cn-hongkong | dh-cn-hongkong.aliyuncs.com:9092 | dh-cn-hongkong.aliyun-inc.com:9093 | dh-cn-hongkong-int-vpc.aliyuncs.com:9094 |
例
コードを使用してトピックを作成する
注: Kafka の API 操作を呼び出してトピックを作成することはできません。トピックを作成するには、DataHub SDK を呼び出す必要があります。トピックを作成するときは、ExpandMode パラメーターを ONLY_EXTEND に設定する必要があります。 Maven 依存関係のバージョンは V2.19.0 以降である必要があります。
プロジェクトで AccessKey ID と AccessKey Secret を構成する必要があります。構成ファイルで AccessKey ID と AccessKey Secret を構成するには、次の環境変数を設定することをお勧めします。
datahub.endpoint=<yourEndpoint>
datahub.accessId=<yourAccessKeyId>
// AccessKey ID と AccessKey Secret をプロジェクトコードにハードコードしないことをお勧めします。そうしないと、AccessKey ペアがリークし、アカウント内のすべてのリソースのセキュリティが危険にさらされる可能性があります。
datahub.accessKey=<yourAccessKeySecret>Alibaba Cloud アカウントの AccessKey ペアには、すべての API 操作に対する権限があります。AccessKey ペアを使用して操作を実行することは、リスクの高い操作です。RAM ユーザーを使用して API 操作を呼び出すか、日常的な O&M を実行することをお勧めします。
AccessKey ID と AccessKey Secret をプロジェクトコードにハードコードしないことをお勧めします。そうしないと、AccessKey ペアがリークし、アカウント内のすべてのリソースのセキュリティが危険にさらされる可能性があります。
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.19.0-public</version>
</dependency>@Value("${datahub.endpoint}")
String endpoint ;
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
public class CreateTopic {
public static void main(String[] args) {
DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
.setDatahubConfig(
new DatahubConfig(endpoint,
new AliyunAccount(accessId, accessKey)))
.build();
int shardCount = 1;
int lifeCycle = 7;
try {
// ExpandMode パラメーターを ONLY_EXTEND に設定します。
datahubClient.createTopic("test_project", "test_topic", shardCount, lifeCycle, RecordType.BLOB, "comment", ExpandMode.ONLY_EXTEND);
} catch (DatahubClientException e) {
e.printStackTrace();
}
}
}コードを使用してグループを作成する
Maven 依存関係のバージョンは V2.21.6 以降である必要があります。
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.21.6-public</version>
</dependency>@Value("${datahub.endpoint}")
String endpoint ;
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
public class CreateGroup {
public static void main(String[] args) {
DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
.setDatahubConfig(
new DatahubConfig(endpoint,
new AliyunAccount(accessId, accessKey)))
.build();
List<String> topicList = new ArrayList<>();
topicList.add("test_project.topic1");
topicList.add("test_project.topic2");
topicList.add("test_project.topic3");
try {
// Kafka グループを作成します。
datahubClient.createKafkaGroup("test_project", "test_topic", "test comment");
// サブスクリプションに必要なトピックをグループにバインドします。
datahubClient.updateTopicsForKafkaGroup("test_project", "test_topic", topicList, UpdateKafkaGroupMode.ADD);
} catch (DatahubClientException e) {
e.printStackTrace();
}
}
}サンプルプロデューサー:
kafka_client_producer_jaas.conf ファイルを生成する
kafka_client_producer_jaas.conf ファイルを作成し、ディレクトリに保存します。ファイルには次の内容が含まれています。
KafkaClient {
// AccessKey ID と AccessKey Secret をプロジェクトコードにハードコードしないことをお勧めします。そうしないと、AccessKey ペアがリークし、アカウント内のすべてのリソースのセキュリティが危険にさらされる可能性があります。
org.apache.kafka.common.security.plain.PlainLoginModule required
username="accessId"
password="accessKey";
};Maven 依存関係
Kafka クライアントのバージョンは V0.10.0.0 以降である必要があります。推奨バージョンは V2.4.0 です。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>サンプルコード
public class ProducerExample {
static {
System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
}
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 圧縮転送を有効にするには、このパラメーターを設定します。
properties.put("compression.type", "lz4");
String KafkaTopicName = "test_project.test_topic";
Producer<String, String> producer = new KafkaProducer<String, String>(properties);
try {
List<Header> headers = new ArrayList<>();
RecordHeader header1 = new RecordHeader("key1", "value1".getBytes());
RecordHeader header2 = new RecordHeader("key2", "value2".getBytes());
// ヘッダーを追加します。
headers.add(header1);
headers.add(header2);
ProducerRecord<String, String> record = new ProducerRecord<>(KafkaTopicName, 0, "key", "Hello DataHub!", headers);
// 同期送信
producer.send(record).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}実行結果
実行が完了したら、サンプルデータを確認して、DataHub が正しく実行されているかどうかを確認します。
サンプルコンシューマー
kafka_client_producer_jaas.conf ファイルと maven 依存関係を生成する方法については、このトピックの「サンプルプロデューサー」セクションの関連情報をご参照ください。
コンシューマーを追加したら、シャードの割り当てが完了するまで約 10 秒待ちます。その後、コンシューマーはデータを使用できます。
サンプルコード
(推奨) Kafka グループの使用例
package com.aliyun.datahub.kafka.demo;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class ConsumerExample2 {
static {
System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
}
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
// group.id パラメーターを project.group に設定します。
properties.put("group.id", "test_project.test_kafka_group");
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "60000");
properties.put("heartbeat.interval.ms", "40000");
properties.put("ssl.endpoint.identification.algorithm", "");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
List<String> topicList = new ArrayList<>();
topicList.add("test_project.test_topic1");
topicList.add("test_project.test_topic2");
topicList.add("test_project.test_topic3");
// Kafka グループを使用すると、一度に複数のトピックをサブスクライブできます。
kafkaConsumer.subscribe(topicList);
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.toString());
}
}
}
}project.topic.subid の使用例
package com.aliyun.datahub.kafka.demo;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConsumerExample {
static {
System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
}
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
// group.id パラメーターを project.topic.subId に設定します。
properties.put("group.id", "test_project.test_topic:1611039998153N71KM");
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "60000");
properties.put("heartbeat.interval.ms", "40000");
properties.put("ssl.endpoint.identification.algorithm", "");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
// project.topic.subId を使用すると、1 つのトピックのみをサブスクライブできます。
kafkaConsumer.subscribe(Collections.singletonList("test_project.test_topic"));
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.toString());
}
}
}
}実行結果
実行が完了したら、コンシューマークライアントで読み取られたデータを表示できます。
ConsumerRecord(topic = test_project.test_topic, partition = 0, leaderEpoch = 0, offset = 0, LogAppendTime = 1611040892661, serialized key size = 3, serialized value size = 14, headers = RecordHeaders(headers = [RecordHeader(key = key1, value = [118, 97, 108, 117, 101, 49]), RecordHeader(key = key2, value = [118, 97, 108, 117, 101, 50])], isReadOnly = false), key = key, value = Hello DataHub!)注: データ読み取りリクエストに対して返されるすべてのデータは、LogAppendTime パラメーターの同じ値を共有します。これは、データのタイムスタンプの最大値です。
サンプルストリームタスク
Maven 依存関係
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.4.0</version>
</dependency>サンプルコード
次のサンプルコードは、test_project の入力データを読み取り、キーと値を小文字に変換してから、出力データに書き込みます。
public class StreamExample {
static {
System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
}
public static void main(final String[] args) {
final String input = "test_project.input";
final String output = "test_project.output";
final Properties properties = new Properties();
properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
// application.id パラメーターを project.topic:subId に設定します。
properties.put("application.id", "test_project.input:1611293595417QH0WL");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put("session.timeout.ms", "60000");
properties.put("heartbeat.interval.ms", "40000");
properties.put("auto.offset.reset", "earliest");
final StreamsBuilder builder = new StreamsBuilder();
TestMapper testMapper = new TestMapper();
builder.stream(input, Consumed.with(Serdes.String(), Serdes.String()))
.map(testMapper)
.to(output, Produced.with(Serdes.String(), Serdes.String()));
final KafkaStreams streams = new KafkaStreams(builder.build(), properties);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (final Throwable e) {
System.exit(1);
}
System.exit(00);
}
static class TestMapper implements KeyValueMapper<String, String, KeyValue<String, String>> {
@Override
public KeyValue<String, String> apply(String s, String s2) {
return new KeyValue<>(StringUtils.lowerCase(s), StringUtils.lowerCase(s2));
}
}
}実行結果
ストリームタスクを開始したら、シャードの割り当てが完了するまで約 1 分待ちます。その後、DataHub コンソールでタスクの数を確認できます。タスクの数は、入力トピックのシャードの数と一致しています。この例では、入力トピックには 3 つのシャードが含まれています。
currently assigned active tasks: [0_0, 0_1, 0_2]
currently assigned standby tasks: []
revoked active tasks: []
revoked standby tasks: []シャードの割り当てが完了したら、(AAAA,BBBB)、(CCCC,DDDD)、(EEEE,FFFF) などのテストデータを 入力トピックに書き込むことができます。次に、出力データをサンプリングして、データ書き込みが有効かどうかを確認します。
注意事項
トランザクションと冪等性はサポートされていません。
Kafka クライアントは DataHub トピックを自動的に作成できません。Kafka クライアントから DataHub にデータを書き込む前に、DataHub トピックが作成されていることを確認してください。
各コンシューマーは 1 つのトピックのみをサブスクライブできます。
コンシューマーによって読み取られるデータのタイムスタンプは、LogAppendTime パラメーターの値です。これは、データが DataHub に書き込まれた時刻を示します。データ読み取りリクエストに対して返されるすべてのデータは、同じタイムスタンプを共有します。これは、データのタイムスタンプの最大値です。したがって、データを読み取るときに取得されるタイムスタンプは、データが DataHub に書き込まれた実際の時刻よりも大きくなる場合があります。
各ストリームタスクは、1 つの入力トピックと複数の出力トピックのみをサポートします。
ストリームタスクはステートレスです。
サポートされている Kafka のバージョンは V0.10.0 から V2.4.0 です。
FAQ
1. データ書き込み中に接続が切断されるのはなぜですか?
Selector - [Producer clientId=producer-1] Connection with dh-cn-shenzhen.aliyuncs.com disconnected
java.io.EOFException
at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:573)
...Kafka では、メタリクエストとデータ書き込みリクエストは同じ接続を使用しません。メタリクエストが初めて送信されると、接続が確立されます。データ書き込みリクエストが送信されると、メタリクエストに対して返されたブローカーへの接続が確立されます。その後、後続のすべてのリクエストは 2 番目の接続を介して送信され、最初の接続はアイドル状態になります。接続が特定の時間制限を超えてアイドル状態のままの場合、サーバーは接続を自動的に閉じます。したがって、このエラーがデータ書き込みに影響を与えない場合は、無視できます。
Kafka クライアントを起動できない場合はどうすればよいですか?
Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: No subject alternative names matching IP address 100.67.134.161 foundKafka クライアントを起動できない場合は、次のコードを追加します: properties.put("ssl.endpoint.identification.algorithm", "");。
コンシューマークライアントでデータ消費中に DisconnectException エラーが発生するのはなぜですか?
[INFO][Consumer clientId=client-id, groupId=consumer-project.topic:subid] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1: {}.
org.apache.kafka.common.errors.DisconnectExceptionKafka クライアントは、サーバーへの TCP ベースの永続接続を維持する必要があります。ほとんどの場合、DisconnectException エラーはネットワークのジッターが原因で発生します。クライアントに再試行ロジックが構成されているため、このエラーはクライアントでのデータ消費に影響を与えません。